diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml index eebb8ffcc4..846583f61f 100644 --- a/core/store/dist/pom.xml +++ b/core/store/dist/pom.xml @@ -44,6 +44,24 @@ ${project.version} + + net.kuujo.copycat + copycat + 0.4.0-SNAPSHOT + + + + net.kuujo.copycat + copycat-chronicle + 0.4.0-SNAPSHOT + + + + net.kuujo.copycat + copycat-tcp + 0.4.0-SNAPSHOT + + com.fasterxml.jackson.core jackson-databind diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java new file mode 100644 index 0000000000..7cb5f557b8 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java @@ -0,0 +1,35 @@ +package org.onlab.onos.store.service; + +import java.util.List; + +/** + * Service for running administrative tasks on a Database. + */ +public interface DatabaseAdminService { + + /** + * Creates a new table. + * Table creation is idempotent. Attempting to create a table + * that already exists will be a noop. + * @param name table name. + * @return true if the table was created by this call, false otherwise. + */ + public boolean createTable(String name); + + /** + * Lists all the tables in the database. + * @return list of table names. + */ + public List listTables(); + + /** + * Deletes a table from the database. + * @param name name of the table to delete. + */ + public void dropTable(String name); + + /** + * Deletes all tables from the database. + */ + public void dropAllTables(); +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java new file mode 100644 index 0000000000..bbc2dafe22 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java @@ -0,0 +1,22 @@ +package org.onlab.onos.store.service; + +/** + * Base exception type for database failures. + */ +@SuppressWarnings("serial") +public class DatabaseException extends RuntimeException { + public DatabaseException(String message, Throwable t) { + super(message, t); + } + + public DatabaseException(String message) { + super(message); + } + + public DatabaseException(Throwable t) { + super(t); + } + + public DatabaseException() { + }; +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java new file mode 100644 index 0000000000..08bfa36875 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java @@ -0,0 +1,39 @@ +package org.onlab.onos.store.service; + +import java.util.List; + +public interface DatabaseService { + + /** + * Performs a read on the database. + * @param request read request. + * @return ReadResult + * @throws DatabaseException + */ + ReadResult read(ReadRequest request); + + /** + * Performs a batch read operation on the database. + * The main advantage of batch read operation is parallelization. + * @param batch batch of read requests to execute. + * @return + */ + List> batchRead(List batch); + + /** + * Performs a write operation on the database. + * @param request + * @return write result. + * @throws DatabaseException + */ + WriteResult write(WriteRequest request); + + /** + * Performs a batch write operation on the database. + * Batch write provides transactional semantics. Either all operations + * succeed or none of them do. + * @param batch batch of write requests to execute as a transaction. + * @return result of executing the batch write operation. + */ + List> batchWrite(List batch); +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java new file mode 100644 index 0000000000..fad17ce4b2 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java @@ -0,0 +1,9 @@ +package org.onlab.onos.store.service; + +/** + * Exception thrown when an operation (read or write) is requested for + * a table that does not exist. + */ +@SuppressWarnings("serial") +public class NoSuchTableException extends DatabaseException { +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java new file mode 100644 index 0000000000..090eb63274 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java @@ -0,0 +1,8 @@ +package org.onlab.onos.store.service; + +/** + * Exception that indicates a optimistic lock failure. + */ +@SuppressWarnings("serial") +public class OptimisticLockException extends PreconditionFailedException { +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java new file mode 100644 index 0000000000..fa38e3fffd --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java @@ -0,0 +1,26 @@ +package org.onlab.onos.store.service; + +/** + * A container object which either has a result or an exception. + *

+ * If a result is present, get() will return it otherwise get() will throw + * the exception that was encountered in the process of generating the result. + * + * @param type of result. + * @param exception encountered in generating the result. + */ +public interface OptionalResult { + + /** + * Returns the result. + * @return result + * @throws E if there is no valid result. + */ + public R get(); + + /** + * Returns true if there is a valid result. + * @return true is yes, false otherwise. + */ + public boolean hasValidResult(); +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java new file mode 100644 index 0000000000..f16fc47e54 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java @@ -0,0 +1,14 @@ +package org.onlab.onos.store.service; + +/** + * Exception that indicates a precondition failure. + *

    Scenarios that can cause this exception: + *
  • An operation that attempts to write a new value iff the current value is equal + * to some specified value.
  • + *
  • An operation that attempts to write a new value iff the current version + * matches a specified value
  • + *
+ */ +@SuppressWarnings("serial") +public class PreconditionFailedException extends DatabaseException { +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java new file mode 100644 index 0000000000..6c0ba30094 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java @@ -0,0 +1,36 @@ +package org.onlab.onos.store.service; + +/** + * Database read request. + */ +public class ReadRequest { + + private final String tableName; + private final String key; + + public ReadRequest(String tableName, String key) { + this.tableName = tableName; + this.key = key; + } + + /** + * Return the name of the table. + * @return table name. + */ + public String tableName() { + return tableName; + } + + /** + * Returns the key. + * @return key. + */ + public String key() { + return key; + } + + @Override + public String toString() { + return "ReadRequest [tableName=" + tableName + ", key=" + key + "]"; + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java new file mode 100644 index 0000000000..2d7649a371 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java @@ -0,0 +1,43 @@ +package org.onlab.onos.store.service; + +import org.onlab.onos.store.service.impl.VersionedValue; + +/** + * Database read result. + */ +public class ReadResult { + + private final String tableName; + private final String key; + private final VersionedValue value; + + public ReadResult(String tableName, String key, VersionedValue value) { + this.tableName = tableName; + this.key = key; + this.value = value; + } + + /** + * Database table name. + * @return + */ + public String tableName() { + return tableName; + } + + /** + * Database table key. + * @return key. + */ + public String key() { + return key; + } + + /** + * value associated with the key. + * @return non-null value if the table contains one, null otherwise. + */ + public VersionedValue value() { + return value; + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java new file mode 100644 index 0000000000..a7d3fe32f4 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java @@ -0,0 +1,9 @@ +package org.onlab.onos.store.service; + +/** + * Exception that indicates a write operation is aborted. + * Aborted operations do not mutate database state is any form. + */ +@SuppressWarnings("serial") +public class WriteAborted extends DatabaseException { +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java new file mode 100644 index 0000000000..7314e4f653 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java @@ -0,0 +1,93 @@ +package org.onlab.onos.store.service; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Objects; + +/** + * Database write request. + */ +public class WriteRequest { + + private final String tableName; + private final String key; + private final byte[] newValue; + private final long previousVersion; + private final byte[] oldValue; + + public WriteRequest(String tableName, String key, byte[] newValue) { + this(tableName, key, newValue, -1, null); + } + + public WriteRequest(String tableName, String key, byte[] newValue, long previousVersion) { + this(tableName, key, newValue, previousVersion, null); + checkArgument(previousVersion >= 0); + } + + public WriteRequest(String tableName, String key, byte[] newValue, byte[] oldValue) { + this(tableName, key, newValue, -1, oldValue); + } + + private WriteRequest(String tableName, String key, byte[] newValue, long previousVersion, byte[] oldValue) { + + checkArgument(tableName != null); + checkArgument(key != null); + checkArgument(newValue != null); + + this.tableName = tableName; + this.key = key; + this.newValue = newValue; + this.previousVersion = previousVersion; + this.oldValue = oldValue; + } + + public String tableName() { + return tableName; + } + + public String key() { + return key; + } + + public byte[] newValue() { + return newValue; + } + + public long previousVersion() { + return previousVersion; + } + + public byte[] oldValue() { + return oldValue; + } + + @Override + public String toString() { + return "WriteRequest [tableName=" + tableName + ", key=" + key + + ", newValue=" + newValue + + ", previousVersion=" + previousVersion + + ", oldValue=" + oldValue; + } + + @Override + public int hashCode() { + return Objects.hash(key, tableName, previousVersion); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + WriteRequest other = (WriteRequest) obj; + return Objects.equals(this.key, other.key) && + Objects.equals(this.tableName, other.tableName) && + Objects.equals(this.previousVersion, other.previousVersion); + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java new file mode 100644 index 0000000000..64e9b74c04 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java @@ -0,0 +1,31 @@ +package org.onlab.onos.store.service; + +import org.onlab.onos.store.service.impl.VersionedValue; + +/** + * Database write result. + */ +public class WriteResult { + + private final String tableName; + private final String key; + private final VersionedValue previousValue; + + public WriteResult(String tableName, String key, VersionedValue previousValue) { + this.tableName = tableName; + this.key = key; + this.previousValue = previousValue; + } + + public String tableName() { + return tableName; + } + + public String key() { + return key; + } + + public VersionedValue previousValue() { + return previousValue; + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java new file mode 100644 index 0000000000..3c92800a56 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java @@ -0,0 +1,144 @@ +package org.onlab.onos.store.service.impl; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import net.kuujo.copycat.protocol.Response.Status; +import net.kuujo.copycat.protocol.SubmitRequest; +import net.kuujo.copycat.protocol.SubmitResponse; +import net.kuujo.copycat.spi.protocol.ProtocolClient; + +import org.apache.commons.lang3.RandomUtils; +import org.onlab.netty.Endpoint; +import org.onlab.netty.NettyMessagingService; +import org.onlab.onos.store.service.DatabaseException; +import org.onlab.onos.store.service.ReadRequest; +import org.onlab.onos.store.service.WriteRequest; + +public class DatabaseClient { + + private final Endpoint copycatEp; + ProtocolClient client; + NettyMessagingService messagingService; + + public DatabaseClient(Endpoint copycatEp) { + this.copycatEp = copycatEp; + } + + private static String nextId() { + return UUID.randomUUID().toString(); + } + + public void activate() throws Exception { + messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000)); + messagingService.activate(); + client = new NettyProtocolClient(copycatEp, messagingService); + } + + public void deactivate() throws Exception { + messagingService.deactivate(); + } + + public boolean createTable(String tableName) { + + SubmitRequest request = + new SubmitRequest( + nextId(), + "createTable", + Arrays.asList(tableName)); + CompletableFuture future = client.submit(request); + try { + return (boolean) future.get().result(); + } catch (InterruptedException | ExecutionException e) { + throw new DatabaseException(e); + } + } + + public void dropTable(String tableName) { + + SubmitRequest request = + new SubmitRequest( + nextId(), + "dropTable", + Arrays.asList(tableName)); + CompletableFuture future = client.submit(request); + try { + if (future.get().status() == Status.OK) { + throw new DatabaseException(future.get().toString()); + } + + } catch (InterruptedException | ExecutionException e) { + throw new DatabaseException(e); + } + } + + public void dropAllTables() { + + SubmitRequest request = + new SubmitRequest( + nextId(), + "dropAllTables", + Arrays.asList()); + CompletableFuture future = client.submit(request); + try { + if (future.get().status() != Status.OK) { + throw new DatabaseException(future.get().toString()); + } + } catch (InterruptedException | ExecutionException e) { + throw new DatabaseException(e); + } + } + + @SuppressWarnings("unchecked") + public List listTables() { + + SubmitRequest request = + new SubmitRequest( + nextId(), + "listTables", + Arrays.asList()); + CompletableFuture future = client.submit(request); + try { + return (List) future.get().result(); + } catch (InterruptedException | ExecutionException e) { + throw new DatabaseException(e); + } + } + + @SuppressWarnings("unchecked") + public List batchRead(List requests) { + + SubmitRequest request = new SubmitRequest( + nextId(), + "read", + Arrays.asList(requests)); + + CompletableFuture future = client.submit(request); + try { + List internalReadResults = (List) future.get().result(); + return internalReadResults; + } catch (InterruptedException | ExecutionException e) { + throw new DatabaseException(e); + } + } + + @SuppressWarnings("unchecked") + public List batchWrite(List requests) { + + SubmitRequest request = new SubmitRequest( + nextId(), + "write", + Arrays.asList(requests)); + + CompletableFuture future = client.submit(request); + try { + List internalWriteResults = (List) future.get().result(); + return internalWriteResults; + } catch (InterruptedException | ExecutionException e) { + throw new DatabaseException(e); + } + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java new file mode 100644 index 0000000000..00ce12d818 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java @@ -0,0 +1,210 @@ +package org.onlab.onos.store.service.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import net.kuujo.copycat.Copycat; +import net.kuujo.copycat.StateMachine; +import net.kuujo.copycat.cluster.TcpCluster; +import net.kuujo.copycat.cluster.TcpClusterConfig; +import net.kuujo.copycat.cluster.TcpMember; +import net.kuujo.copycat.log.ChronicleLog; +import net.kuujo.copycat.log.Log; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.netty.Endpoint; +import org.onlab.onos.cluster.ClusterService; +import org.onlab.onos.cluster.ControllerNode; +import org.onlab.onos.store.service.DatabaseAdminService; +import org.onlab.onos.store.service.DatabaseException; +import org.onlab.onos.store.service.DatabaseService; +import org.onlab.onos.store.service.NoSuchTableException; +import org.onlab.onos.store.service.OptimisticLockException; +import org.onlab.onos.store.service.OptionalResult; +import org.onlab.onos.store.service.PreconditionFailedException; +import org.onlab.onos.store.service.ReadRequest; +import org.onlab.onos.store.service.ReadResult; +import org.onlab.onos.store.service.WriteAborted; +import org.onlab.onos.store.service.WriteRequest; +import org.onlab.onos.store.service.WriteResult; +import org.slf4j.Logger; + +import com.google.common.collect.Lists; + +/** + * Strongly consistent and durable state management service based on + * Copycat implementation of Raft consensus protocol. + */ +@Component(immediate = true) +@Service +public class DatabaseManager implements DatabaseService, DatabaseAdminService { + + private final Logger log = getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + ClusterService clusterService; + + public static final String LOG_FILE_PREFIX = "onos-copy-cat-log"; + + private Copycat copycat; + private DatabaseClient client; + + @Activate + public void activate() { + TcpMember localMember = + new TcpMember( + clusterService.getLocalNode().ip().toString(), + clusterService.getLocalNode().tcpPort()); + List remoteMembers = Lists.newArrayList(); + + for (ControllerNode node : clusterService.getNodes()) { + TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort()); + if (!member.equals(localMember)) { + remoteMembers.add(member); + } + } + + // Configure the cluster. + TcpClusterConfig config = new TcpClusterConfig(); + + config.setLocalMember(localMember); + config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{})); + + // Create the cluster. + TcpCluster cluster = new TcpCluster(config); + + StateMachine stateMachine = new DatabaseStateMachine(); + ControllerNode thisNode = clusterService.getLocalNode(); + Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id()); + + copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol()); + copycat.start(); + + client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port())); + + log.info("Started."); + } + + @Activate + public void deactivate() { + copycat.stop(); + } + + @Override + public boolean createTable(String name) { + return client.createTable(name); + } + + @Override + public void dropTable(String name) { + client.dropTable(name); + } + + @Override + public void dropAllTables() { + client.dropAllTables(); + } + + @Override + public List listTables() { + return client.listTables(); + } + + @Override + public ReadResult read(ReadRequest request) { + return batchRead(Arrays.asList(request)).get(0).get(); + } + + @Override + public List> batchRead( + List batch) { + List> readResults = new ArrayList<>(batch.size()); + for (InternalReadResult internalReadResult : client.batchRead(batch)) { + if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) { + readResults.add(new DatabaseOperationResult( + new NoSuchTableException())); + } else { + readResults.add(new DatabaseOperationResult( + internalReadResult.result())); + } + } + return readResults; + } + + @Override + public WriteResult write(WriteRequest request) { + return batchWrite(Arrays.asList(request)).get(0).get(); + } + + @Override + public List> batchWrite( + List batch) { + List> writeResults = new ArrayList<>(batch.size()); + for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) { + if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) { + writeResults.add(new DatabaseOperationResult( + new NoSuchTableException())); + } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) { + writeResults.add(new DatabaseOperationResult( + new OptimisticLockException())); + } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) { + // TODO: throw a different exception? + writeResults.add(new DatabaseOperationResult( + new PreconditionFailedException())); + } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) { + writeResults.add(new DatabaseOperationResult( + new WriteAborted())); + } else { + writeResults.add(new DatabaseOperationResult( + internalWriteResult.result())); + } + } + return writeResults; + + } + + private class DatabaseOperationResult implements OptionalResult { + + private final R result; + private final DatabaseException exception; + + public DatabaseOperationResult(R result) { + this.result = result; + this.exception = null; + } + + public DatabaseOperationResult(DatabaseException exception) { + this.result = null; + this.exception = exception; + } + + @Override + public R get() { + if (result != null) { + return result; + } + throw exception; + } + + @Override + public boolean hasValidResult() { + return result != null; + } + + @Override + public String toString() { + if (result != null) { + return result.toString(); + } else { + return exception.toString(); + } + } + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java new file mode 100644 index 0000000000..cbca729ea9 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java @@ -0,0 +1,169 @@ +package org.onlab.onos.store.service.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import net.kuujo.copycat.Command; +import net.kuujo.copycat.Query; +import net.kuujo.copycat.StateMachine; + +import org.onlab.onos.store.serializers.KryoSerializer; +import org.onlab.onos.store.service.ReadRequest; +import org.onlab.onos.store.service.ReadResult; +import org.onlab.onos.store.service.WriteRequest; +import org.onlab.onos.store.service.WriteResult; +import org.onlab.util.KryoNamespace; + +import com.google.common.collect.Maps; + +public class DatabaseStateMachine implements StateMachine { + + public static final KryoSerializer SERIALIZER = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoNamespace.newBuilder() + .register(VersionedValue.class) + .register(State.class) + .register(NettyProtocol.COMMON) + .build() + .populate(1); + } + }; + + private State state = new State(); + + @Command + public boolean createTable(String tableName) { + return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null; + } + + @Command + public boolean dropTable(String tableName) { + return state.getTables().remove(tableName) != null; + } + + @Command + public boolean dropAllTables() { + state.getTables().clear(); + return true; + } + + @Query + public Set listTables() { + return state.getTables().keySet(); + } + + @Query + public List read(List requests) { + List results = new ArrayList<>(requests.size()); + for (ReadRequest request : requests) { + Map table = state.getTables().get(request.tableName()); + if (table == null) { + results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null)); + continue; + } + VersionedValue value = table.get(request.key()); + results.add(new InternalReadResult( + InternalReadResult.Status.OK, + new ReadResult( + request.tableName(), + request.key(), + value))); + } + return results; + } + + @Command + public List write(List requests) { + boolean abort = false; + List validationResults = new ArrayList<>(requests.size()); + for (WriteRequest request : requests) { + Map table = state.getTables().get(request.tableName()); + if (table == null) { + validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE); + abort = true; + continue; + } + VersionedValue value = table.get(request.key()); + if (value == null) { + if (request.oldValue() != null) { + validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH); + abort = true; + continue; + } else if (request.previousVersion() >= 0) { + validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE); + abort = true; + continue; + } + } + if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) { + validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE); + abort = true; + continue; + } + + validationResults.add(InternalWriteResult.Status.OK); + } + + List results = new ArrayList<>(requests.size()); + + if (abort) { + for (InternalWriteResult.Status validationResult : validationResults) { + if (validationResult == InternalWriteResult.Status.OK) { + results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null)); + } else { + results.add(new InternalWriteResult(validationResult, null)); + } + } + return results; + } + + for (WriteRequest request : requests) { + Map table = state.getTables().get(request.tableName()); + synchronized (table) { + VersionedValue previousValue = + table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion())); + results.add(new InternalWriteResult( + InternalWriteResult.Status.OK, + new WriteResult(request.tableName(), request.key(), previousValue))); + } + } + return results; + } + + public class State { + + private final Map> tables = + Maps.newHashMap(); + private long versionCounter = 1; + + Map> getTables() { + return tables; + } + + long nextVersion() { + return versionCounter++; + } + } + + @Override + public byte[] takeSnapshot() { + try { + return SERIALIZER.encode(state); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + + @Override + public void installSnapshot(byte[] data) { + try { + this.state = SERIALIZER.decode(data); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java new file mode 100644 index 0000000000..f6fcf51bca --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java @@ -0,0 +1,33 @@ +package org.onlab.onos.store.service.impl; + +import org.onlab.onos.store.service.ReadResult; + +public class InternalReadResult { + + public enum Status { + OK, + NO_SUCH_TABLE + } + + private final Status status; + private final ReadResult result; + + public InternalReadResult(Status status, ReadResult result) { + this.status = status; + this.result = result; + } + + public Status status() { + return status; + } + + public ReadResult result() { + return result; + } + + @Override + public String toString() { + return "InternalReadResult [status=" + status + ", result=" + result + + "]"; + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java new file mode 100644 index 0000000000..e6dbb1f970 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java @@ -0,0 +1,30 @@ +package org.onlab.onos.store.service.impl; + +import org.onlab.onos.store.service.WriteResult; + +public class InternalWriteResult { + + public enum Status { + OK, + ABORTED, + NO_SUCH_TABLE, + OPTIMISTIC_LOCK_FAILURE, + PREVIOUS_VALUE_MISMATCH + } + + private final Status status; + private final WriteResult result; + + public InternalWriteResult(Status status, WriteResult result) { + this.status = status; + this.result = result; + } + + public Status status() { + return status; + } + + public WriteResult result() { + return result; + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java new file mode 100644 index 0000000000..9855ec6e91 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java @@ -0,0 +1,145 @@ +package org.onlab.onos.store.service.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Vector; + +import net.kuujo.copycat.cluster.TcpClusterConfig; +import net.kuujo.copycat.cluster.TcpMember; +import net.kuujo.copycat.internal.log.ConfigurationEntry; +import net.kuujo.copycat.internal.log.CopycatEntry; +import net.kuujo.copycat.internal.log.OperationEntry; +import net.kuujo.copycat.internal.log.SnapshotEntry; +import net.kuujo.copycat.protocol.PingRequest; +import net.kuujo.copycat.protocol.PingResponse; +import net.kuujo.copycat.protocol.PollRequest; +import net.kuujo.copycat.protocol.PollResponse; +import net.kuujo.copycat.protocol.Response.Status; +import net.kuujo.copycat.protocol.SubmitRequest; +import net.kuujo.copycat.protocol.SubmitResponse; +import net.kuujo.copycat.protocol.SyncRequest; +import net.kuujo.copycat.protocol.SyncResponse; +import net.kuujo.copycat.spi.protocol.Protocol; +import net.kuujo.copycat.spi.protocol.ProtocolClient; +import net.kuujo.copycat.spi.protocol.ProtocolServer; + +import org.onlab.onos.store.serializers.ImmutableListSerializer; +import org.onlab.onos.store.serializers.ImmutableMapSerializer; +import org.onlab.onos.store.serializers.ImmutableSetSerializer; +import org.onlab.onos.store.serializers.KryoSerializer; +import org.onlab.onos.store.service.ReadRequest; +import org.onlab.onos.store.service.ReadResult; +import org.onlab.onos.store.service.WriteRequest; +import org.onlab.onos.store.service.WriteResult; +import org.onlab.util.KryoNamespace; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.serializers.CollectionSerializer; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +/** + * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}. + */ +public class NettyProtocol implements Protocol { + + public static final String COPYCAT_PING = "copycat-raft-consensus-ping"; + public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync"; + public static final String COPYCAT_POLL = "copycat-raft-consensus-poll"; + public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit"; + + // TODO: make this configurable. + public static final long RETRY_INTERVAL_MILLIS = 2000; + + private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder() + .register(PingRequest.class) + .register(PingResponse.class) + .register(PollRequest.class) + .register(PollResponse.class) + .register(SyncRequest.class) + .register(SyncResponse.class) + .register(SubmitRequest.class) + .register(SubmitResponse.class) + .register(Status.class) + .register(ConfigurationEntry.class) + .register(SnapshotEntry.class) + .register(CopycatEntry.class) + .register(OperationEntry.class) + .register(TcpClusterConfig.class) + .register(TcpMember.class) + .build(); + + // TODO: Move to the right place. + private static final KryoNamespace CRAFT = KryoNamespace.newBuilder() + .register(ReadRequest.class) + .register(WriteRequest.class) + .register(InternalReadResult.class) + .register(InternalWriteResult.class) + .register(InternalReadResult.Status.class) + .register(WriteResult.class) + .register(ReadResult.class) + .register(InternalWriteResult.Status.class) + .register(VersionedValue.class) + .build(); + + public static final KryoNamespace COMMON = KryoNamespace.newBuilder() + .register(Arrays.asList().getClass(), new CollectionSerializer() { + @Override + @SuppressWarnings("rawtypes") + protected Collection create(Kryo kryo, Input input, Class type) { + return new ArrayList(); + } + }) + .register(ImmutableMap.class, new ImmutableMapSerializer()) + .register(ImmutableList.class, new ImmutableListSerializer()) + .register(ImmutableSet.class, new ImmutableSetSerializer()) + .register( + Vector.class, + ArrayList.class, + Arrays.asList().getClass(), + HashMap.class, + HashSet.class, + LinkedList.class, + byte[].class) + .build(); + + public static final KryoSerializer SERIALIZER = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoNamespace.newBuilder() + .register(COPYCAT) + .register(COMMON) + .register(CRAFT) + .build() + .populate(1); + } + }; + + private NettyProtocolServer server = null; + + // FIXME: This is a total hack.Assumes + // ProtocolServer is initialized before ProtocolClient + protected NettyProtocolServer getServer() { + if (server == null) { + throw new IllegalStateException("ProtocolServer is not initialized yet!"); + } + return server; + } + + @Override + public ProtocolServer createServer(TcpMember member) { + server = new NettyProtocolServer(member); + return server; + } + + @Override + public ProtocolClient createClient(TcpMember member) { + return new NettyProtocolClient(this, member); + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java new file mode 100644 index 0000000000..a791990c2d --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java @@ -0,0 +1,148 @@ +package org.onlab.onos.store.service.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import net.kuujo.copycat.cluster.TcpMember; +import net.kuujo.copycat.protocol.PingRequest; +import net.kuujo.copycat.protocol.PingResponse; +import net.kuujo.copycat.protocol.PollRequest; +import net.kuujo.copycat.protocol.PollResponse; +import net.kuujo.copycat.protocol.SubmitRequest; +import net.kuujo.copycat.protocol.SubmitResponse; +import net.kuujo.copycat.protocol.SyncRequest; +import net.kuujo.copycat.protocol.SyncResponse; +import net.kuujo.copycat.spi.protocol.ProtocolClient; + +import org.onlab.netty.Endpoint; +import org.onlab.netty.NettyMessagingService; +import org.slf4j.Logger; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * {@link NettyMessagingService} based Copycat protocol client. + */ +public class NettyProtocolClient implements ProtocolClient { + + private final Logger log = getLogger(getClass()); + private static final ThreadFactory THREAD_FACTORY = + new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build(); + + // Remote endpoint, this client instance is used + // for communicating with. + private final Endpoint remoteEp; + private final NettyMessagingService messagingService; + + // TODO: Is 10 the right number of threads? + private static final ScheduledExecutorService THREAD_POOL = + new ScheduledThreadPoolExecutor(10, THREAD_FACTORY); + + public NettyProtocolClient(NettyProtocol protocol, TcpMember member) { + this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService()); + } + + public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) { + this.remoteEp = remoteEp; + this.messagingService = messagingService; + } + + @Override + public CompletableFuture ping(PingRequest request) { + return requestReply(request); + } + + @Override + public CompletableFuture sync(SyncRequest request) { + return requestReply(request); + } + + @Override + public CompletableFuture poll(PollRequest request) { + return requestReply(request); + } + + @Override + public CompletableFuture submit(SubmitRequest request) { + return requestReply(request); + } + + @Override + public CompletableFuture connect() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture close() { + return CompletableFuture.completedFuture(null); + } + + public String messageType(I input) { + Class clazz = input.getClass(); + if (clazz.equals(PollRequest.class)) { + return NettyProtocol.COPYCAT_POLL; + } else if (clazz.equals(SyncRequest.class)) { + return NettyProtocol.COPYCAT_SYNC; + } else if (clazz.equals(SubmitRequest.class)) { + return NettyProtocol.COPYCAT_SUBMIT; + } else if (clazz.equals(PingRequest.class)) { + return NettyProtocol.COPYCAT_PING; + } else { + throw new IllegalArgumentException("Unknown class " + clazz.getName()); + } + + } + + private CompletableFuture requestReply(I request) { + CompletableFuture future = new CompletableFuture<>(); + THREAD_POOL.schedule(new RPCTask(request, future), 0, TimeUnit.MILLISECONDS); + return future; + } + + private class RPCTask implements Runnable { + + private final String messageType; + private final byte[] payload; + + private final CompletableFuture future; + + public RPCTask(I request, CompletableFuture future) { + this.messageType = messageType(request); + this.payload = NettyProtocol.SERIALIZER.encode(request); + this.future = future; + } + + @Override + public void run() { + try { + byte[] response = messagingService + .sendAndReceive(remoteEp, messageType, payload) + .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + future.complete(NettyProtocol.SERIALIZER.decode(response)); + + } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { + if (messageType.equals(NettyProtocol.COPYCAT_SYNC) || + messageType.equals(NettyProtocol.COPYCAT_PING)) { + log.warn("Request to {} failed. Will retry " + + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS); + THREAD_POOL.schedule( + this, + NettyProtocol.RETRY_INTERVAL_MILLIS, + TimeUnit.MILLISECONDS); + } else { + future.completeExceptionally(e); + } + } catch (Exception e) { + future.completeExceptionally(e); + } + } + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java new file mode 100644 index 0000000000..d06999e9e4 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java @@ -0,0 +1,115 @@ +package org.onlab.onos.store.service.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import net.kuujo.copycat.cluster.TcpMember; +import net.kuujo.copycat.protocol.PingRequest; +import net.kuujo.copycat.protocol.PollRequest; +import net.kuujo.copycat.protocol.RequestHandler; +import net.kuujo.copycat.protocol.SubmitRequest; +import net.kuujo.copycat.protocol.SyncRequest; +import net.kuujo.copycat.spi.protocol.ProtocolServer; + +import org.onlab.netty.Message; +import org.onlab.netty.MessageHandler; +import org.onlab.netty.NettyMessagingService; +import org.slf4j.Logger; + +/** + * {@link NettyMessagingService} based Copycat protocol server. + */ +public class NettyProtocolServer implements ProtocolServer { + + private final Logger log = getLogger(getClass()); + + private final NettyMessagingService messagingService; + private RequestHandler handler; + + + public NettyProtocolServer(TcpMember member) { + messagingService = new NettyMessagingService(member.host(), member.port()); + + messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler()); + messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler()); + messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler()); + messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler()); + } + + protected NettyMessagingService getNettyMessagingService() { + return messagingService; + } + + @Override + public void requestHandler(RequestHandler handler) { + this.handler = handler; + } + + @Override + public CompletableFuture listen() { + try { + messagingService.activate(); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + } + + @Override + public CompletableFuture close() { + CompletableFuture future = new CompletableFuture<>(); + try { + messagingService.deactivate(); + future.complete(null); + return future; + } catch (Exception e) { + future.completeExceptionally(e); + return future; + } + } + + private class CopycatMessageHandler implements MessageHandler { + + @Override + public void handle(Message message) throws IOException { + T request = NettyProtocol.SERIALIZER.decode(message.payload()); + if (request.getClass().equals(PingRequest.class)) { + handler.ping((PingRequest) request).whenComplete((response, error) -> { + try { + message.respond(NettyProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to ping request", e); + } + }); + } else if (request.getClass().equals(PollRequest.class)) { + handler.poll((PollRequest) request).whenComplete((response, error) -> { + try { + message.respond(NettyProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to poll request", e); + } + }); + } else if (request.getClass().equals(SyncRequest.class)) { + handler.sync((SyncRequest) request).whenComplete((response, error) -> { + try { + message.respond(NettyProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to sync request", e); + } + }); + } else if (request.getClass().equals(SubmitRequest.class)) { + handler.submit((SubmitRequest) request).whenComplete((response, error) -> { + try { + message.respond(NettyProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to submit request", e); + } + }); + } + } + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java new file mode 100644 index 0000000000..31bdcc257d --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java @@ -0,0 +1,44 @@ +package org.onlab.onos.store.service.impl; + +import java.util.Arrays; + +/** + * Wrapper object that holds the object (as byte array) and its version. + */ +public class VersionedValue { + + private final byte[] value; + private final long version; + + /** + * Creates a new instance with the specified value and version. + * @param value + * @param version + */ + public VersionedValue(byte[] value, long version) { + this.value = value; + this.version = version; + } + + /** + * Returns the value. + * @return value. + */ + public byte[] value() { + return value; + } + + /** + * Returns the version. + * @return version. + */ + public long version() { + return version; + } + + @Override + public String toString() { + return "VersionedValue [value=" + Arrays.toString(value) + ", version=" + + version + "]"; + } +} diff --git a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java index b408c44ad9..6efb3331af 100644 --- a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java +++ b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java @@ -32,9 +32,15 @@ public class RoleReplyInfo { this.genId = genId; this.xid = xid; } - public RoleState getRole() { return role; } - public U64 getGenId() { return genId; } - public long getXid() { return xid; } + public RoleState getRole() { + return role; + } + public U64 getGenId() { + return genId; + } + public long getXid() { + return xid; + } @Override public String toString() { return "[Role:" + role + " GenId:" + genId + " Xid:" + xid + "]"; diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java index 6e5b236b46..734e0ba784 100644 --- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java +++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java @@ -347,7 +347,7 @@ class RoleManager implements RoleHandler { RoleState role = null; OFNiciraControllerRole ncr = nrr.getRole(); - switch(ncr) { + switch (ncr) { case ROLE_MASTER: role = RoleState.MASTER; break; @@ -383,7 +383,7 @@ class RoleManager implements RoleHandler { throws SwitchStateException { OFControllerRole cr = rrmsg.getRole(); RoleState role = null; - switch(cr) { + switch (cr) { case ROLE_EQUAL: role = RoleState.EQUAL; break; diff --git a/pom.xml b/pom.xml index e84c8462d9..dfc86a3da9 100644 --- a/pom.xml +++ b/pom.xml @@ -414,7 +414,7 @@ org.apache.felix maven-bundle-plugin - 2.3.7 + 2.5.3 true @@ -493,6 +493,12 @@ onos-build-conf 1.0
+ + + com.puppycrawl.tools + checkstyle + 5.9 + onos/checkstyle.xml