diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java index d51263573b..bd898b6f05 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java @@ -1,14 +1,19 @@ package org.onlab.onos.store.service.impl; import static com.google.common.base.Preconditions.checkNotNull; +import static org.slf4j.LoggerFactory.getLogger; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import net.kuujo.copycat.Copycat; +import net.kuujo.copycat.event.EventHandler; +import net.kuujo.copycat.event.LeaderElectEvent; import org.onlab.onos.store.service.BatchReadRequest; import org.onlab.onos.store.service.BatchWriteRequest; @@ -16,20 +21,54 @@ import org.onlab.onos.store.service.DatabaseException; import org.onlab.onos.store.service.ReadResult; import org.onlab.onos.store.service.VersionedValue; import org.onlab.onos.store.service.WriteResult; +import org.slf4j.Logger; /** * Client for interacting with the Copycat Raft cluster. */ public class DatabaseClient { + private final Logger log = getLogger(getClass()); + private final Copycat copycat; public DatabaseClient(Copycat copycat) { this.copycat = checkNotNull(copycat); } - public boolean createTable(String tableName) { + public void waitForLeader() { + if (copycat.leader() != null) { + return; + } + log.info("No leader in cluster, waiting for election."); + final CountDownLatch latch = new CountDownLatch(1); + final EventHandler leaderLsnr = new EventHandler() { + + @Override + public void handle(LeaderElectEvent event) { + log.info("Leader chosen: {}", event); + latch.countDown(); + } + }; + + copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr); + try { + while (copycat.leader() == null) { + latch.await(200, TimeUnit.MILLISECONDS); + } + log.info("Leader appeared: {}", copycat.leader()); + return; + } catch (InterruptedException e) { + log.error("Interrupted while waiting for Leader", e); + Thread.currentThread().interrupt(); + } finally { + copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr); + } + } + + public boolean createTable(String tableName) { + waitForLeader(); CompletableFuture future = copycat.submit("createTable", tableName); try { return future.get(); @@ -39,7 +78,7 @@ public class DatabaseClient { } public boolean createTable(String tableName, int ttlMillis) { - + waitForLeader(); CompletableFuture future = copycat.submit("createTableWithExpiration", tableName); try { return future.get(); @@ -49,7 +88,7 @@ public class DatabaseClient { } public void dropTable(String tableName) { - + waitForLeader(); CompletableFuture future = copycat.submit("dropTable", tableName); try { future.get(); @@ -59,7 +98,7 @@ public class DatabaseClient { } public void dropAllTables() { - + waitForLeader(); CompletableFuture future = copycat.submit("dropAllTables"); try { future.get(); @@ -69,7 +108,7 @@ public class DatabaseClient { } public Set listTables() { - + waitForLeader(); CompletableFuture> future = copycat.submit("listTables"); try { return future.get(); @@ -79,7 +118,7 @@ public class DatabaseClient { } public List batchRead(BatchReadRequest batchRequest) { - + waitForLeader(); CompletableFuture> future = copycat.submit("read", batchRequest); try { return future.get(); @@ -89,7 +128,7 @@ public class DatabaseClient { } public List batchWrite(BatchWriteRequest batchRequest) { - + waitForLeader(); CompletableFuture> future = copycat.submit("write", batchRequest); try { return future.get(); @@ -99,6 +138,7 @@ public class DatabaseClient { } public Map getAll(String tableName) { + waitForLeader(); CompletableFuture> future = copycat.submit("getAll", tableName); try { return future.get(); diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java index d84df22885..645de00164 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java @@ -10,6 +10,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import net.kuujo.copycat.Copycat; @@ -99,7 +100,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { private boolean autoAddMember = false; @Activate - public void activate() { + public void activate() throws InterruptedException, ExecutionException { // TODO: Not every node should be part of the consensus ring. @@ -176,9 +177,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker); - copycat.start(); + copycat.start().get(); client = new DatabaseClient(copycat); + client.waitForLeader(); log.info("Started."); }