mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-12-15 22:31:50 +01:00
DatabaseService that uses Copycat Raft to provide a strongly consistent and durable database.
This commit is contained in:
parent
34c8164a61
commit
08822c4243
18
core/store/dist/pom.xml
vendored
18
core/store/dist/pom.xml
vendored
@ -44,6 +44,24 @@
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.kuujo.copycat</groupId>
|
||||
<artifactId>copycat</artifactId>
|
||||
<version>0.4.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.kuujo.copycat</groupId>
|
||||
<artifactId>copycat-chronicle</artifactId>
|
||||
<version>0.4.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.kuujo.copycat</groupId>
|
||||
<artifactId>copycat-tcp</artifactId>
|
||||
<version>0.4.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
|
||||
35
core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
vendored
Normal file
35
core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
vendored
Normal file
@ -0,0 +1,35 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Service for running administrative tasks on a Database.
|
||||
*/
|
||||
public interface DatabaseAdminService {
|
||||
|
||||
/**
|
||||
* Creates a new table.
|
||||
* Table creation is idempotent. Attempting to create a table
|
||||
* that already exists will be a noop.
|
||||
* @param name table name.
|
||||
* @return true if the table was created by this call, false otherwise.
|
||||
*/
|
||||
public boolean createTable(String name);
|
||||
|
||||
/**
|
||||
* Lists all the tables in the database.
|
||||
* @return list of table names.
|
||||
*/
|
||||
public List<String> listTables();
|
||||
|
||||
/**
|
||||
* Deletes a table from the database.
|
||||
* @param name name of the table to delete.
|
||||
*/
|
||||
public void dropTable(String name);
|
||||
|
||||
/**
|
||||
* Deletes all tables from the database.
|
||||
*/
|
||||
public void dropAllTables();
|
||||
}
|
||||
22
core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java
vendored
Normal file
22
core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java
vendored
Normal file
@ -0,0 +1,22 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
/**
|
||||
* Base exception type for database failures.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class DatabaseException extends RuntimeException {
|
||||
public DatabaseException(String message, Throwable t) {
|
||||
super(message, t);
|
||||
}
|
||||
|
||||
public DatabaseException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public DatabaseException(Throwable t) {
|
||||
super(t);
|
||||
}
|
||||
|
||||
public DatabaseException() {
|
||||
};
|
||||
}
|
||||
39
core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
vendored
Normal file
39
core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
vendored
Normal file
@ -0,0 +1,39 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface DatabaseService {
|
||||
|
||||
/**
|
||||
* Performs a read on the database.
|
||||
* @param request read request.
|
||||
* @return ReadResult
|
||||
* @throws DatabaseException
|
||||
*/
|
||||
ReadResult read(ReadRequest request);
|
||||
|
||||
/**
|
||||
* Performs a batch read operation on the database.
|
||||
* The main advantage of batch read operation is parallelization.
|
||||
* @param batch batch of read requests to execute.
|
||||
* @return
|
||||
*/
|
||||
List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch);
|
||||
|
||||
/**
|
||||
* Performs a write operation on the database.
|
||||
* @param request
|
||||
* @return write result.
|
||||
* @throws DatabaseException
|
||||
*/
|
||||
WriteResult write(WriteRequest request);
|
||||
|
||||
/**
|
||||
* Performs a batch write operation on the database.
|
||||
* Batch write provides transactional semantics. Either all operations
|
||||
* succeed or none of them do.
|
||||
* @param batch batch of write requests to execute as a transaction.
|
||||
* @return result of executing the batch write operation.
|
||||
*/
|
||||
List<OptionalResult<WriteResult, DatabaseException>> batchWrite(List<WriteRequest> batch);
|
||||
}
|
||||
9
core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java
vendored
Normal file
9
core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java
vendored
Normal file
@ -0,0 +1,9 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
/**
|
||||
* Exception thrown when an operation (read or write) is requested for
|
||||
* a table that does not exist.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class NoSuchTableException extends DatabaseException {
|
||||
}
|
||||
8
core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java
vendored
Normal file
8
core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
/**
|
||||
* Exception that indicates a optimistic lock failure.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class OptimisticLockException extends PreconditionFailedException {
|
||||
}
|
||||
26
core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java
vendored
Normal file
26
core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java
vendored
Normal file
@ -0,0 +1,26 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
/**
|
||||
* A container object which either has a result or an exception.
|
||||
* <p>
|
||||
* If a result is present, get() will return it otherwise get() will throw
|
||||
* the exception that was encountered in the process of generating the result.
|
||||
*
|
||||
* @param <R> type of result.
|
||||
* @param <E> exception encountered in generating the result.
|
||||
*/
|
||||
public interface OptionalResult<R, E extends Throwable> {
|
||||
|
||||
/**
|
||||
* Returns the result.
|
||||
* @return result
|
||||
* @throws E if there is no valid result.
|
||||
*/
|
||||
public R get();
|
||||
|
||||
/**
|
||||
* Returns true if there is a valid result.
|
||||
* @return true is yes, false otherwise.
|
||||
*/
|
||||
public boolean hasValidResult();
|
||||
}
|
||||
14
core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java
vendored
Normal file
14
core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
/**
|
||||
* Exception that indicates a precondition failure.
|
||||
* <ul>Scenarios that can cause this exception:
|
||||
* <li>An operation that attempts to write a new value iff the current value is equal
|
||||
* to some specified value.</li>
|
||||
* <li>An operation that attempts to write a new value iff the current version
|
||||
* matches a specified value</li>
|
||||
* </ul>
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class PreconditionFailedException extends DatabaseException {
|
||||
}
|
||||
36
core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java
vendored
Normal file
36
core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java
vendored
Normal file
@ -0,0 +1,36 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
/**
|
||||
* Database read request.
|
||||
*/
|
||||
public class ReadRequest {
|
||||
|
||||
private final String tableName;
|
||||
private final String key;
|
||||
|
||||
public ReadRequest(String tableName, String key) {
|
||||
this.tableName = tableName;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the name of the table.
|
||||
* @return table name.
|
||||
*/
|
||||
public String tableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the key.
|
||||
* @return key.
|
||||
*/
|
||||
public String key() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ReadRequest [tableName=" + tableName + ", key=" + key + "]";
|
||||
}
|
||||
}
|
||||
43
core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
vendored
Normal file
43
core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
vendored
Normal file
@ -0,0 +1,43 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
import org.onlab.onos.store.service.impl.VersionedValue;
|
||||
|
||||
/**
|
||||
* Database read result.
|
||||
*/
|
||||
public class ReadResult {
|
||||
|
||||
private final String tableName;
|
||||
private final String key;
|
||||
private final VersionedValue value;
|
||||
|
||||
public ReadResult(String tableName, String key, VersionedValue value) {
|
||||
this.tableName = tableName;
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Database table name.
|
||||
* @return
|
||||
*/
|
||||
public String tableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Database table key.
|
||||
* @return key.
|
||||
*/
|
||||
public String key() {
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* value associated with the key.
|
||||
* @return non-null value if the table contains one, null otherwise.
|
||||
*/
|
||||
public VersionedValue value() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
9
core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java
vendored
Normal file
9
core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java
vendored
Normal file
@ -0,0 +1,9 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
/**
|
||||
* Exception that indicates a write operation is aborted.
|
||||
* Aborted operations do not mutate database state is any form.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class WriteAborted extends DatabaseException {
|
||||
}
|
||||
93
core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java
vendored
Normal file
93
core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java
vendored
Normal file
@ -0,0 +1,93 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Database write request.
|
||||
*/
|
||||
public class WriteRequest {
|
||||
|
||||
private final String tableName;
|
||||
private final String key;
|
||||
private final byte[] newValue;
|
||||
private final long previousVersion;
|
||||
private final byte[] oldValue;
|
||||
|
||||
public WriteRequest(String tableName, String key, byte[] newValue) {
|
||||
this(tableName, key, newValue, -1, null);
|
||||
}
|
||||
|
||||
public WriteRequest(String tableName, String key, byte[] newValue, long previousVersion) {
|
||||
this(tableName, key, newValue, previousVersion, null);
|
||||
checkArgument(previousVersion >= 0);
|
||||
}
|
||||
|
||||
public WriteRequest(String tableName, String key, byte[] newValue, byte[] oldValue) {
|
||||
this(tableName, key, newValue, -1, oldValue);
|
||||
}
|
||||
|
||||
private WriteRequest(String tableName, String key, byte[] newValue, long previousVersion, byte[] oldValue) {
|
||||
|
||||
checkArgument(tableName != null);
|
||||
checkArgument(key != null);
|
||||
checkArgument(newValue != null);
|
||||
|
||||
this.tableName = tableName;
|
||||
this.key = key;
|
||||
this.newValue = newValue;
|
||||
this.previousVersion = previousVersion;
|
||||
this.oldValue = oldValue;
|
||||
}
|
||||
|
||||
public String tableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public String key() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public byte[] newValue() {
|
||||
return newValue;
|
||||
}
|
||||
|
||||
public long previousVersion() {
|
||||
return previousVersion;
|
||||
}
|
||||
|
||||
public byte[] oldValue() {
|
||||
return oldValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "WriteRequest [tableName=" + tableName + ", key=" + key
|
||||
+ ", newValue=" + newValue
|
||||
+ ", previousVersion=" + previousVersion
|
||||
+ ", oldValue=" + oldValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(key, tableName, previousVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
WriteRequest other = (WriteRequest) obj;
|
||||
return Objects.equals(this.key, other.key) &&
|
||||
Objects.equals(this.tableName, other.tableName) &&
|
||||
Objects.equals(this.previousVersion, other.previousVersion);
|
||||
}
|
||||
}
|
||||
31
core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java
vendored
Normal file
31
core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java
vendored
Normal file
@ -0,0 +1,31 @@
|
||||
package org.onlab.onos.store.service;
|
||||
|
||||
import org.onlab.onos.store.service.impl.VersionedValue;
|
||||
|
||||
/**
|
||||
* Database write result.
|
||||
*/
|
||||
public class WriteResult {
|
||||
|
||||
private final String tableName;
|
||||
private final String key;
|
||||
private final VersionedValue previousValue;
|
||||
|
||||
public WriteResult(String tableName, String key, VersionedValue previousValue) {
|
||||
this.tableName = tableName;
|
||||
this.key = key;
|
||||
this.previousValue = previousValue;
|
||||
}
|
||||
|
||||
public String tableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public String key() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public VersionedValue previousValue() {
|
||||
return previousValue;
|
||||
}
|
||||
}
|
||||
144
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
vendored
Normal file
144
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
vendored
Normal file
@ -0,0 +1,144 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import net.kuujo.copycat.protocol.Response.Status;
|
||||
import net.kuujo.copycat.protocol.SubmitRequest;
|
||||
import net.kuujo.copycat.protocol.SubmitResponse;
|
||||
import net.kuujo.copycat.spi.protocol.ProtocolClient;
|
||||
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.onlab.netty.Endpoint;
|
||||
import org.onlab.netty.NettyMessagingService;
|
||||
import org.onlab.onos.store.service.DatabaseException;
|
||||
import org.onlab.onos.store.service.ReadRequest;
|
||||
import org.onlab.onos.store.service.WriteRequest;
|
||||
|
||||
public class DatabaseClient {
|
||||
|
||||
private final Endpoint copycatEp;
|
||||
ProtocolClient client;
|
||||
NettyMessagingService messagingService;
|
||||
|
||||
public DatabaseClient(Endpoint copycatEp) {
|
||||
this.copycatEp = copycatEp;
|
||||
}
|
||||
|
||||
private static String nextId() {
|
||||
return UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
public void activate() throws Exception {
|
||||
messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
|
||||
messagingService.activate();
|
||||
client = new NettyProtocolClient(copycatEp, messagingService);
|
||||
}
|
||||
|
||||
public void deactivate() throws Exception {
|
||||
messagingService.deactivate();
|
||||
}
|
||||
|
||||
public boolean createTable(String tableName) {
|
||||
|
||||
SubmitRequest request =
|
||||
new SubmitRequest(
|
||||
nextId(),
|
||||
"createTable",
|
||||
Arrays.asList(tableName));
|
||||
CompletableFuture<SubmitResponse> future = client.submit(request);
|
||||
try {
|
||||
return (boolean) future.get().result();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new DatabaseException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void dropTable(String tableName) {
|
||||
|
||||
SubmitRequest request =
|
||||
new SubmitRequest(
|
||||
nextId(),
|
||||
"dropTable",
|
||||
Arrays.asList(tableName));
|
||||
CompletableFuture<SubmitResponse> future = client.submit(request);
|
||||
try {
|
||||
if (future.get().status() == Status.OK) {
|
||||
throw new DatabaseException(future.get().toString());
|
||||
}
|
||||
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new DatabaseException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void dropAllTables() {
|
||||
|
||||
SubmitRequest request =
|
||||
new SubmitRequest(
|
||||
nextId(),
|
||||
"dropAllTables",
|
||||
Arrays.asList());
|
||||
CompletableFuture<SubmitResponse> future = client.submit(request);
|
||||
try {
|
||||
if (future.get().status() != Status.OK) {
|
||||
throw new DatabaseException(future.get().toString());
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new DatabaseException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<String> listTables() {
|
||||
|
||||
SubmitRequest request =
|
||||
new SubmitRequest(
|
||||
nextId(),
|
||||
"listTables",
|
||||
Arrays.asList());
|
||||
CompletableFuture<SubmitResponse> future = client.submit(request);
|
||||
try {
|
||||
return (List<String>) future.get().result();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new DatabaseException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
|
||||
|
||||
SubmitRequest request = new SubmitRequest(
|
||||
nextId(),
|
||||
"read",
|
||||
Arrays.asList(requests));
|
||||
|
||||
CompletableFuture<SubmitResponse> future = client.submit(request);
|
||||
try {
|
||||
List<InternalReadResult> internalReadResults = (List<InternalReadResult>) future.get().result();
|
||||
return internalReadResults;
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new DatabaseException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
|
||||
|
||||
SubmitRequest request = new SubmitRequest(
|
||||
nextId(),
|
||||
"write",
|
||||
Arrays.asList(requests));
|
||||
|
||||
CompletableFuture<SubmitResponse> future = client.submit(request);
|
||||
try {
|
||||
List<InternalWriteResult> internalWriteResults = (List<InternalWriteResult>) future.get().result();
|
||||
return internalWriteResults;
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new DatabaseException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
210
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
vendored
Normal file
210
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
vendored
Normal file
@ -0,0 +1,210 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import net.kuujo.copycat.Copycat;
|
||||
import net.kuujo.copycat.StateMachine;
|
||||
import net.kuujo.copycat.cluster.TcpCluster;
|
||||
import net.kuujo.copycat.cluster.TcpClusterConfig;
|
||||
import net.kuujo.copycat.cluster.TcpMember;
|
||||
import net.kuujo.copycat.log.ChronicleLog;
|
||||
import net.kuujo.copycat.log.Log;
|
||||
|
||||
import org.apache.felix.scr.annotations.Activate;
|
||||
import org.apache.felix.scr.annotations.Component;
|
||||
import org.apache.felix.scr.annotations.Reference;
|
||||
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
||||
import org.apache.felix.scr.annotations.Service;
|
||||
import org.onlab.netty.Endpoint;
|
||||
import org.onlab.onos.cluster.ClusterService;
|
||||
import org.onlab.onos.cluster.ControllerNode;
|
||||
import org.onlab.onos.store.service.DatabaseAdminService;
|
||||
import org.onlab.onos.store.service.DatabaseException;
|
||||
import org.onlab.onos.store.service.DatabaseService;
|
||||
import org.onlab.onos.store.service.NoSuchTableException;
|
||||
import org.onlab.onos.store.service.OptimisticLockException;
|
||||
import org.onlab.onos.store.service.OptionalResult;
|
||||
import org.onlab.onos.store.service.PreconditionFailedException;
|
||||
import org.onlab.onos.store.service.ReadRequest;
|
||||
import org.onlab.onos.store.service.ReadResult;
|
||||
import org.onlab.onos.store.service.WriteAborted;
|
||||
import org.onlab.onos.store.service.WriteRequest;
|
||||
import org.onlab.onos.store.service.WriteResult;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Strongly consistent and durable state management service based on
|
||||
* Copycat implementation of Raft consensus protocol.
|
||||
*/
|
||||
@Component(immediate = true)
|
||||
@Service
|
||||
public class DatabaseManager implements DatabaseService, DatabaseAdminService {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||
ClusterService clusterService;
|
||||
|
||||
public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
|
||||
|
||||
private Copycat copycat;
|
||||
private DatabaseClient client;
|
||||
|
||||
@Activate
|
||||
public void activate() {
|
||||
TcpMember localMember =
|
||||
new TcpMember(
|
||||
clusterService.getLocalNode().ip().toString(),
|
||||
clusterService.getLocalNode().tcpPort());
|
||||
List<TcpMember> remoteMembers = Lists.newArrayList();
|
||||
|
||||
for (ControllerNode node : clusterService.getNodes()) {
|
||||
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
|
||||
if (!member.equals(localMember)) {
|
||||
remoteMembers.add(member);
|
||||
}
|
||||
}
|
||||
|
||||
// Configure the cluster.
|
||||
TcpClusterConfig config = new TcpClusterConfig();
|
||||
|
||||
config.setLocalMember(localMember);
|
||||
config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
|
||||
|
||||
// Create the cluster.
|
||||
TcpCluster cluster = new TcpCluster(config);
|
||||
|
||||
StateMachine stateMachine = new DatabaseStateMachine();
|
||||
ControllerNode thisNode = clusterService.getLocalNode();
|
||||
Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
|
||||
|
||||
copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
|
||||
copycat.start();
|
||||
|
||||
client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
|
||||
|
||||
log.info("Started.");
|
||||
}
|
||||
|
||||
@Activate
|
||||
public void deactivate() {
|
||||
copycat.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createTable(String name) {
|
||||
return client.createTable(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dropTable(String name) {
|
||||
client.dropTable(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dropAllTables() {
|
||||
client.dropAllTables();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> listTables() {
|
||||
return client.listTables();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadResult read(ReadRequest request) {
|
||||
return batchRead(Arrays.asList(request)).get(0).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
|
||||
List<ReadRequest> batch) {
|
||||
List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
|
||||
for (InternalReadResult internalReadResult : client.batchRead(batch)) {
|
||||
if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
|
||||
readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
|
||||
new NoSuchTableException()));
|
||||
} else {
|
||||
readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
|
||||
internalReadResult.result()));
|
||||
}
|
||||
}
|
||||
return readResults;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult write(WriteRequest request) {
|
||||
return batchWrite(Arrays.asList(request)).get(0).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
|
||||
List<WriteRequest> batch) {
|
||||
List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
|
||||
for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
|
||||
if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
|
||||
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
|
||||
new NoSuchTableException()));
|
||||
} else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
|
||||
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
|
||||
new OptimisticLockException()));
|
||||
} else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
|
||||
// TODO: throw a different exception?
|
||||
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
|
||||
new PreconditionFailedException()));
|
||||
} else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
|
||||
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
|
||||
new WriteAborted()));
|
||||
} else {
|
||||
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
|
||||
internalWriteResult.result()));
|
||||
}
|
||||
}
|
||||
return writeResults;
|
||||
|
||||
}
|
||||
|
||||
private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
|
||||
|
||||
private final R result;
|
||||
private final DatabaseException exception;
|
||||
|
||||
public DatabaseOperationResult(R result) {
|
||||
this.result = result;
|
||||
this.exception = null;
|
||||
}
|
||||
|
||||
public DatabaseOperationResult(DatabaseException exception) {
|
||||
this.result = null;
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
@Override
|
||||
public R get() {
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
throw exception;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasValidResult() {
|
||||
return result != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (result != null) {
|
||||
return result.toString();
|
||||
} else {
|
||||
return exception.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
169
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
vendored
Normal file
169
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
vendored
Normal file
@ -0,0 +1,169 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import net.kuujo.copycat.Command;
|
||||
import net.kuujo.copycat.Query;
|
||||
import net.kuujo.copycat.StateMachine;
|
||||
|
||||
import org.onlab.onos.store.serializers.KryoSerializer;
|
||||
import org.onlab.onos.store.service.ReadRequest;
|
||||
import org.onlab.onos.store.service.ReadResult;
|
||||
import org.onlab.onos.store.service.WriteRequest;
|
||||
import org.onlab.onos.store.service.WriteResult;
|
||||
import org.onlab.util.KryoNamespace;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
public class DatabaseStateMachine implements StateMachine {
|
||||
|
||||
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
|
||||
@Override
|
||||
protected void setupKryoPool() {
|
||||
serializerPool = KryoNamespace.newBuilder()
|
||||
.register(VersionedValue.class)
|
||||
.register(State.class)
|
||||
.register(NettyProtocol.COMMON)
|
||||
.build()
|
||||
.populate(1);
|
||||
}
|
||||
};
|
||||
|
||||
private State state = new State();
|
||||
|
||||
@Command
|
||||
public boolean createTable(String tableName) {
|
||||
return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
|
||||
}
|
||||
|
||||
@Command
|
||||
public boolean dropTable(String tableName) {
|
||||
return state.getTables().remove(tableName) != null;
|
||||
}
|
||||
|
||||
@Command
|
||||
public boolean dropAllTables() {
|
||||
state.getTables().clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Query
|
||||
public Set<String> listTables() {
|
||||
return state.getTables().keySet();
|
||||
}
|
||||
|
||||
@Query
|
||||
public List<InternalReadResult> read(List<ReadRequest> requests) {
|
||||
List<InternalReadResult> results = new ArrayList<>(requests.size());
|
||||
for (ReadRequest request : requests) {
|
||||
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
|
||||
if (table == null) {
|
||||
results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
|
||||
continue;
|
||||
}
|
||||
VersionedValue value = table.get(request.key());
|
||||
results.add(new InternalReadResult(
|
||||
InternalReadResult.Status.OK,
|
||||
new ReadResult(
|
||||
request.tableName(),
|
||||
request.key(),
|
||||
value)));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
@Command
|
||||
public List<InternalWriteResult> write(List<WriteRequest> requests) {
|
||||
boolean abort = false;
|
||||
List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
|
||||
for (WriteRequest request : requests) {
|
||||
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
|
||||
if (table == null) {
|
||||
validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
|
||||
abort = true;
|
||||
continue;
|
||||
}
|
||||
VersionedValue value = table.get(request.key());
|
||||
if (value == null) {
|
||||
if (request.oldValue() != null) {
|
||||
validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
|
||||
abort = true;
|
||||
continue;
|
||||
} else if (request.previousVersion() >= 0) {
|
||||
validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
|
||||
abort = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
|
||||
validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
|
||||
abort = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
validationResults.add(InternalWriteResult.Status.OK);
|
||||
}
|
||||
|
||||
List<InternalWriteResult> results = new ArrayList<>(requests.size());
|
||||
|
||||
if (abort) {
|
||||
for (InternalWriteResult.Status validationResult : validationResults) {
|
||||
if (validationResult == InternalWriteResult.Status.OK) {
|
||||
results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
|
||||
} else {
|
||||
results.add(new InternalWriteResult(validationResult, null));
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
for (WriteRequest request : requests) {
|
||||
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
|
||||
synchronized (table) {
|
||||
VersionedValue previousValue =
|
||||
table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
|
||||
results.add(new InternalWriteResult(
|
||||
InternalWriteResult.Status.OK,
|
||||
new WriteResult(request.tableName(), request.key(), previousValue)));
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
public class State {
|
||||
|
||||
private final Map<String, Map<String, VersionedValue>> tables =
|
||||
Maps.newHashMap();
|
||||
private long versionCounter = 1;
|
||||
|
||||
Map<String, Map<String, VersionedValue>> getTables() {
|
||||
return tables;
|
||||
}
|
||||
|
||||
long nextVersion() {
|
||||
return versionCounter++;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] takeSnapshot() {
|
||||
try {
|
||||
return SERIALIZER.encode(state);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void installSnapshot(byte[] data) {
|
||||
try {
|
||||
this.state = SERIALIZER.decode(data);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
33
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
vendored
Normal file
33
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
vendored
Normal file
@ -0,0 +1,33 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import org.onlab.onos.store.service.ReadResult;
|
||||
|
||||
public class InternalReadResult {
|
||||
|
||||
public enum Status {
|
||||
OK,
|
||||
NO_SUCH_TABLE
|
||||
}
|
||||
|
||||
private final Status status;
|
||||
private final ReadResult result;
|
||||
|
||||
public InternalReadResult(Status status, ReadResult result) {
|
||||
this.status = status;
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
public Status status() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public ReadResult result() {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "InternalReadResult [status=" + status + ", result=" + result
|
||||
+ "]";
|
||||
}
|
||||
}
|
||||
30
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
vendored
Normal file
30
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
vendored
Normal file
@ -0,0 +1,30 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import org.onlab.onos.store.service.WriteResult;
|
||||
|
||||
public class InternalWriteResult {
|
||||
|
||||
public enum Status {
|
||||
OK,
|
||||
ABORTED,
|
||||
NO_SUCH_TABLE,
|
||||
OPTIMISTIC_LOCK_FAILURE,
|
||||
PREVIOUS_VALUE_MISMATCH
|
||||
}
|
||||
|
||||
private final Status status;
|
||||
private final WriteResult result;
|
||||
|
||||
public InternalWriteResult(Status status, WriteResult result) {
|
||||
this.status = status;
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
public Status status() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public WriteResult result() {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
145
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
vendored
Normal file
145
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
vendored
Normal file
@ -0,0 +1,145 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Vector;
|
||||
|
||||
import net.kuujo.copycat.cluster.TcpClusterConfig;
|
||||
import net.kuujo.copycat.cluster.TcpMember;
|
||||
import net.kuujo.copycat.internal.log.ConfigurationEntry;
|
||||
import net.kuujo.copycat.internal.log.CopycatEntry;
|
||||
import net.kuujo.copycat.internal.log.OperationEntry;
|
||||
import net.kuujo.copycat.internal.log.SnapshotEntry;
|
||||
import net.kuujo.copycat.protocol.PingRequest;
|
||||
import net.kuujo.copycat.protocol.PingResponse;
|
||||
import net.kuujo.copycat.protocol.PollRequest;
|
||||
import net.kuujo.copycat.protocol.PollResponse;
|
||||
import net.kuujo.copycat.protocol.Response.Status;
|
||||
import net.kuujo.copycat.protocol.SubmitRequest;
|
||||
import net.kuujo.copycat.protocol.SubmitResponse;
|
||||
import net.kuujo.copycat.protocol.SyncRequest;
|
||||
import net.kuujo.copycat.protocol.SyncResponse;
|
||||
import net.kuujo.copycat.spi.protocol.Protocol;
|
||||
import net.kuujo.copycat.spi.protocol.ProtocolClient;
|
||||
import net.kuujo.copycat.spi.protocol.ProtocolServer;
|
||||
|
||||
import org.onlab.onos.store.serializers.ImmutableListSerializer;
|
||||
import org.onlab.onos.store.serializers.ImmutableMapSerializer;
|
||||
import org.onlab.onos.store.serializers.ImmutableSetSerializer;
|
||||
import org.onlab.onos.store.serializers.KryoSerializer;
|
||||
import org.onlab.onos.store.service.ReadRequest;
|
||||
import org.onlab.onos.store.service.ReadResult;
|
||||
import org.onlab.onos.store.service.WriteRequest;
|
||||
import org.onlab.onos.store.service.WriteResult;
|
||||
import org.onlab.util.KryoNamespace;
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo;
|
||||
import com.esotericsoftware.kryo.io.Input;
|
||||
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
/**
|
||||
* {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
|
||||
*/
|
||||
public class NettyProtocol implements Protocol<TcpMember> {
|
||||
|
||||
public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
|
||||
public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
|
||||
public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
|
||||
public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
|
||||
|
||||
// TODO: make this configurable.
|
||||
public static final long RETRY_INTERVAL_MILLIS = 2000;
|
||||
|
||||
private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
|
||||
.register(PingRequest.class)
|
||||
.register(PingResponse.class)
|
||||
.register(PollRequest.class)
|
||||
.register(PollResponse.class)
|
||||
.register(SyncRequest.class)
|
||||
.register(SyncResponse.class)
|
||||
.register(SubmitRequest.class)
|
||||
.register(SubmitResponse.class)
|
||||
.register(Status.class)
|
||||
.register(ConfigurationEntry.class)
|
||||
.register(SnapshotEntry.class)
|
||||
.register(CopycatEntry.class)
|
||||
.register(OperationEntry.class)
|
||||
.register(TcpClusterConfig.class)
|
||||
.register(TcpMember.class)
|
||||
.build();
|
||||
|
||||
// TODO: Move to the right place.
|
||||
private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
|
||||
.register(ReadRequest.class)
|
||||
.register(WriteRequest.class)
|
||||
.register(InternalReadResult.class)
|
||||
.register(InternalWriteResult.class)
|
||||
.register(InternalReadResult.Status.class)
|
||||
.register(WriteResult.class)
|
||||
.register(ReadResult.class)
|
||||
.register(InternalWriteResult.Status.class)
|
||||
.register(VersionedValue.class)
|
||||
.build();
|
||||
|
||||
public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
|
||||
.register(Arrays.asList().getClass(), new CollectionSerializer() {
|
||||
@Override
|
||||
@SuppressWarnings("rawtypes")
|
||||
protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
|
||||
return new ArrayList();
|
||||
}
|
||||
})
|
||||
.register(ImmutableMap.class, new ImmutableMapSerializer())
|
||||
.register(ImmutableList.class, new ImmutableListSerializer())
|
||||
.register(ImmutableSet.class, new ImmutableSetSerializer())
|
||||
.register(
|
||||
Vector.class,
|
||||
ArrayList.class,
|
||||
Arrays.asList().getClass(),
|
||||
HashMap.class,
|
||||
HashSet.class,
|
||||
LinkedList.class,
|
||||
byte[].class)
|
||||
.build();
|
||||
|
||||
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
|
||||
@Override
|
||||
protected void setupKryoPool() {
|
||||
serializerPool = KryoNamespace.newBuilder()
|
||||
.register(COPYCAT)
|
||||
.register(COMMON)
|
||||
.register(CRAFT)
|
||||
.build()
|
||||
.populate(1);
|
||||
}
|
||||
};
|
||||
|
||||
private NettyProtocolServer server = null;
|
||||
|
||||
// FIXME: This is a total hack.Assumes
|
||||
// ProtocolServer is initialized before ProtocolClient
|
||||
protected NettyProtocolServer getServer() {
|
||||
if (server == null) {
|
||||
throw new IllegalStateException("ProtocolServer is not initialized yet!");
|
||||
}
|
||||
return server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtocolServer createServer(TcpMember member) {
|
||||
server = new NettyProtocolServer(member);
|
||||
return server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtocolClient createClient(TcpMember member) {
|
||||
return new NettyProtocolClient(this, member);
|
||||
}
|
||||
}
|
||||
148
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
vendored
Normal file
148
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
vendored
Normal file
@ -0,0 +1,148 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import net.kuujo.copycat.cluster.TcpMember;
|
||||
import net.kuujo.copycat.protocol.PingRequest;
|
||||
import net.kuujo.copycat.protocol.PingResponse;
|
||||
import net.kuujo.copycat.protocol.PollRequest;
|
||||
import net.kuujo.copycat.protocol.PollResponse;
|
||||
import net.kuujo.copycat.protocol.SubmitRequest;
|
||||
import net.kuujo.copycat.protocol.SubmitResponse;
|
||||
import net.kuujo.copycat.protocol.SyncRequest;
|
||||
import net.kuujo.copycat.protocol.SyncResponse;
|
||||
import net.kuujo.copycat.spi.protocol.ProtocolClient;
|
||||
|
||||
import org.onlab.netty.Endpoint;
|
||||
import org.onlab.netty.NettyMessagingService;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* {@link NettyMessagingService} based Copycat protocol client.
|
||||
*/
|
||||
public class NettyProtocolClient implements ProtocolClient {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
private static final ThreadFactory THREAD_FACTORY =
|
||||
new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
|
||||
|
||||
// Remote endpoint, this client instance is used
|
||||
// for communicating with.
|
||||
private final Endpoint remoteEp;
|
||||
private final NettyMessagingService messagingService;
|
||||
|
||||
// TODO: Is 10 the right number of threads?
|
||||
private static final ScheduledExecutorService THREAD_POOL =
|
||||
new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
|
||||
|
||||
public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
|
||||
this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
|
||||
}
|
||||
|
||||
public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
|
||||
this.remoteEp = remoteEp;
|
||||
this.messagingService = messagingService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<PingResponse> ping(PingRequest request) {
|
||||
return requestReply(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<SyncResponse> sync(SyncRequest request) {
|
||||
return requestReply(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<PollResponse> poll(PollRequest request) {
|
||||
return requestReply(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
|
||||
return requestReply(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> connect() {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> close() {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
public <I> String messageType(I input) {
|
||||
Class<?> clazz = input.getClass();
|
||||
if (clazz.equals(PollRequest.class)) {
|
||||
return NettyProtocol.COPYCAT_POLL;
|
||||
} else if (clazz.equals(SyncRequest.class)) {
|
||||
return NettyProtocol.COPYCAT_SYNC;
|
||||
} else if (clazz.equals(SubmitRequest.class)) {
|
||||
return NettyProtocol.COPYCAT_SUBMIT;
|
||||
} else if (clazz.equals(PingRequest.class)) {
|
||||
return NettyProtocol.COPYCAT_PING;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown class " + clazz.getName());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private <I, O> CompletableFuture<O> requestReply(I request) {
|
||||
CompletableFuture<O> future = new CompletableFuture<>();
|
||||
THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
|
||||
return future;
|
||||
}
|
||||
|
||||
private class RPCTask<I, O> implements Runnable {
|
||||
|
||||
private final String messageType;
|
||||
private final byte[] payload;
|
||||
|
||||
private final CompletableFuture<O> future;
|
||||
|
||||
public RPCTask(I request, CompletableFuture<O> future) {
|
||||
this.messageType = messageType(request);
|
||||
this.payload = NettyProtocol.SERIALIZER.encode(request);
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
byte[] response = messagingService
|
||||
.sendAndReceive(remoteEp, messageType, payload)
|
||||
.get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
|
||||
future.complete(NettyProtocol.SERIALIZER.decode(response));
|
||||
|
||||
} catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
|
||||
messageType.equals(NettyProtocol.COPYCAT_PING)) {
|
||||
log.warn("Request to {} failed. Will retry "
|
||||
+ "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
|
||||
THREAD_POOL.schedule(
|
||||
this,
|
||||
NettyProtocol.RETRY_INTERVAL_MILLIS,
|
||||
TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
115
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
vendored
Normal file
115
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
vendored
Normal file
@ -0,0 +1,115 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import net.kuujo.copycat.cluster.TcpMember;
|
||||
import net.kuujo.copycat.protocol.PingRequest;
|
||||
import net.kuujo.copycat.protocol.PollRequest;
|
||||
import net.kuujo.copycat.protocol.RequestHandler;
|
||||
import net.kuujo.copycat.protocol.SubmitRequest;
|
||||
import net.kuujo.copycat.protocol.SyncRequest;
|
||||
import net.kuujo.copycat.spi.protocol.ProtocolServer;
|
||||
|
||||
import org.onlab.netty.Message;
|
||||
import org.onlab.netty.MessageHandler;
|
||||
import org.onlab.netty.NettyMessagingService;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
/**
|
||||
* {@link NettyMessagingService} based Copycat protocol server.
|
||||
*/
|
||||
public class NettyProtocolServer implements ProtocolServer {
|
||||
|
||||
private final Logger log = getLogger(getClass());
|
||||
|
||||
private final NettyMessagingService messagingService;
|
||||
private RequestHandler handler;
|
||||
|
||||
|
||||
public NettyProtocolServer(TcpMember member) {
|
||||
messagingService = new NettyMessagingService(member.host(), member.port());
|
||||
|
||||
messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
|
||||
messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
|
||||
messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
|
||||
messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
|
||||
}
|
||||
|
||||
protected NettyMessagingService getNettyMessagingService() {
|
||||
return messagingService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestHandler(RequestHandler handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> listen() {
|
||||
try {
|
||||
messagingService.activate();
|
||||
return CompletableFuture.completedFuture(null);
|
||||
} catch (Exception e) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
future.completeExceptionally(e);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> close() {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
try {
|
||||
messagingService.deactivate();
|
||||
future.complete(null);
|
||||
return future;
|
||||
} catch (Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
private class CopycatMessageHandler<T> implements MessageHandler {
|
||||
|
||||
@Override
|
||||
public void handle(Message message) throws IOException {
|
||||
T request = NettyProtocol.SERIALIZER.decode(message.payload());
|
||||
if (request.getClass().equals(PingRequest.class)) {
|
||||
handler.ping((PingRequest) request).whenComplete((response, error) -> {
|
||||
try {
|
||||
message.respond(NettyProtocol.SERIALIZER.encode(response));
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to respond to ping request", e);
|
||||
}
|
||||
});
|
||||
} else if (request.getClass().equals(PollRequest.class)) {
|
||||
handler.poll((PollRequest) request).whenComplete((response, error) -> {
|
||||
try {
|
||||
message.respond(NettyProtocol.SERIALIZER.encode(response));
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to respond to poll request", e);
|
||||
}
|
||||
});
|
||||
} else if (request.getClass().equals(SyncRequest.class)) {
|
||||
handler.sync((SyncRequest) request).whenComplete((response, error) -> {
|
||||
try {
|
||||
message.respond(NettyProtocol.SERIALIZER.encode(response));
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to respond to sync request", e);
|
||||
}
|
||||
});
|
||||
} else if (request.getClass().equals(SubmitRequest.class)) {
|
||||
handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
|
||||
try {
|
||||
message.respond(NettyProtocol.SERIALIZER.encode(response));
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to respond to submit request", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
44
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java
vendored
Normal file
44
core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java
vendored
Normal file
@ -0,0 +1,44 @@
|
||||
package org.onlab.onos.store.service.impl;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Wrapper object that holds the object (as byte array) and its version.
|
||||
*/
|
||||
public class VersionedValue {
|
||||
|
||||
private final byte[] value;
|
||||
private final long version;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified value and version.
|
||||
* @param value
|
||||
* @param version
|
||||
*/
|
||||
public VersionedValue(byte[] value, long version) {
|
||||
this.value = value;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value.
|
||||
* @return value.
|
||||
*/
|
||||
public byte[] value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the version.
|
||||
* @return version.
|
||||
*/
|
||||
public long version() {
|
||||
return version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "VersionedValue [value=" + Arrays.toString(value) + ", version="
|
||||
+ version + "]";
|
||||
}
|
||||
}
|
||||
@ -32,9 +32,15 @@ public class RoleReplyInfo {
|
||||
this.genId = genId;
|
||||
this.xid = xid;
|
||||
}
|
||||
public RoleState getRole() { return role; }
|
||||
public U64 getGenId() { return genId; }
|
||||
public long getXid() { return xid; }
|
||||
public RoleState getRole() {
|
||||
return role;
|
||||
}
|
||||
public U64 getGenId() {
|
||||
return genId;
|
||||
}
|
||||
public long getXid() {
|
||||
return xid;
|
||||
}
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[Role:" + role + " GenId:" + genId + " Xid:" + xid + "]";
|
||||
|
||||
@ -347,7 +347,7 @@ class RoleManager implements RoleHandler {
|
||||
|
||||
RoleState role = null;
|
||||
OFNiciraControllerRole ncr = nrr.getRole();
|
||||
switch(ncr) {
|
||||
switch (ncr) {
|
||||
case ROLE_MASTER:
|
||||
role = RoleState.MASTER;
|
||||
break;
|
||||
@ -383,7 +383,7 @@ class RoleManager implements RoleHandler {
|
||||
throws SwitchStateException {
|
||||
OFControllerRole cr = rrmsg.getRole();
|
||||
RoleState role = null;
|
||||
switch(cr) {
|
||||
switch (cr) {
|
||||
case ROLE_EQUAL:
|
||||
role = RoleState.EQUAL;
|
||||
break;
|
||||
|
||||
8
pom.xml
8
pom.xml
@ -414,7 +414,7 @@
|
||||
<plugin>
|
||||
<groupId>org.apache.felix</groupId>
|
||||
<artifactId>maven-bundle-plugin</artifactId>
|
||||
<version>2.3.7</version>
|
||||
<version>2.5.3</version>
|
||||
<extensions>true</extensions>
|
||||
</plugin>
|
||||
|
||||
@ -493,6 +493,12 @@
|
||||
<artifactId>onos-build-conf</artifactId>
|
||||
<version>1.0</version>
|
||||
</dependency>
|
||||
<!-- For Java 8 lambda support-->
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>5.9</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<configuration>
|
||||
<configLocation>onos/checkstyle.xml</configLocation>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user