From 9b19a82ceddc9a2ca178db45b49bcc087fa4e2e8 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Tue, 4 Nov 2014 21:37:13 -0800 Subject: [PATCH] Copycat messaging new happens over the same cluster messaging used for all other ONOS p2p communication --- .../onos/store/service/DatabaseService.java | 2 +- .../onlab/onos/store/service/ReadResult.java | 8 +- .../onos/store/service/VersionedValue.java | 4 +- ...col.java => ClusterMessagingProtocol.java} | 96 +++++++++++---- ...va => ClusterMessagingProtocolClient.java} | 89 ++++++++------ .../impl/ClusterMessagingProtocolServer.java | 110 +++++++++++++++++ .../store/service/impl/DatabaseClient.java | 21 +--- .../store/service/impl/DatabaseManager.java | 13 +- .../service/impl/DatabaseStateMachine.java | 2 +- .../service/impl/NettyProtocolServer.java | 115 ------------------ 10 files changed, 254 insertions(+), 206 deletions(-) rename core/store/dist/src/main/java/org/onlab/onos/store/service/impl/{NettyProtocol.java => ClusterMessagingProtocol.java} (58%) rename core/store/dist/src/main/java/org/onlab/onos/store/service/impl/{NettyProtocolClient.java => ClusterMessagingProtocolClient.java} (56%) create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java delete mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java index 449fd71dd0..11cc5ffa6e 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java @@ -26,7 +26,7 @@ public interface DatabaseService { /** * Performs a write operation on the database. - * @param request + * @param request write request * @return write result. * @throws DatabaseException if there is failure in execution write. */ diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java index 0db5dfedf8..33b57d2447 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java @@ -17,15 +17,15 @@ public class ReadResult { } /** - * Database table name. - * @return + * Returns database table name. + * @return table name. */ public String tableName() { return tableName; } /** - * Database table key. + * Returns database table key. * @return key. */ public String key() { @@ -33,7 +33,7 @@ public class ReadResult { } /** - * value associated with the key. + * Returns value associated with the key. * @return non-null value if the table contains one, null otherwise. */ public VersionedValue value() { diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java index ee1d0f0761..d88d35ec7a 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java @@ -12,8 +12,8 @@ public class VersionedValue { /** * Creates a new instance with the specified value and version. - * @param value - * @param version + * @param value value + * @param version version */ public VersionedValue(byte[] value, long version) { this.value = value; diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java similarity index 58% rename from core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java rename to core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java index 9a2259abb8..2e7fe11903 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java @@ -1,5 +1,8 @@ package org.onlab.onos.store.service.impl; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.slf4j.LoggerFactory.getLogger; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -27,6 +30,16 @@ import net.kuujo.copycat.spi.protocol.Protocol; import net.kuujo.copycat.spi.protocol.ProtocolClient; import net.kuujo.copycat.spi.protocol.ProtocolServer; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.cluster.ClusterService; +import org.onlab.onos.cluster.ControllerNode; +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; +import org.onlab.onos.store.cluster.messaging.MessageSubject; import org.onlab.onos.store.serializers.ImmutableListSerializer; import org.onlab.onos.store.serializers.ImmutableMapSerializer; import org.onlab.onos.store.serializers.ImmutableSetSerializer; @@ -37,6 +50,7 @@ import org.onlab.onos.store.service.VersionedValue; import org.onlab.onos.store.service.WriteRequest; import org.onlab.onos.store.service.WriteResult; import org.onlab.util.KryoNamespace; +import org.slf4j.Logger; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; @@ -46,17 +60,44 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; /** - * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under + * the License. */ -public class NettyProtocol implements Protocol { - public static final String COPYCAT_PING = "copycat-raft-consensus-ping"; - public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync"; - public static final String COPYCAT_POLL = "copycat-raft-consensus-poll"; - public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit"; +@Component(immediate = true) +@Service +public class ClusterMessagingProtocol implements Protocol { - // TODO: make this configurable. - public static final long RETRY_INTERVAL_MILLIS = 2000; + private final Logger log = getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + ClusterCommunicationService clusterCommunicator; + + public static final MessageSubject COPYCAT_PING = + new MessageSubject("copycat-raft-consensus-ping"); + public static final MessageSubject COPYCAT_SYNC = + new MessageSubject("copycat-raft-consensus-sync"); + public static final MessageSubject COPYCAT_POLL = + new MessageSubject("copycat-raft-consensus-poll"); + public static final MessageSubject COPYCAT_SUBMIT = + new MessageSubject("copycat-raft-consensus-submit"); private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder() .register(PingRequest.class) @@ -76,8 +117,7 @@ public class NettyProtocol implements Protocol { .register(TcpMember.class) .build(); - // TODO: Move to the right place. - private static final KryoNamespace CRAFT = KryoNamespace.newBuilder() + private static final KryoNamespace DATABASE = KryoNamespace.newBuilder() .register(ReadRequest.class) .register(WriteRequest.class) .register(InternalReadResult.class) @@ -116,31 +156,41 @@ public class NettyProtocol implements Protocol { serializerPool = KryoNamespace.newBuilder() .register(COPYCAT) .register(COMMON) - .register(CRAFT) + .register(DATABASE) .build() .populate(1); } }; - private NettyProtocolServer server = null; + @Activate + public void activate() { + log.info("Started."); + } - // FIXME: This is a total hack.Assumes - // ProtocolServer is initialized before ProtocolClient - protected NettyProtocolServer getServer() { - if (server == null) { - throw new IllegalStateException("ProtocolServer is not initialized yet!"); - } - return server; + @Deactivate + public void deactivate() { + log.info("Stopped."); } @Override public ProtocolServer createServer(TcpMember member) { - server = new NettyProtocolServer(member); - return server; + return new ClusterMessagingProtocolServer(clusterCommunicator); } @Override public ProtocolClient createClient(TcpMember member) { - return new NettyProtocolClient(this, member); + ControllerNode node = getControllerNode(member.host(), member.port()); + checkNotNull(node, "A valid controller node is expected"); + return new ClusterMessagingProtocolClient( + clusterCommunicator, node); } -} + + private ControllerNode getControllerNode(String host, int port) { + for (ControllerNode node : clusterService.getNodes()) { + if (node.ip().toString().equals(host) && node.tcpPort() == port) { + return node; + } + } + return null; + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java similarity index 56% rename from core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java rename to core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java index a791990c2d..f6384445ed 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java @@ -11,7 +11,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import net.kuujo.copycat.cluster.TcpMember; import net.kuujo.copycat.protocol.PingRequest; import net.kuujo.copycat.protocol.PingResponse; import net.kuujo.copycat.protocol.PollRequest; @@ -22,37 +21,54 @@ import net.kuujo.copycat.protocol.SyncRequest; import net.kuujo.copycat.protocol.SyncResponse; import net.kuujo.copycat.spi.protocol.ProtocolClient; -import org.onlab.netty.Endpoint; -import org.onlab.netty.NettyMessagingService; +import org.onlab.onos.cluster.ControllerNode; +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; +import org.onlab.onos.store.cluster.messaging.ClusterMessage; +import org.onlab.onos.store.cluster.messaging.MessageSubject; import org.slf4j.Logger; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * {@link NettyMessagingService} based Copycat protocol client. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under + * the License. */ -public class NettyProtocolClient implements ProtocolClient { + +public class ClusterMessagingProtocolClient implements ProtocolClient { private final Logger log = getLogger(getClass()); + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build(); - // Remote endpoint, this client instance is used - // for communicating with. - private final Endpoint remoteEp; - private final NettyMessagingService messagingService; + public static final long RETRY_INTERVAL_MILLIS = 2000; - // TODO: Is 10 the right number of threads? + private final ClusterCommunicationService clusterCommunicator; + private final ControllerNode remoteNode; + + // FIXME: Thread pool sizing. private static final ScheduledExecutorService THREAD_POOL = new ScheduledThreadPoolExecutor(10, THREAD_FACTORY); - public NettyProtocolClient(NettyProtocol protocol, TcpMember member) { - this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService()); - } - - public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) { - this.remoteEp = remoteEp; - this.messagingService = messagingService; + public ClusterMessagingProtocolClient( + ClusterCommunicationService clusterCommunicator, + ControllerNode remoteNode) { + this.clusterCommunicator = clusterCommunicator; + this.remoteNode = remoteNode; } @Override @@ -85,16 +101,16 @@ public class NettyProtocolClient implements ProtocolClient { return CompletableFuture.completedFuture(null); } - public String messageType(I input) { + public MessageSubject messageType(I input) { Class clazz = input.getClass(); if (clazz.equals(PollRequest.class)) { - return NettyProtocol.COPYCAT_POLL; + return ClusterMessagingProtocol.COPYCAT_POLL; } else if (clazz.equals(SyncRequest.class)) { - return NettyProtocol.COPYCAT_SYNC; + return ClusterMessagingProtocol.COPYCAT_SYNC; } else if (clazz.equals(SubmitRequest.class)) { - return NettyProtocol.COPYCAT_SUBMIT; + return ClusterMessagingProtocol.COPYCAT_SUBMIT; } else if (clazz.equals(PingRequest.class)) { - return NettyProtocol.COPYCAT_PING; + return ClusterMessagingProtocol.COPYCAT_PING; } else { throw new IllegalArgumentException("Unknown class " + clazz.getName()); } @@ -109,33 +125,34 @@ public class NettyProtocolClient implements ProtocolClient { private class RPCTask implements Runnable { - private final String messageType; - private final byte[] payload; - + private final ClusterMessage message; private final CompletableFuture future; public RPCTask(I request, CompletableFuture future) { - this.messageType = messageType(request); - this.payload = NettyProtocol.SERIALIZER.encode(request); + this.message = + new ClusterMessage( + null, + messageType(request), + ClusterMessagingProtocol.SERIALIZER.encode(request)); this.future = future; } @Override public void run() { try { - byte[] response = messagingService - .sendAndReceive(remoteEp, messageType, payload) - .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); - future.complete(NettyProtocol.SERIALIZER.decode(response)); + byte[] response = clusterCommunicator + .sendAndReceive(message, remoteNode.id()) + .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response)); } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { - if (messageType.equals(NettyProtocol.COPYCAT_SYNC) || - messageType.equals(NettyProtocol.COPYCAT_PING)) { + if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) || + message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) { log.warn("Request to {} failed. Will retry " - + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS); + + "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS); THREAD_POOL.schedule( this, - NettyProtocol.RETRY_INTERVAL_MILLIS, + RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); } else { future.completeExceptionally(e); @@ -145,4 +162,4 @@ public class NettyProtocolClient implements ProtocolClient { } } } -} +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java new file mode 100644 index 0000000000..0449b8a6b1 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java @@ -0,0 +1,110 @@ +package org.onlab.onos.store.service.impl; + +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.concurrent.CompletableFuture; + +import net.kuujo.copycat.protocol.PingRequest; +import net.kuujo.copycat.protocol.PollRequest; +import net.kuujo.copycat.protocol.RequestHandler; +import net.kuujo.copycat.protocol.SubmitRequest; +import net.kuujo.copycat.protocol.SyncRequest; +import net.kuujo.copycat.spi.protocol.ProtocolServer; + +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; +import org.onlab.onos.store.cluster.messaging.ClusterMessage; +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; +import org.slf4j.Logger; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under + * the License. + */ + +public class ClusterMessagingProtocolServer implements ProtocolServer { + + private final Logger log = getLogger(getClass()); + private RequestHandler handler; + + public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) { + + clusterCommunicator.addSubscriber( + ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler()); + clusterCommunicator.addSubscriber( + ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler()); + clusterCommunicator.addSubscriber( + ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler()); + clusterCommunicator.addSubscriber( + ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler()); + } + + @Override + public void requestHandler(RequestHandler handler) { + this.handler = handler; + } + + @Override + public CompletableFuture listen() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture close() { + return CompletableFuture.completedFuture(null); + } + + private class CopycatMessageHandler implements ClusterMessageHandler { + + @Override + public void handle(ClusterMessage message) { + T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload()); + if (request.getClass().equals(PingRequest.class)) { + handler.ping((PingRequest) request).whenComplete((response, error) -> { + try { + message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to ping request", e); + } + }); + } else if (request.getClass().equals(PollRequest.class)) { + handler.poll((PollRequest) request).whenComplete((response, error) -> { + try { + message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to poll request", e); + } + }); + } else if (request.getClass().equals(SyncRequest.class)) { + handler.sync((SyncRequest) request).whenComplete((response, error) -> { + try { + message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to sync request", e); + } + }); + } else if (request.getClass().equals(SubmitRequest.class)) { + handler.submit((SubmitRequest) request).whenComplete((response, error) -> { + try { + message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); + } catch (Exception e) { + log.error("Failed to respond to submit request", e); + } + }); + } + } + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java index 3c92800a56..d07d1d3197 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java @@ -11,37 +11,22 @@ import net.kuujo.copycat.protocol.SubmitRequest; import net.kuujo.copycat.protocol.SubmitResponse; import net.kuujo.copycat.spi.protocol.ProtocolClient; -import org.apache.commons.lang3.RandomUtils; -import org.onlab.netty.Endpoint; -import org.onlab.netty.NettyMessagingService; import org.onlab.onos.store.service.DatabaseException; import org.onlab.onos.store.service.ReadRequest; import org.onlab.onos.store.service.WriteRequest; public class DatabaseClient { - private final Endpoint copycatEp; - ProtocolClient client; - NettyMessagingService messagingService; + private final ProtocolClient client; - public DatabaseClient(Endpoint copycatEp) { - this.copycatEp = copycatEp; + public DatabaseClient(ProtocolClient client) { + this.client = client; } private static String nextId() { return UUID.randomUUID().toString(); } - public void activate() throws Exception { - messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000)); - messagingService.activate(); - client = new NettyProtocolClient(copycatEp, messagingService); - } - - public void deactivate() throws Exception { - messagingService.deactivate(); - } - public boolean createTable(String tableName) { SubmitRequest request = diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java index 8042cc6893..44d304176b 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java @@ -18,7 +18,6 @@ import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.onlab.netty.Endpoint; import org.onlab.onos.cluster.ClusterService; import org.onlab.onos.cluster.ControllerNode; import org.onlab.onos.store.service.DatabaseAdminService; @@ -50,6 +49,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) ClusterService clusterService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + ClusterMessagingProtocol copycatMessagingProtocol; + public static final String LOG_FILE_PREFIX = "onos-copy-cat-log"; private Copycat copycat; @@ -57,15 +59,14 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { @Activate public void activate() { - // FIXME hack tcpPort +1 for copycat communication TcpMember localMember = new TcpMember( clusterService.getLocalNode().ip().toString(), - clusterService.getLocalNode().tcpPort() + 1); + clusterService.getLocalNode().tcpPort()); List remoteMembers = Lists.newArrayList(); for (ControllerNode node : clusterService.getNodes()) { - TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort() + 1); + TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort()); if (!member.equals(localMember)) { remoteMembers.add(member); } @@ -84,10 +85,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ControllerNode thisNode = clusterService.getLocalNode(); Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id()); - copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol()); + copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol); copycat.start(); - client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port())); + client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember)); log.info("Started."); } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java index 663e9e4fa9..c9744863c2 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java @@ -27,7 +27,7 @@ public class DatabaseStateMachine implements StateMachine { serializerPool = KryoNamespace.newBuilder() .register(VersionedValue.class) .register(State.class) - .register(NettyProtocol.COMMON) + .register(ClusterMessagingProtocol.COMMON) .build() .populate(1); } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java deleted file mode 100644 index d06999e9e4..0000000000 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.onlab.onos.store.service.impl; - -import static org.slf4j.LoggerFactory.getLogger; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - -import net.kuujo.copycat.cluster.TcpMember; -import net.kuujo.copycat.protocol.PingRequest; -import net.kuujo.copycat.protocol.PollRequest; -import net.kuujo.copycat.protocol.RequestHandler; -import net.kuujo.copycat.protocol.SubmitRequest; -import net.kuujo.copycat.protocol.SyncRequest; -import net.kuujo.copycat.spi.protocol.ProtocolServer; - -import org.onlab.netty.Message; -import org.onlab.netty.MessageHandler; -import org.onlab.netty.NettyMessagingService; -import org.slf4j.Logger; - -/** - * {@link NettyMessagingService} based Copycat protocol server. - */ -public class NettyProtocolServer implements ProtocolServer { - - private final Logger log = getLogger(getClass()); - - private final NettyMessagingService messagingService; - private RequestHandler handler; - - - public NettyProtocolServer(TcpMember member) { - messagingService = new NettyMessagingService(member.host(), member.port()); - - messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler()); - messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler()); - messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler()); - messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler()); - } - - protected NettyMessagingService getNettyMessagingService() { - return messagingService; - } - - @Override - public void requestHandler(RequestHandler handler) { - this.handler = handler; - } - - @Override - public CompletableFuture listen() { - try { - messagingService.activate(); - return CompletableFuture.completedFuture(null); - } catch (Exception e) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(e); - return future; - } - } - - @Override - public CompletableFuture close() { - CompletableFuture future = new CompletableFuture<>(); - try { - messagingService.deactivate(); - future.complete(null); - return future; - } catch (Exception e) { - future.completeExceptionally(e); - return future; - } - } - - private class CopycatMessageHandler implements MessageHandler { - - @Override - public void handle(Message message) throws IOException { - T request = NettyProtocol.SERIALIZER.decode(message.payload()); - if (request.getClass().equals(PingRequest.class)) { - handler.ping((PingRequest) request).whenComplete((response, error) -> { - try { - message.respond(NettyProtocol.SERIALIZER.encode(response)); - } catch (Exception e) { - log.error("Failed to respond to ping request", e); - } - }); - } else if (request.getClass().equals(PollRequest.class)) { - handler.poll((PollRequest) request).whenComplete((response, error) -> { - try { - message.respond(NettyProtocol.SERIALIZER.encode(response)); - } catch (Exception e) { - log.error("Failed to respond to poll request", e); - } - }); - } else if (request.getClass().equals(SyncRequest.class)) { - handler.sync((SyncRequest) request).whenComplete((response, error) -> { - try { - message.respond(NettyProtocol.SERIALIZER.encode(response)); - } catch (Exception e) { - log.error("Failed to respond to sync request", e); - } - }); - } else if (request.getClass().equals(SubmitRequest.class)) { - handler.submit((SubmitRequest) request).whenComplete((response, error) -> { - try { - message.respond(NettyProtocol.SERIALIZER.encode(response)); - } catch (Exception e) { - log.error("Failed to respond to submit request", e); - } - }); - } - } - } -}