diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java index c81079d2b1..5d36bda618 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java @@ -25,7 +25,7 @@ import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Service; import org.joda.time.DateTime; -import org.onlab.netty.NettyMessagingService; +import org.onlab.netty.NettyMessagingManager; import org.onlab.packet.IpAddress; import org.onlab.util.KryoNamespace; import org.onosproject.cluster.ClusterEvent; @@ -108,7 +108,7 @@ public class DistributedClusterStore private final Map allNodes = Maps.newConcurrentMap(); private final Map nodeStates = Maps.newConcurrentMap(); private final Map nodeStateLastUpdatedTimes = Maps.newConcurrentMap(); - private NettyMessagingService messagingService; + private NettyMessagingManager messagingService; private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor( groupedThreads("onos/cluster/membership", "heartbeat-sender")); private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor( @@ -148,7 +148,7 @@ public class DistributedClusterStore establishSelfIdentity(); - messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT); + messagingService = new NettyMessagingManager(HEARTBEAT_FD_PORT); try { messagingService.activate(); } catch (InterruptedException e) { diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java index 6f47b4817d..1a4512d3df 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java @@ -21,8 +21,8 @@ import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; -import org.onlab.netty.NettyMessagingService; -import org.onlab.nio.service.IOLoopMessagingService; +import org.onlab.netty.NettyMessagingManager; +import org.onlab.nio.service.IOLoopMessagingManager; import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.NodeId; @@ -69,7 +69,7 @@ public class ClusterCommunicationManager public void activate() { ControllerNode localNode = clusterService.getLocalNode(); if (useNetty) { - NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort()); + NettyMessagingManager netty = new NettyMessagingManager(localNode.ip(), localNode.tcpPort()); try { netty.activate(); messagingService = netty; @@ -77,7 +77,7 @@ public class ClusterCommunicationManager log.error("NettyMessagingService#activate", e); } } else { - IOLoopMessagingService ioLoop = new IOLoopMessagingService(localNode.ip(), localNode.tcpPort()); + IOLoopMessagingManager ioLoop = new IOLoopMessagingManager(localNode.ip(), localNode.tcpPort()); try { ioLoop.activate(); messagingService = ioLoop; @@ -94,9 +94,9 @@ public class ClusterCommunicationManager // FIXME: workaround until it becomes a service. try { if (useNetty) { - ((NettyMessagingService) messagingService).deactivate(); + ((NettyMessagingManager) messagingService).deactivate(); } else { - ((IOLoopMessagingService) messagingService).deactivate(); + ((IOLoopMessagingManager) messagingService).deactivate(); } } catch (Exception e) { log.error("MessagingService#deactivate", e); diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java index e63a6d4f99..1a106ba5b2 100644 --- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java +++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java @@ -22,7 +22,7 @@ import org.junit.Test; import org.onosproject.cluster.DefaultControllerNode; import org.onosproject.cluster.NodeId; import org.onosproject.store.cluster.impl.ClusterNodesDelegate; -import org.onlab.netty.NettyMessagingService; +import org.onlab.netty.NettyMessagingManager; import org.onlab.packet.IpAddress; import java.util.concurrent.CountDownLatch; @@ -56,7 +56,7 @@ public class ClusterCommunicationManagerTest { @Before public void setUp() throws Exception { - NettyMessagingService messagingService = new NettyMessagingService(); + NettyMessagingManager messagingService = new NettyMessagingManager(); messagingService.activate(); ccm1 = new ClusterCommunicationManager(); diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java similarity index 98% rename from utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java rename to utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java index eeba05e335..bfc1c70263 100644 --- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java +++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java @@ -65,7 +65,7 @@ import com.google.common.cache.RemovalNotification; /** * Implementation of MessagingService based on Netty framework. */ -public class NettyMessagingService implements MessagingService { +public class NettyMessagingManager implements MessagingService { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -111,15 +111,15 @@ public class NettyMessagingService implements MessagingService { clientChannelClass = NioSocketChannel.class; } - public NettyMessagingService(IpAddress ip, int port) { + public NettyMessagingManager(IpAddress ip, int port) { localEp = new Endpoint(ip, port); } - public NettyMessagingService() { + public NettyMessagingManager() { this(8080); } - public NettyMessagingService(int port) { + public NettyMessagingManager(int port) { try { localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port); } catch (UnknownHostException e) { diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java index 61d8541536..53a36e3184 100644 --- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java +++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java @@ -38,8 +38,8 @@ public class PingPongTest { @Ignore("Turning off fragile test") @Test public void testPingPong() throws Exception { - NettyMessagingService pinger = new NettyMessagingService(8085); - NettyMessagingService ponger = new NettyMessagingService(9086); + NettyMessagingManager pinger = new NettyMessagingManager(8085); + NettyMessagingManager ponger = new NettyMessagingManager(9086); try { pinger.activate(); ponger.activate(); diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java similarity index 96% rename from utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java rename to utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java index ce917f7dd6..c183523bcc 100644 --- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingService.java +++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java @@ -38,6 +38,7 @@ import java.util.function.Function; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.onlab.nio.AcceptorLoop; +import org.onlab.nio.SelectorLoop; import org.onlab.packet.IpAddress; import org.onosproject.store.cluster.messaging.Endpoint; import org.onosproject.store.cluster.messaging.MessagingService; @@ -53,7 +54,7 @@ import com.google.common.collect.Lists; /** * MessagingService implementation based on IOLoop. */ -public class IOLoopMessagingService implements MessagingService { +public class IOLoopMessagingManager implements MessagingService { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -81,7 +82,7 @@ public class IOLoopMessagingService implements MessagingService { private final Endpoint localEp; private GenericKeyedObjectPool streams = - new GenericKeyedObjectPool(new DefaultMessageStreamFactory()); + new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory()); private final ConcurrentMap> handlers = new ConcurrentHashMap<>(); private final AtomicLong messageIdGenerator = new AtomicLong(0); @@ -97,20 +98,21 @@ public class IOLoopMessagingService implements MessagingService { .build(); - public IOLoopMessagingService(int port) { + public IOLoopMessagingManager(int port) { this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port)); } - public IOLoopMessagingService(IpAddress ip, int port) { + public IOLoopMessagingManager(IpAddress ip, int port) { this(new Endpoint(ip, port)); } - public IOLoopMessagingService(Endpoint localEp) { + public IOLoopMessagingManager(Endpoint localEp) { this.localEp = localEp; } /** * Returns the local endpoint. + * * @return local endpoint */ public Endpoint localEp() { @@ -119,6 +121,7 @@ public class IOLoopMessagingService implements MessagingService { /** * Activates IO Loops. + * * @throws IOException is activation fails */ public void activate() throws IOException { @@ -129,7 +132,7 @@ public class IOLoopMessagingService implements MessagingService { ioLoops.add(new DefaultIOLoop(this::dispatchLocally)); } - ioLoops.forEach(loop -> ioThreadPool.execute(loop)); + ioLoops.forEach(ioThreadPool::execute); acceptorThreadPool.execute(acceptorLoop); ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT)); acceptorLoop.awaitStart(TIMEOUT); @@ -139,7 +142,7 @@ public class IOLoopMessagingService implements MessagingService { * Shuts down IO loops. */ public void deactivate() { - ioLoops.forEach(loop -> loop.shutdown()); + ioLoops.forEach(SelectorLoop::shutdown); acceptorLoop.shutdown(); ioThreadPool.shutdown(); acceptorThreadPool.shutdown();