diff --git a/apps/foo/pom.xml b/apps/foo/pom.xml index 860d70bba8..868b99200d 100644 --- a/apps/foo/pom.xml +++ b/apps/foo/pom.xml @@ -27,10 +27,6 @@ onlab-nio ${project.version} - - org.livetribe.slp - livetribe-slp - org.apache.karaf.shell org.apache.karaf.shell.console diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java index 3ec8c07044..302a0c75d6 100644 --- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java +++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java @@ -233,7 +233,7 @@ public class IOLoopTestClient { } @Override - protected void connect(SelectionKey key) { + protected void connect(SelectionKey key) throws IOException { super.connect(key); TestMessageStream b = (TestMessageStream) key.attachment(); Worker w = ((CustomIOLoop) b.loop()).worker; diff --git a/apps/foo/src/main/resources/org/onlab/onos/foo/FooComponent.properties b/apps/foo/src/main/resources/org/onlab/onos/foo/FooComponent.properties new file mode 100644 index 0000000000..eed1e38d12 --- /dev/null +++ b/apps/foo/src/main/resources/org/onlab/onos/foo/FooComponent.properties @@ -0,0 +1,34 @@ +livetribe.slp.da.expired.services.purge.period=60 +livetribe.slp.sa.client.connect.address=127.0.0.1 +livetribe.slp.sa.client.factory=org.livetribe.slp.sa.StandardServiceAgentClient$Factory +livetribe.slp.sa.factory=org.livetribe.slp.sa.StandardServiceAgent$Factory +livetribe.slp.sa.service.renewal.enabled=true +livetribe.slp.sa.unicast.prefer.tcp=false +livetribe.slp.tcp.connector.factory=org.livetribe.slp.spi.net.SocketTCPConnector$Factory +livetribe.slp.tcp.connector.server.factory=org.livetribe.slp.spi.net.SocketTCPConnectorServer$Factory +livetribe.slp.tcp.message.max.length=4096 +livetribe.slp.tcp.read.timeout=300000 +livetribe.slp.ua.client.factory=org.livetribe.slp.ua.StandardUserAgentClient$Factory +livetribe.slp.ua.factory=org.livetribe.slp.ua.StandardUserAgent$Factory +livetribe.slp.ua.unicast.prefer.tcp=false +livetribe.slp.udp.connector.factory=org.livetribe.slp.spi.net.SocketUDPConnector$Factory +livetribe.slp.udp.connector.server.factory=org.livetribe.slp.spi.net.SocketUDPConnectorServer$Factory +net.slp.DAAddresses= +net.slp.DAAttributes= +net.slp.DAHeartBeat=10800 +net.slp.MTU=1400 +net.slp.SAAttributes= +net.slp.broadcastAddress=255.255.255.255 +net.slp.datagramTimeouts=150,250,400 +net.slp.interfaces=0.0.0.0 +net.slp.isBroadcastOnly=false +net.slp.locale=en +net.slp.multicastAddress=239.255.255.253 +net.slp.multicastMaximumWait=15000 +net.slp.multicastTTL=255 +net.slp.multicastTimeouts=150,250,400,600,1000 +net.slp.notificationPort=1847 +net.slp.port=427 +net.slp.useScopes=default + +org.onlab.cluster.name = TV-ONOS diff --git a/cli/src/main/java/org/onlab/onos/cli/NodeAddCommand.java b/cli/src/main/java/org/onlab/onos/cli/NodeAddCommand.java new file mode 100644 index 0000000000..7c9a163a2f --- /dev/null +++ b/cli/src/main/java/org/onlab/onos/cli/NodeAddCommand.java @@ -0,0 +1,34 @@ +package org.onlab.onos.cli; + +import org.apache.karaf.shell.commands.Argument; +import org.apache.karaf.shell.commands.Command; +import org.onlab.onos.cluster.ClusterAdminService; +import org.onlab.onos.cluster.NodeId; +import org.onlab.packet.IpPrefix; + +/** + * Adds a new controller cluster node. + */ +@Command(scope = "onos", name = "add-node", + description = "Adds a new controller cluster node") +public class NodeAddCommand extends AbstractShellCommand { + + @Argument(index = 0, name = "nodeId", description = "Node ID", + required = true, multiValued = false) + String nodeId = null; + + @Argument(index = 1, name = "ip", description = "Node IP address", + required = true, multiValued = false) + String ip = null; + + @Argument(index = 2, name = "tcpPort", description = "Node TCP listen port", + required = false, multiValued = false) + int tcpPort = 9876; + + @Override + protected void execute() { + ClusterAdminService service = get(ClusterAdminService.class); + service.addNode(new NodeId(nodeId), IpPrefix.valueOf(ip), tcpPort); + } + +} diff --git a/cli/src/main/java/org/onlab/onos/cli/NodeRemoveCommand.java b/cli/src/main/java/org/onlab/onos/cli/NodeRemoveCommand.java new file mode 100644 index 0000000000..219c1871d4 --- /dev/null +++ b/cli/src/main/java/org/onlab/onos/cli/NodeRemoveCommand.java @@ -0,0 +1,25 @@ +package org.onlab.onos.cli; + +import org.apache.karaf.shell.commands.Argument; +import org.apache.karaf.shell.commands.Command; +import org.onlab.onos.cluster.ClusterAdminService; +import org.onlab.onos.cluster.NodeId; + +/** + * Removes a controller cluster node. + */ +@Command(scope = "onos", name = "remove-node", + description = "Removes a new controller cluster node") +public class NodeRemoveCommand extends AbstractShellCommand { + + @Argument(index = 0, name = "nodeId", description = "Node ID", + required = true, multiValued = false) + String nodeId = null; + + @Override + protected void execute() { + ClusterAdminService service = get(ClusterAdminService.class); + service.removeNode(new NodeId(nodeId)); + } + +} diff --git a/cli/src/main/java/org/onlab/onos/cli/NodesListCommand.java b/cli/src/main/java/org/onlab/onos/cli/NodesListCommand.java index 40f722d14c..b7b4556d73 100644 --- a/cli/src/main/java/org/onlab/onos/cli/NodesListCommand.java +++ b/cli/src/main/java/org/onlab/onos/cli/NodesListCommand.java @@ -17,7 +17,7 @@ import static com.google.common.collect.Lists.newArrayList; public class NodesListCommand extends AbstractShellCommand { private static final String FMT = - "id=%s, ip=%s, state=%s %s"; + "id=%s, address=%s:%s, state=%s %s"; @Override protected void execute() { @@ -26,7 +26,7 @@ public class NodesListCommand extends AbstractShellCommand { Collections.sort(nodes, Comparators.NODE_COMPARATOR); ControllerNode self = service.getLocalNode(); for (ControllerNode node : nodes) { - print(FMT, node.id(), node.ip(), + print(FMT, node.id(), node.ip(), node.tcpPort(), service.getState(node.id()), node.equals(self) ? "*" : ""); } diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml index 30fce6f62a..16b56725b5 100644 --- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml +++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml @@ -4,6 +4,12 @@ + + + + + + diff --git a/core/api/src/main/java/org/onlab/onos/cluster/ClusterAdminService.java b/core/api/src/main/java/org/onlab/onos/cluster/ClusterAdminService.java index 4f98804468..73137e1cba 100644 --- a/core/api/src/main/java/org/onlab/onos/cluster/ClusterAdminService.java +++ b/core/api/src/main/java/org/onlab/onos/cluster/ClusterAdminService.java @@ -1,10 +1,22 @@ package org.onlab.onos.cluster; +import org.onlab.packet.IpPrefix; + /** * Service for administering the cluster node membership. */ public interface ClusterAdminService { + /** + * Adds a new controller node to the cluster. + * + * @param nodeId controller node identifier + * @param ip node IP listen address + * @param tcpPort tcp listen port + * @return newly added node + */ + ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort); + /** * Removes the specified node from the cluster node list. * diff --git a/core/api/src/main/java/org/onlab/onos/cluster/ClusterStore.java b/core/api/src/main/java/org/onlab/onos/cluster/ClusterStore.java index ea5bbd3403..3725706f04 100644 --- a/core/api/src/main/java/org/onlab/onos/cluster/ClusterStore.java +++ b/core/api/src/main/java/org/onlab/onos/cluster/ClusterStore.java @@ -1,6 +1,7 @@ package org.onlab.onos.cluster; import org.onlab.onos.store.Store; +import org.onlab.packet.IpPrefix; import java.util.Set; @@ -39,6 +40,16 @@ public interface ClusterStore extends Store */ ControllerNode.State getState(NodeId nodeId); + /** + * Adds a new controller node to the cluster. + * + * @param nodeId controller node identifier + * @param ip node IP listen address + * @param tcpPort tcp listen port + * @return newly added node + */ + ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort); + /** * Removes the specified node from the inventory of cluster nodes. * diff --git a/core/api/src/main/java/org/onlab/onos/cluster/ControllerNode.java b/core/api/src/main/java/org/onlab/onos/cluster/ControllerNode.java index c6f0cb3342..33fe1c963d 100644 --- a/core/api/src/main/java/org/onlab/onos/cluster/ControllerNode.java +++ b/core/api/src/main/java/org/onlab/onos/cluster/ControllerNode.java @@ -35,4 +35,12 @@ public interface ControllerNode { */ IpPrefix ip(); + + /** + * Returns the TCP port on which the node listens for connections. + * + * @return TCP port + */ + int tcpPort(); + } diff --git a/core/api/src/main/java/org/onlab/onos/cluster/DefaultControllerNode.java b/core/api/src/main/java/org/onlab/onos/cluster/DefaultControllerNode.java index 86ea14c09e..d23b7a32bb 100644 --- a/core/api/src/main/java/org/onlab/onos/cluster/DefaultControllerNode.java +++ b/core/api/src/main/java/org/onlab/onos/cluster/DefaultControllerNode.java @@ -11,13 +11,17 @@ import static com.google.common.base.MoreObjects.toStringHelper; */ public class DefaultControllerNode implements ControllerNode { + private static final int DEFAULT_PORT = 9876; + private final NodeId id; private final IpPrefix ip; + private final int tcpPort; // For serialization private DefaultControllerNode() { this.id = null; this.ip = null; + this.tcpPort = 0; } /** @@ -27,8 +31,19 @@ public class DefaultControllerNode implements ControllerNode { * @param ip instance IP address */ public DefaultControllerNode(NodeId id, IpPrefix ip) { + this(id, ip, DEFAULT_PORT); + } + + /** + * Creates a new instance with the specified id and IP address and TCP port. + * + * @param id instance identifier + * @param ip instance IP address + */ + public DefaultControllerNode(NodeId id, IpPrefix ip, int tcpPort) { this.id = id; this.ip = ip; + this.tcpPort = tcpPort; } @Override @@ -41,6 +56,11 @@ public class DefaultControllerNode implements ControllerNode { return ip; } + @Override + public int tcpPort() { + return tcpPort; + } + @Override public int hashCode() { return Objects.hash(id); @@ -60,7 +80,8 @@ public class DefaultControllerNode implements ControllerNode { @Override public String toString() { - return toStringHelper(this).add("id", id).add("ip", ip).toString(); + return toStringHelper(this).add("id", id) + .add("ip", ip).add("tcpPort", tcpPort).toString(); } } diff --git a/core/api/src/main/java/org/onlab/onos/net/proxyarp/package-info.java b/core/api/src/main/java/org/onlab/onos/net/proxyarp/package-info.java index 4917c6e2f3..30d44e71ef 100644 --- a/core/api/src/main/java/org/onlab/onos/net/proxyarp/package-info.java +++ b/core/api/src/main/java/org/onlab/onos/net/proxyarp/package-info.java @@ -1,4 +1,4 @@ /** * Base abstractions related to the proxy arp service. */ -package org.onlab.onos.net.proxyarp; \ No newline at end of file +package org.onlab.onos.net.proxyarp; diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java index 9913ad0eea..36f2f7ed50 100644 --- a/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java +++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java @@ -16,10 +16,12 @@ import org.onlab.onos.cluster.ControllerNode; import org.onlab.onos.cluster.NodeId; import org.onlab.onos.event.AbstractListenerRegistry; import org.onlab.onos.event.EventDeliveryService; +import org.onlab.packet.IpPrefix; import org.slf4j.Logger; import java.util.Set; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.slf4j.LoggerFactory.getLogger; @@ -80,6 +82,14 @@ public class ClusterManager implements ClusterService, ClusterAdminService { return store.getState(nodeId); } + @Override + public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { + checkNotNull(nodeId, INSTANCE_ID_NULL); + checkNotNull(ip, "IP address cannot be null"); + checkArgument(tcpPort > 5000, "TCP port must be > 5000"); + return store.addNode(nodeId, ip, tcpPort); + } + @Override public void removeNode(NodeId nodeId) { checkNotNull(nodeId, INSTANCE_ID_NULL); diff --git a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/package-info.java b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/package-info.java index 53830bca00..c0cbd208b6 100644 --- a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/package-info.java +++ b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/package-info.java @@ -1,4 +1,4 @@ /** * Core subsystem for responding to arp requests. */ -package org.onlab.onos.net.proxyarp.impl; \ No newline at end of file +package org.onlab.onos.net.proxyarp.impl; diff --git a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java index 90cb49c00a..10e9b39038 100644 --- a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java +++ b/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java @@ -33,8 +33,11 @@ import org.onlab.onos.net.device.PortDescription; import org.onlab.onos.net.provider.AbstractProvider; import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.store.common.StoreManager; +import org.onlab.onos.store.common.StoreService; import org.onlab.onos.store.common.TestStoreManager; import org.onlab.onos.store.device.impl.DistributedDeviceStore; +import org.onlab.onos.store.serializers.KryoSerializationManager; +import org.onlab.onos.store.serializers.KryoSerializationService; import org.onlab.packet.IpPrefix; import java.util.ArrayList; @@ -92,6 +95,7 @@ public class DistributedDeviceManagerTest { private DistributedDeviceStore dstore; private TestMastershipManager masterManager; private EventDeliveryService eventService; + private KryoSerializationManager serializationMgr; @Before public void setUp() { @@ -107,7 +111,10 @@ public class DistributedDeviceManagerTest { storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); storeManager.activate(); - dstore = new TestDistributedDeviceStore(); + serializationMgr = new KryoSerializationManager(); + serializationMgr.activate(); + + dstore = new TestDistributedDeviceStore(storeManager, serializationMgr); dstore.activate(); mgr.store = dstore; @@ -133,6 +140,7 @@ public class DistributedDeviceManagerTest { mgr.deactivate(); dstore.deactivate(); + serializationMgr.deactivate(); storeManager.deactivate(); } @@ -163,7 +171,7 @@ public class DistributedDeviceManagerTest { public void deviceDisconnected() { connectDevice(DID1, SW1); connectDevice(DID2, SW1); - validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED); + validateEvents(DEVICE_ADDED, DEVICE_ADDED); assertTrue("device should be available", service.isAvailable(DID1)); // Disconnect @@ -182,10 +190,10 @@ public class DistributedDeviceManagerTest { @Test public void deviceUpdated() { connectDevice(DID1, SW1); - validateEvents(DEVICE_ADDED, DEVICE_ADDED); + validateEvents(DEVICE_ADDED); connectDevice(DID1, SW2); - validateEvents(DEVICE_UPDATED, DEVICE_UPDATED); + validateEvents(DEVICE_UPDATED); } @Test @@ -202,7 +210,7 @@ public class DistributedDeviceManagerTest { pds.add(new DefaultPortDescription(P2, true)); pds.add(new DefaultPortDescription(P3, true)); providerService.updatePorts(DID1, pds); - validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED); + validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED); pds.clear(); pds.add(new DefaultPortDescription(P1, false)); @@ -218,7 +226,7 @@ public class DistributedDeviceManagerTest { pds.add(new DefaultPortDescription(P1, true)); pds.add(new DefaultPortDescription(P2, true)); providerService.updatePorts(DID1, pds); - validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED); + validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED); providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false)); validateEvents(PORT_UPDATED); @@ -233,7 +241,7 @@ public class DistributedDeviceManagerTest { pds.add(new DefaultPortDescription(P1, true)); pds.add(new DefaultPortDescription(P2, true)); providerService.updatePorts(DID1, pds); - validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED); + validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED); assertEquals("wrong port count", 2, service.getPorts(DID1).size()); Port port = service.getPort(DID1, P1); @@ -247,7 +255,7 @@ public class DistributedDeviceManagerTest { connectDevice(DID2, SW2); assertEquals("incorrect device count", 2, service.getDeviceCount()); admin.removeDevice(DID1); - validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED, DEVICE_REMOVED); + validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED); assertNull("device should not be found", service.getDevice(DID1)); assertNotNull("device should be found", service.getDevice(DID2)); assertEquals("incorrect device count", 1, service.getDeviceCount()); @@ -298,8 +306,10 @@ public class DistributedDeviceManagerTest { private class TestDistributedDeviceStore extends DistributedDeviceStore { - public TestDistributedDeviceStore() { - this.storeService = storeManager; + public TestDistributedDeviceStore(StoreService storeService, + KryoSerializationService kryoSerializationService) { + this.storeService = storeService; + this.kryoSerializationService = kryoSerializationService; } } diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml index 900a2ff6a9..577376a902 100644 --- a/core/store/dist/pom.xml +++ b/core/store/dist/pom.xml @@ -26,6 +26,23 @@ onos-core-serializers ${project.version} + + + + org.onlab.onos + onlab-nio + ${project.version} + + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + org.apache.felix org.apache.felix.scr.annotations diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java new file mode 100644 index 0000000000..4dc67d4a02 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java @@ -0,0 +1,75 @@ +package org.onlab.onos.store.cluster.impl; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.onlab.onos.cluster.DefaultControllerNode; +import org.onlab.onos.cluster.NodeId; +import org.onlab.packet.IpPrefix; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +/** + * Allows for reading and writing cluster definition as a JSON file. + */ +public class ClusterDefinitionStore { + + private final File file; + + /** + * Creates a reader/writer of the cluster definition file. + * + * @param filePath location of the definition file + */ + public ClusterDefinitionStore(String filePath) { + file = new File(filePath); + } + + /** + * Returns set of the controller nodes, including self. + * + * @return set of controller nodes + */ + public Set read() throws IOException { + Set nodes = new HashSet<>(); + ObjectMapper mapper = new ObjectMapper(); + ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file); + Iterator it = ((ArrayNode) clusterNodeDef.get("nodes")).elements(); + while (it.hasNext()) { + ObjectNode nodeDef = (ObjectNode) it.next(); + nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()), + IpPrefix.valueOf(nodeDef.get("ip").asText()), + nodeDef.get("tcpPort").asInt(9876))); + } + return nodes; + } + + /** + * Writes the given set of the controller nodes. + * + * @param nodes set of controller nodes + */ + public void write(Set nodes) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode clusterNodeDef = mapper.createObjectNode(); + ArrayNode nodeDefs = mapper.createArrayNode(); + clusterNodeDef.set("nodes", nodeDefs); + for (DefaultControllerNode node : nodes) { + ObjectNode nodeDef = mapper.createObjectNode(); + nodeDef.put("id", node.id().toString()) + .put("ip", node.ip().toString()) + .put("tcpPort", node.tcpPort()); + nodeDefs.add(nodeDef); + } + mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8), + clusterNodeDef); + } + +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java new file mode 100644 index 0000000000..5cd9d9eb5f --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java @@ -0,0 +1,393 @@ +package org.onlab.onos.store.cluster.impl; + +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onlab.nio.AcceptorLoop; +import org.onlab.nio.IOLoop; +import org.onlab.nio.MessageStream; +import org.onlab.onos.cluster.ClusterEvent; +import org.onlab.onos.cluster.ClusterStore; +import org.onlab.onos.cluster.ClusterStoreDelegate; +import org.onlab.onos.cluster.ControllerNode; +import org.onlab.onos.cluster.DefaultControllerNode; +import org.onlab.onos.cluster.NodeId; +import org.onlab.onos.store.AbstractStore; +import org.onlab.packet.IpPrefix; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static java.net.InetAddress.getByAddress; +import static org.onlab.onos.cluster.ControllerNode.State; +import static org.onlab.packet.IpPrefix.valueOf; +import static org.onlab.util.Tools.namedThreads; + +/** + * Distributed implementation of the cluster nodes store. + */ +@Component(immediate = true) +@Service +public class DistributedClusterStore + extends AbstractStore + implements ClusterStore { + + private static final int HELLO_MSG = 1; + private static final int ECHO_MSG = 2; + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final long CONNECTION_CUSTODIAN_DELAY = 1000L; + private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000; + + private static final long START_TIMEOUT = 1000; + private static final long SELECT_TIMEOUT = 50; + private static final int WORKERS = 3; + private static final int COMM_BUFFER_SIZE = 32 * 1024; + private static final int COMM_IDLE_TIME = 500; + + private static final boolean SO_NO_DELAY = false; + private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE; + private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE; + + private DefaultControllerNode self; + private final Map nodes = new ConcurrentHashMap<>(); + private final Map states = new ConcurrentHashMap<>(); + + // Means to track message streams to other nodes. + private final Map streams = new ConcurrentHashMap<>(); + private final Map nodesByChannel = new ConcurrentHashMap<>(); + + // Executor pools for listening and managing connections to other nodes. + private final ExecutorService listenExecutor = + Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen")); + private final ExecutorService commExecutors = + Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster")); + private final ExecutorService heartbeatExecutor = + Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat")); + + private final Timer timer = new Timer("onos-comm-initiator"); + private final TimerTask connectionCustodian = new ConnectionCustodian(); + + private ListenLoop listenLoop; + private List commLoops = new ArrayList<>(WORKERS); + + @Activate + public void activate() { + loadClusterDefinition(); + startCommunications(); + startListening(); + startInitiating(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + listenLoop.shutdown(); + for (CommLoop loop : commLoops) { + loop.shutdown(); + } + log.info("Stopped"); + } + + // Loads the cluster definition file + private void loadClusterDefinition() { +// ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json"); +// try { +// Set storedNodes = cds.read(); +// for (DefaultControllerNode node : storedNodes) { +// nodes.put(node.id(), node); +// } +// } catch (IOException e) { +// log.error("Unable to read cluster definitions", e); +// } + + // Establishes the controller's own identity. + IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1")); + self = nodes.get(new NodeId(ip.toString())); + + // As a fall-back, let's make sure we at least know who we are. + if (self == null) { + self = new DefaultControllerNode(new NodeId(ip.toString()), ip); + nodes.put(self.id(), self); + states.put(self.id(), State.ACTIVE); + } + } + + // Kicks off the IO loops. + private void startCommunications() { + for (int i = 0; i < WORKERS; i++) { + try { + CommLoop loop = new CommLoop(); + commLoops.add(loop); + commExecutors.execute(loop); + } catch (IOException e) { + log.warn("Unable to start comm IO loop", e); + } + } + + // Wait for the IO loops to start + for (CommLoop loop : commLoops) { + if (!loop.awaitStart(START_TIMEOUT)) { + log.warn("Comm loop did not start on-time; moving on..."); + } + } + } + + // Starts listening for connections from peer cluster members. + private void startListening() { + try { + listenLoop = new ListenLoop(self.ip(), self.tcpPort()); + listenExecutor.execute(listenLoop); + if (!listenLoop.awaitStart(START_TIMEOUT)) { + log.warn("Listen loop did not start on-time; moving on..."); + } + } catch (IOException e) { + log.error("Unable to listen for cluster connections", e); + } + } + + /** + * Initiates open connection request and registers the pending socket + * channel with the given IO loop. + * + * @param loop loop with which the channel should be registered + * @throws java.io.IOException if the socket could not be open or connected + */ + private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException { + SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort()); + SocketChannel ch = SocketChannel.open(); + nodesByChannel.put(ch, node); + ch.configureBlocking(false); + ch.connect(sa); + loop.connectStream(ch); + } + + + // Attempts to connect to any nodes that do not have an associated connection. + private void startInitiating() { + timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY); + } + + @Override + public ControllerNode getLocalNode() { + return self; + } + + @Override + public Set getNodes() { + ImmutableSet.Builder builder = ImmutableSet.builder(); + return builder.addAll(nodes.values()).build(); + } + + @Override + public ControllerNode getNode(NodeId nodeId) { + return nodes.get(nodeId); + } + + @Override + public State getState(NodeId nodeId) { + State state = states.get(nodeId); + return state == null ? State.INACTIVE : state; + } + + @Override + public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { + DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort); + nodes.put(nodeId, node); + return node; + } + + @Override + public void removeNode(NodeId nodeId) { + nodes.remove(nodeId); + TLVMessageStream stream = streams.remove(nodeId); + if (stream != null) { + stream.close(); + } + } + + // Listens and accepts inbound connections from other cluster nodes. + private class ListenLoop extends AcceptorLoop { + ListenLoop(IpPrefix ip, int tcpPort) throws IOException { + super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort)); + } + + @Override + protected void acceptConnection(ServerSocketChannel channel) throws IOException { + SocketChannel sc = channel.accept(); + sc.configureBlocking(false); + + Socket so = sc.socket(); + so.setTcpNoDelay(SO_NO_DELAY); + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); + so.setSendBufferSize(SO_SEND_BUFFER_SIZE); + + findLeastUtilizedLoop().acceptStream(sc); + } + } + + private class CommLoop extends IOLoop { + CommLoop() throws IOException { + super(SELECT_TIMEOUT); + } + + @Override + protected TLVMessageStream createStream(ByteChannel byteChannel) { + return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME); + } + + @Override + protected void processMessages(List messages, MessageStream stream) { + TLVMessageStream tlvStream = (TLVMessageStream) stream; + for (TLVMessage message : messages) { + // TODO: add type-based dispatching here... this is just a hack to get going + if (message.type() == HELLO_MSG) { + processHello(message, tlvStream); + } else if (message.type() == ECHO_MSG) { + processEcho(message, tlvStream); + } else { + log.info("Deal with other messages"); + } + } + } + + @Override + public TLVMessageStream acceptStream(SocketChannel channel) { + TLVMessageStream stream = super.acceptStream(channel); + try { + InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress(); + log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress())); + stream.write(createHello(self)); + + } catch (IOException e) { + log.warn("Unable to accept connection from an unknown end-point", e); + } + return stream; + } + + @Override + public TLVMessageStream connectStream(SocketChannel channel) { + TLVMessageStream stream = super.connectStream(channel); + DefaultControllerNode node = nodesByChannel.get(channel); + if (node != null) { + log.debug("Opened connection to node {}", node.id()); + nodesByChannel.remove(channel); + } + return stream; + } + + @Override + protected void connect(SelectionKey key) throws IOException { + try { + super.connect(key); + TLVMessageStream stream = (TLVMessageStream) key.attachment(); + send(stream, createHello(self)); + } catch (IOException e) { + if (!Objects.equals(e.getMessage(), "Connection refused")) { + throw e; + } + } + } + + @Override + protected void removeStream(MessageStream stream) { + DefaultControllerNode node = ((TLVMessageStream) stream).node(); + if (node != null) { + log.info("Closed connection to node {}", node.id()); + states.put(node.id(), State.INACTIVE); + streams.remove(node.id()); + } + super.removeStream(stream); + } + } + + // Processes a HELLO message from a peer controller node. + private void processHello(TLVMessage message, TLVMessageStream stream) { + // FIXME: pure hack for now + String data = new String(message.data()); + String[] fields = data.split(":"); + DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]), + valueOf(fields[1]), + Integer.parseInt(fields[2])); + stream.setNode(node); + nodes.put(node.id(), node); + streams.put(node.id(), stream); + states.put(node.id(), State.ACTIVE); + } + + // Processes an ECHO message from a peer controller node. + private void processEcho(TLVMessage message, TLVMessageStream tlvStream) { + // TODO: implement heart-beat refresh + log.info("Dealing with echoes..."); + } + + // Sends message to the specified stream. + private void send(TLVMessageStream stream, TLVMessage message) { + try { + stream.write(message); + } catch (IOException e) { + log.warn("Unable to send message to {}", stream.node().id()); + } + } + + // Creates a hello message to be sent to a peer controller node. + private TLVMessage createHello(DefaultControllerNode self) { + return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes()); + } + + // Sweeps through all controller nodes and attempts to open connection to + // those that presently do not have one. + private class ConnectionCustodian extends TimerTask { + @Override + public void run() { + for (DefaultControllerNode node : nodes.values()) { + if (node != self && !streams.containsKey(node.id())) { + try { + openConnection(node, findLeastUtilizedLoop()); + } catch (IOException e) { + log.debug("Unable to connect", e); + } + } + } + } + } + + // Finds the least utilities IO loop. + private CommLoop findLeastUtilizedLoop() { + CommLoop leastUtilized = null; + int minCount = Integer.MAX_VALUE; + for (CommLoop loop : commLoops) { + int count = loop.streamCount(); + if (count == 0) { + return loop; + } + + if (count < minCount) { + leastUtilized = loop; + minCount = count; + } + } + return leastUtilized; + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessage.java new file mode 100644 index 0000000000..246f8eecf1 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessage.java @@ -0,0 +1,70 @@ +package org.onlab.onos.store.cluster.impl; + +import org.onlab.nio.AbstractMessage; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; + +/** + * Base message for cluster-wide communications using TLVs. + */ +public class TLVMessage extends AbstractMessage { + + private final int type; + private final byte[] data; + + /** + * Creates an immutable TLV message. + * + * @param type message type + * @param data message data bytes + */ + public TLVMessage(int type, byte[] data) { + this.length = data.length + TLVMessageStream.METADATA_LENGTH; + this.type = type; + this.data = data; + } + + /** + * Returns the message type indicator. + * + * @return message type + */ + public int type() { + return type; + } + + /** + * Returns the data bytes. + * + * @return message data + */ + public byte[] data() { + return data; + } + + @Override + public int hashCode() { + return Objects.hash(type, data); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final TLVMessage other = (TLVMessage) obj; + return Objects.equals(this.type, other.type) && + Objects.equals(this.data, other.data); + } + + @Override + public String toString() { + return toStringHelper(this).add("type", type).add("length", length).toString(); + } + +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java new file mode 100644 index 0000000000..b003945620 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java @@ -0,0 +1,95 @@ +package org.onlab.onos.store.cluster.impl; + +import org.onlab.nio.IOLoop; +import org.onlab.nio.MessageStream; +import org.onlab.onos.cluster.DefaultControllerNode; + +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Stream for transferring TLV messages between cluster members. + */ +public class TLVMessageStream extends MessageStream { + + public static final int METADATA_LENGTH = 16; // 8 + 4 + 4 + + private static final int LENGTH_OFFSET = 12; + private static final long MARKER = 0xfeedcafecafefeedL; + + private DefaultControllerNode node; + + /** + * Creates a message stream associated with the specified IO loop and + * backed by the given byte channel. + * + * @param loop IO loop + * @param byteChannel backing byte channel + * @param bufferSize size of the backing byte buffers + * @param maxIdleMillis maximum number of millis the stream can be idle + */ + protected TLVMessageStream(IOLoop loop, ByteChannel byteChannel, + int bufferSize, int maxIdleMillis) { + super(loop, byteChannel, bufferSize, maxIdleMillis); + } + + /** + * Returns the node with which this stream is associated. + * + * @return controller node + */ + DefaultControllerNode node() { + return node; + } + + /** + * Sets the node with which this stream is affiliated. + * + * @param node controller node + */ + void setNode(DefaultControllerNode node) { + checkState(this.node == null, "Stream is already bound to a node"); + this.node = node; + } + + @Override + protected TLVMessage read(ByteBuffer buffer) { + // Do we have enough bytes to read the header? If not, bail. + if (buffer.remaining() < METADATA_LENGTH) { + return null; + } + + // Peek at the length and if we have enough to read the entire message + // go ahead, otherwise bail. + int length = buffer.getInt(buffer.position() + LENGTH_OFFSET); + if (buffer.remaining() < length) { + return null; + } + + // At this point, we have enough data to read a complete message. + long marker = buffer.getLong(); + checkState(marker == MARKER, "Incorrect message marker"); + + int type = buffer.getInt(); + length = buffer.getInt(); + + // TODO: add deserialization hook here + byte[] data = new byte[length - METADATA_LENGTH]; + buffer.get(data); + + return new TLVMessage(type, data); + } + + @Override + protected void write(TLVMessage message, ByteBuffer buffer) { + buffer.putLong(MARKER); + buffer.putInt(message.type()); + buffer.putInt(message.length()); + + // TODO: add serialization hook here + buffer.put(message.data()); + } + +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java index 30374c3036..bd5f2fd901 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java @@ -86,46 +86,48 @@ public class OnosDistributedDeviceStore @Override public Iterable getDevices() { - // TODO builder v.s. copyOf. Guava semms to be using copyOf? - // FIXME: synchronize. Builder builder = ImmutableSet.builder(); - for (VersionedValue device : devices.values()) { - builder.add(device.entity()); + synchronized (this) { + for (VersionedValue device : devices.values()) { + builder.add(device.entity()); + } + return builder.build(); } - return builder.build(); } @Override public Device getDevice(DeviceId deviceId) { - return devices.get(deviceId).entity(); + VersionedValue device = devices.get(deviceId); + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); + return device.entity(); } @Override public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, DeviceDescription deviceDescription) { - Timestamp now = clockService.getTimestamp(deviceId); + Timestamp newTimestamp = clockService.getTimestamp(deviceId); VersionedValue device = devices.get(deviceId); if (device == null) { - return createDevice(providerId, deviceId, deviceDescription, now); + return createDevice(providerId, deviceId, deviceDescription, newTimestamp); } - checkState(now.compareTo(device.timestamp()) > 0, + checkState(newTimestamp.compareTo(device.timestamp()) > 0, "Existing device has a timestamp in the future!"); - return updateDevice(providerId, device.entity(), deviceDescription, now); + return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp); } // Creates the device and returns the appropriate event if necessary. private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId, DeviceDescription desc, Timestamp timestamp) { - DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(), + Device device = new DefaultDevice(providerId, deviceId, desc.type(), desc.manufacturer(), desc.hwVersion(), desc.swVersion(), desc.serialNumber()); - devices.put(deviceId, new VersionedValue(device, true, timestamp)); - // FIXME: broadcast a message telling peers of a device event. + devices.put(deviceId, new VersionedValue<>(device, true, timestamp)); + // TODO,FIXME: broadcast a message telling peers of a device event. return new DeviceEvent(DEVICE_ADDED, device, null); } @@ -148,7 +150,7 @@ public class OnosDistributedDeviceStore } // Otherwise merely attempt to change availability - DefaultDevice updated = new DefaultDevice(providerId, device.id(), + Device updated = new DefaultDevice(providerId, device.id(), desc.type(), desc.manufacturer(), desc.hwVersion(), @@ -196,18 +198,18 @@ public class OnosDistributedDeviceStore VersionedValue device = devices.get(deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); Map> ports = getPortMap(deviceId); - Timestamp timestamp = clockService.getTimestamp(deviceId); + Timestamp newTimestamp = clockService.getTimestamp(deviceId); // Add new ports Set processed = new HashSet<>(); for (PortDescription portDescription : portDescriptions) { VersionedValue port = ports.get(portDescription.portNumber()); if (port == null) { - events.add(createPort(device, portDescription, ports, timestamp)); + events.add(createPort(device, portDescription, ports, newTimestamp)); } - checkState(timestamp.compareTo(port.timestamp()) > 0, + checkState(newTimestamp.compareTo(port.timestamp()) > 0, "Existing port state has a timestamp in the future!"); - events.add(updatePort(device, port, portDescription, ports, timestamp)); + events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp)); processed.add(portDescription.portNumber()); } @@ -233,19 +235,19 @@ public class OnosDistributedDeviceStore // Checks if the specified port requires update and if so, it replaces the // existing entry in the map and returns corresponding event. //@GuardedBy("this") - private DeviceEvent updatePort(VersionedValue device, VersionedValue port, + private DeviceEvent updatePort(Device device, Port port, PortDescription portDescription, Map> ports, Timestamp timestamp) { - if (port.entity().isEnabled() != portDescription.isEnabled()) { + if (port.isEnabled() != portDescription.isEnabled()) { VersionedValue updatedPort = new VersionedValue( - new DefaultPort(device.entity(), portDescription.portNumber(), + new DefaultPort(device, portDescription.portNumber(), portDescription.isEnabled()), portDescription.isEnabled(), timestamp); - ports.put(port.entity().number(), updatedPort); - updatePortMap(device.entity().id(), ports); - return new DeviceEvent(PORT_UPDATED, device.entity(), updatedPort.entity()); + ports.put(port.number(), updatedPort); + updatePortMap(device.id(), ports); + return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity()); } return null; } @@ -300,7 +302,7 @@ public class OnosDistributedDeviceStore Map> ports = getPortMap(deviceId); VersionedValue port = ports.get(portDescription.portNumber()); Timestamp timestamp = clockService.getTimestamp(deviceId); - return updatePort(device, port, portDescription, ports, timestamp); + return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp); } @Override diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java new file mode 100644 index 0000000000..5df25b4ed5 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java @@ -0,0 +1,246 @@ +package org.onlab.onos.store.link.impl; + +import static org.onlab.onos.net.Link.Type.DIRECT; +import static org.onlab.onos.net.Link.Type.INDIRECT; +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED; +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED; +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.net.ConnectPoint; +import org.onlab.onos.net.DefaultLink; +import org.onlab.onos.net.DeviceId; +import org.onlab.onos.net.Link; +import org.onlab.onos.net.LinkKey; +import org.onlab.onos.net.link.LinkDescription; +import org.onlab.onos.net.link.LinkEvent; +import org.onlab.onos.net.link.LinkStore; +import org.onlab.onos.net.link.LinkStoreDelegate; +import org.onlab.onos.net.provider.ProviderId; +import org.onlab.onos.store.AbstractStore; +import org.onlab.onos.store.ClockService; +import org.onlab.onos.store.Timestamp; +import org.onlab.onos.store.device.impl.VersionedValue; +import org.slf4j.Logger; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import com.google.common.collect.ImmutableSet.Builder; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +/** + * Manages inventory of infrastructure links using a protocol that takes into consideration + * the order in which events occur. + */ +// FIXME: This does not yet implement the full protocol. +// The full protocol requires the sender of LLDP message to include the +// version information of src device/port and the receiver to +// take that into account when figuring out if a more recent src +// device/port down event renders the link discovery obsolete. +@Component(immediate = true) +@Service +public class OnosDistributedLinkStore + extends AbstractStore + implements LinkStore { + + private final Logger log = getLogger(getClass()); + + // Link inventory + private ConcurrentMap> links; + + public static final String LINK_NOT_FOUND = "Link between %s and %s not found"; + + // TODO synchronize? + // Egress and ingress link sets + private final Multimap> srcLinks = HashMultimap.create(); + private final Multimap> dstLinks = HashMultimap.create(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClockService clockService; + + @Activate + public void activate() { + + links = new ConcurrentHashMap<>(); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public int getLinkCount() { + return links.size(); + } + + @Override + public Iterable getLinks() { + Builder builder = ImmutableSet.builder(); + synchronized (this) { + for (VersionedValue link : links.values()) { + builder.add(link.entity()); + } + return builder.build(); + } + } + + @Override + public Set getDeviceEgressLinks(DeviceId deviceId) { + Set> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId)); + Set rawEgressLinks = new HashSet<>(); + for (VersionedValue link : egressLinks) { + rawEgressLinks.add(link.entity()); + } + return rawEgressLinks; + } + + @Override + public Set getDeviceIngressLinks(DeviceId deviceId) { + Set> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId)); + Set rawIngressLinks = new HashSet<>(); + for (VersionedValue link : ingressLinks) { + rawIngressLinks.add(link.entity()); + } + return rawIngressLinks; + } + + @Override + public Link getLink(ConnectPoint src, ConnectPoint dst) { + VersionedValue link = links.get(new LinkKey(src, dst)); + checkArgument(link != null, "LINK_NOT_FOUND", src, dst); + return link.entity(); + } + + @Override + public Set getEgressLinks(ConnectPoint src) { + Set egressLinks = new HashSet<>(); + for (VersionedValue link : srcLinks.get(src.deviceId())) { + if (link.entity().src().equals(src)) { + egressLinks.add(link.entity()); + } + } + return egressLinks; + } + + @Override + public Set getIngressLinks(ConnectPoint dst) { + Set ingressLinks = new HashSet<>(); + for (VersionedValue link : dstLinks.get(dst.deviceId())) { + if (link.entity().dst().equals(dst)) { + ingressLinks.add(link.entity()); + } + } + return ingressLinks; + } + + @Override + public LinkEvent createOrUpdateLink(ProviderId providerId, + LinkDescription linkDescription) { + + final DeviceId destinationDeviceId = linkDescription.dst().deviceId(); + final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId); + + LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst()); + VersionedValue link = links.get(key); + if (link == null) { + return createLink(providerId, key, linkDescription, newTimestamp); + } + + checkState(newTimestamp.compareTo(link.timestamp()) > 0, + "Existing Link has a timestamp in the future!"); + + return updateLink(providerId, link, key, linkDescription, newTimestamp); + } + + // Creates and stores the link and returns the appropriate event. + private LinkEvent createLink(ProviderId providerId, LinkKey key, + LinkDescription linkDescription, Timestamp timestamp) { + VersionedValue link = new VersionedValue(new DefaultLink(providerId, key.src(), key.dst(), + linkDescription.type()), true, timestamp); + synchronized (this) { + links.put(key, link); + addNewLink(link, timestamp); + } + // FIXME: notify peers. + return new LinkEvent(LINK_ADDED, link.entity()); + } + + // update Egress and ingress link sets + private void addNewLink(VersionedValue link, Timestamp timestamp) { + Link rawLink = link.entity(); + synchronized (this) { + srcLinks.put(rawLink.src().deviceId(), link); + dstLinks.put(rawLink.dst().deviceId(), link); + } + } + + // Updates, if necessary the specified link and returns the appropriate event. + private LinkEvent updateLink(ProviderId providerId, VersionedValue existingLink, + LinkKey key, LinkDescription linkDescription, Timestamp timestamp) { + // FIXME confirm Link update condition is OK + if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) { + synchronized (this) { + + VersionedValue updatedLink = new VersionedValue( + new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(), + linkDescription.type()), true, timestamp); + links.replace(key, existingLink, updatedLink); + + replaceLink(existingLink, updatedLink); + // FIXME: notify peers. + return new LinkEvent(LINK_UPDATED, updatedLink.entity()); + } + } + return null; + } + + // update Egress and ingress link sets + private void replaceLink(VersionedValue current, VersionedValue updated) { + synchronized (this) { + srcLinks.remove(current.entity().src().deviceId(), current); + dstLinks.remove(current.entity().dst().deviceId(), current); + + srcLinks.put(current.entity().src().deviceId(), updated); + dstLinks.put(current.entity().dst().deviceId(), updated); + } + } + + @Override + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { + synchronized (this) { + LinkKey key = new LinkKey(src, dst); + VersionedValue link = links.remove(key); + if (link != null) { + removeLink(link); + // notify peers + return new LinkEvent(LINK_REMOVED, link.entity()); + } + return null; + } + } + + // update Egress and ingress link sets + private void removeLink(VersionedValue link) { + synchronized (this) { + srcLinks.remove(link.entity().src().deviceId(), link); + dstLinks.remove(link.entity().dst().deviceId(), link); + } + } +} diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java index 57d2358817..f83ac59e2a 100644 --- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java +++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java @@ -49,6 +49,7 @@ public class DistributedClusterStore private final MembershipListener listener = new InternalMembershipListener(); private final Map states = new ConcurrentHashMap<>(); + @Override @Activate public void activate() { super.activate(); @@ -56,9 +57,9 @@ public class DistributedClusterStore rawNodes = theInstance.getMap("nodes"); OptionalCacheLoader nodeLoader - = new OptionalCacheLoader<>(storeService, rawNodes); + = new OptionalCacheLoader<>(kryoSerializationService, rawNodes); nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); - rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true); + rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true); loadClusterNodes(); @@ -68,7 +69,7 @@ public class DistributedClusterStore // Loads the initial set of cluster nodes private void loadClusterNodes() { for (Member member : theInstance.getCluster().getMembers()) { - addMember(member); + addNode(node(member)); } } @@ -103,6 +104,11 @@ public class DistributedClusterStore return state == null ? State.INACTIVE : state; } + @Override + public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { + return addNode(new DefaultControllerNode(nodeId, ip, tcpPort)); + } + @Override public void removeNode(NodeId nodeId) { synchronized (this) { @@ -112,8 +118,7 @@ public class DistributedClusterStore } // Adds a new node based on the specified member - private synchronized ControllerNode addMember(Member member) { - DefaultControllerNode node = node(member); + private synchronized ControllerNode addNode(DefaultControllerNode node) { rawNodes.put(serialize(node.id()), serialize(node)); nodes.put(node.id(), Optional.of(node)); states.put(node.id(), State.ACTIVE); @@ -136,7 +141,7 @@ public class DistributedClusterStore @Override public void memberAdded(MembershipEvent membershipEvent) { log.info("Member {} added", membershipEvent.getMember()); - ControllerNode node = addMember(membershipEvent.getMember()); + ControllerNode node = addNode(node(membershipEvent.getMember())); notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node)); } diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java index a2f2dd9698..50c5f08624 100644 --- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java +++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java @@ -52,7 +52,7 @@ implements MastershipStore { rawMasters = theInstance.getMap("masters"); OptionalCacheLoader nodeLoader - = new OptionalCacheLoader<>(storeService, rawMasters); + = new OptionalCacheLoader<>(kryoSerializationService, rawMasters); masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true); @@ -123,7 +123,7 @@ implements MastershipStore { return null; } - private class RemoteMasterShipEventHandler extends RemoteEventHandler { + private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler { public RemoteMasterShipEventHandler(LoadingCache> cache) { super(cache); } diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java index ab513afbf7..0302105408 100644 --- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java +++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java @@ -6,6 +6,7 @@ import com.hazelcast.core.EntryAdapter; import com.hazelcast.core.EntryEvent; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.MapEvent; +import com.hazelcast.core.Member; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -14,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; import org.onlab.onos.event.Event; import org.onlab.onos.store.AbstractStore; import org.onlab.onos.store.StoreDelegate; +import org.onlab.onos.store.serializers.KryoSerializationService; import org.slf4j.Logger; import static com.google.common.base.Preconditions.checkNotNull; @@ -31,6 +33,9 @@ public abstract class AbstractHazelcastStore T deserialize(byte[] bytes) { - return storeService.deserialize(bytes); + return kryoSerializationService.deserialize(bytes); } @@ -66,8 +71,9 @@ public abstract class AbstractHazelcastStore IMap key type after deserialization * @param IMap value type after deserialization */ - public class RemoteEventHandler extends EntryAdapter { + public class RemoteCacheEventHandler extends EntryAdapter { + private final Member localMember; private LoadingCache> cache; /** @@ -75,17 +81,26 @@ public abstract class AbstractHazelcastStore> cache) { + public RemoteCacheEventHandler(LoadingCache> cache) { + this.localMember = theInstance.getCluster().getLocalMember(); this.cache = checkNotNull(cache); } @Override public void mapCleared(MapEvent event) { + if (localMember.equals(event.getMember())) { + // ignore locally triggered event + return; + } cache.invalidateAll(); } @Override public void entryAdded(EntryEvent event) { + if (localMember.equals(event.getMember())) { + // ignore locally triggered event + return; + } K key = deserialize(event.getKey()); V newVal = deserialize(event.getValue()); Optional newValue = Optional.of(newVal); @@ -95,6 +110,10 @@ public abstract class AbstractHazelcastStore event) { + if (localMember.equals(event.getMember())) { + // ignore locally triggered event + return; + } K key = deserialize(event.getKey()); V oldVal = deserialize(event.getOldValue()); Optional oldValue = Optional.fromNullable(oldVal); @@ -106,6 +125,10 @@ public abstract class AbstractHazelcastStore event) { + if (localMember.equals(event.getMember())) { + // ignore locally triggered event + return; + } K key = deserialize(event.getKey()); V val = deserialize(event.getOldValue()); cache.invalidate(key); @@ -141,4 +164,80 @@ public abstract class AbstractHazelcastStore Entry key type after deserialization + * @param Entry value type after deserialization + */ + public class RemoteEventHandler extends EntryAdapter { + + private final Member localMember; + + public RemoteEventHandler() { + this.localMember = theInstance.getCluster().getLocalMember(); + } + @Override + public void entryAdded(EntryEvent event) { + if (localMember.equals(event.getMember())) { + // ignore locally triggered event + return; + } + K key = deserialize(event.getKey()); + V newVal = deserialize(event.getValue()); + onAdd(key, newVal); + } + + @Override + public void entryRemoved(EntryEvent event) { + if (localMember.equals(event.getMember())) { + // ignore locally triggered event + return; + } + K key = deserialize(event.getKey()); + V val = deserialize(event.getValue()); + onRemove(key, val); + } + + @Override + public void entryUpdated(EntryEvent event) { + if (localMember.equals(event.getMember())) { + // ignore locally triggered event + return; + } + K key = deserialize(event.getKey()); + V oldVal = deserialize(event.getOldValue()); + V newVal = deserialize(event.getValue()); + onUpdate(key, oldVal, newVal); + } + + /** + * Remote entry addition hook. + * + * @param key new key + * @param newVal new value + */ + protected void onAdd(K key, V newVal) { + } + + /** + * Remote entry update hook. + * + * @param key new key + * @param oldValue old value + * @param newVal new value + */ + protected void onUpdate(K key, V oldValue, V newVal) { + } + + /** + * Remote entry remove hook. + * + * @param key new key + * @param val old value + */ + protected void onRemove(K key, V val) { + } + } + } diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java index dd2b8726ed..f96fdd8354 100644 --- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java +++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java @@ -2,6 +2,8 @@ package org.onlab.onos.store.common; import static com.google.common.base.Preconditions.checkNotNull; +import org.onlab.onos.store.serializers.KryoSerializationService; + import com.google.common.base.Optional; import com.google.common.cache.CacheLoader; import com.hazelcast.core.IMap; @@ -16,28 +18,28 @@ import com.hazelcast.core.IMap; public final class OptionalCacheLoader extends CacheLoader> { - private final StoreService storeService; + private final KryoSerializationService kryoSerializationService; private IMap rawMap; /** * Constructor. * - * @param storeService to use for serialization + * @param kryoSerializationService to use for serialization * @param rawMap underlying IMap */ - public OptionalCacheLoader(StoreService storeService, IMap rawMap) { - this.storeService = checkNotNull(storeService); + public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap rawMap) { + this.kryoSerializationService = checkNotNull(kryoSerializationService); this.rawMap = checkNotNull(rawMap); } @Override public Optional load(K key) throws Exception { - byte[] keyBytes = storeService.serialize(key); + byte[] keyBytes = kryoSerializationService.serialize(key); byte[] valBytes = rawMap.get(keyBytes); if (valBytes == null) { return Optional.absent(); } - V dev = storeService.deserialize(valBytes); + V dev = kryoSerializationService.deserialize(valBytes); return Optional.of(dev); } } diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreManager.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreManager.java index 56851166bc..f3bed7e3ab 100644 --- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreManager.java +++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreManager.java @@ -5,46 +5,14 @@ import com.hazelcast.config.FileSystemXmlConfig; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; -import de.javakaffee.kryoserializers.URISerializer; - import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Service; -import org.onlab.onos.cluster.ControllerNode; -import org.onlab.onos.cluster.DefaultControllerNode; -import org.onlab.onos.cluster.NodeId; -import org.onlab.onos.net.ConnectPoint; -import org.onlab.onos.net.DefaultDevice; -import org.onlab.onos.net.DefaultLink; -import org.onlab.onos.net.DefaultPort; -import org.onlab.onos.net.Device; -import org.onlab.onos.net.DeviceId; -import org.onlab.onos.net.Element; -import org.onlab.onos.net.Link; -import org.onlab.onos.net.LinkKey; -import org.onlab.onos.net.MastershipRole; -import org.onlab.onos.net.Port; -import org.onlab.onos.net.PortNumber; -import org.onlab.onos.net.provider.ProviderId; -import org.onlab.onos.store.serializers.ConnectPointSerializer; -import org.onlab.onos.store.serializers.DefaultLinkSerializer; -import org.onlab.onos.store.serializers.DefaultPortSerializer; -import org.onlab.onos.store.serializers.DeviceIdSerializer; -import org.onlab.onos.store.serializers.IpPrefixSerializer; -import org.onlab.onos.store.serializers.LinkKeySerializer; -import org.onlab.onos.store.serializers.NodeIdSerializer; -import org.onlab.onos.store.serializers.PortNumberSerializer; -import org.onlab.onos.store.serializers.ProviderIdSerializer; -import org.onlab.packet.IpPrefix; -import org.onlab.util.KryoPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; /** * Auxiliary bootstrap of distributed store. @@ -58,55 +26,18 @@ public class StoreManager implements StoreService { private final Logger log = LoggerFactory.getLogger(getClass()); protected HazelcastInstance instance; - private KryoPool serializerPool; - @Activate public void activate() { try { Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE); instance = Hazelcast.newHazelcastInstance(config); - setupKryoPool(); log.info("Started"); } catch (FileNotFoundException e) { log.error("Unable to configure Hazelcast", e); } } - /** - * Sets up the common serialzers pool. - */ - protected void setupKryoPool() { - // FIXME Slice out types used in common to separate pool/namespace. - serializerPool = KryoPool.newBuilder() - .register(ArrayList.class, - HashMap.class, - - ControllerNode.State.class, - Device.Type.class, - - DefaultControllerNode.class, - DefaultDevice.class, - MastershipRole.class, - Port.class, - Element.class, - - Link.Type.class - ) - .register(IpPrefix.class, new IpPrefixSerializer()) - .register(URI.class, new URISerializer()) - .register(NodeId.class, new NodeIdSerializer()) - .register(ProviderId.class, new ProviderIdSerializer()) - .register(DeviceId.class, new DeviceIdSerializer()) - .register(PortNumber.class, new PortNumberSerializer()) - .register(DefaultPort.class, new DefaultPortSerializer()) - .register(LinkKey.class, new LinkKeySerializer()) - .register(ConnectPoint.class, new ConnectPointSerializer()) - .register(DefaultLink.class, new DefaultLinkSerializer()) - .build() - .populate(10); - } - @Deactivate public void deactivate() { instance.shutdown(); @@ -118,18 +49,4 @@ public class StoreManager implements StoreService { return instance; } - - @Override - public byte[] serialize(final Object obj) { - return serializerPool.serialize(obj); - } - - @Override - public T deserialize(final byte[] bytes) { - if (bytes == null) { - return null; - } - return serializerPool.deserialize(bytes); - } - } diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreService.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreService.java index 490183ff57..8cc50c7aac 100644 --- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreService.java +++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreService.java @@ -15,22 +15,4 @@ public interface StoreService { */ HazelcastInstance getHazelcastInstance(); - /** - * Serializes the specified object into bytes using one of the - * pre-registered serializers. - * - * @param obj object to be serialized - * @return serialized bytes - */ - public byte[] serialize(final Object obj); - - /** - * Deserializes the specified bytes into an object using one of the - * pre-registered serializers. - * - * @param bytes bytes to be deserialized - * @return deserialized object - */ - public T deserialize(final byte[] bytes); - } diff --git a/core/store/hz/common/src/test/java/org/onlab/onos/store/common/TestStoreManager.java b/core/store/hz/common/src/test/java/org/onlab/onos/store/common/TestStoreManager.java index 1914fc354f..f4cd4b8080 100644 --- a/core/store/hz/common/src/test/java/org/onlab/onos/store/common/TestStoreManager.java +++ b/core/store/hz/common/src/test/java/org/onlab/onos/store/common/TestStoreManager.java @@ -46,9 +46,8 @@ public class TestStoreManager extends StoreManager { this.instance = instance; } - // Hazelcast setup removed from original code. @Override public void activate() { - setupKryoPool(); + // Hazelcast setup removed from original code. } } diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java index dcf2a3d28b..a3d340b3cc 100644 --- a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java +++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java @@ -72,6 +72,10 @@ public class DistributedDeviceStore private IMap rawDevicePorts; private LoadingCache>> devicePorts; + private String devicesListener; + + private String portsListener; + @Override @Activate public void activate() { @@ -83,20 +87,20 @@ public class DistributedDeviceStore // TODO decide on Map name scheme to avoid collision rawDevices = theInstance.getMap("devices"); final OptionalCacheLoader deviceLoader - = new OptionalCacheLoader<>(storeService, rawDevices); + = new OptionalCacheLoader<>(kryoSerializationService, rawDevices); devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader)); // refresh/populate cache based on notification from other instance - rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue); + devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue); // TODO cache availableDevices availableDevices = theInstance.getSet("availableDevices"); rawDevicePorts = theInstance.getMap("devicePorts"); final OptionalCacheLoader> devicePortLoader - = new OptionalCacheLoader<>(storeService, rawDevicePorts); + = new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts); devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader)); // refresh/populate cache based on notification from other instance - rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue); + portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue); loadDeviceCache(); loadDevicePortsCache(); @@ -106,6 +110,8 @@ public class DistributedDeviceStore @Deactivate public void deactivate() { + rawDevicePorts.removeEntryListener(portsListener); + rawDevices.removeEntryListener(devicesListener); log.info("Stopped"); } @@ -354,7 +360,7 @@ public class DistributedDeviceStore } } - private class RemoteDeviceEventHandler extends RemoteEventHandler { + private class RemoteDeviceEventHandler extends RemoteCacheEventHandler { public RemoteDeviceEventHandler(LoadingCache> cache) { super(cache); } @@ -375,7 +381,7 @@ public class DistributedDeviceStore } } - private class RemotePortEventHandler extends RemoteEventHandler> { + private class RemotePortEventHandler extends RemoteCacheEventHandler> { public RemotePortEventHandler(LoadingCache>> cache) { super(cache); } diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java index d74ea49e95..5161f2fe42 100644 --- a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java +++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java @@ -58,6 +58,8 @@ public class DistributedLinkStore private final Multimap srcLinks = HashMultimap.create(); private final Multimap dstLinks = HashMultimap.create(); + private String linksListener; + @Override @Activate public void activate() { @@ -68,10 +70,10 @@ public class DistributedLinkStore // TODO decide on Map name scheme to avoid collision rawLinks = theInstance.getMap("links"); final OptionalCacheLoader linkLoader - = new OptionalCacheLoader<>(storeService, rawLinks); + = new OptionalCacheLoader<>(kryoSerializationService, rawLinks); links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader)); // refresh/populate cache based on notification from other instance - rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue); + linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue); loadLinkCache(); @@ -80,7 +82,7 @@ public class DistributedLinkStore @Deactivate public void deactivate() { - super.activate(); + rawLinks.removeEntryListener(linksListener); log.info("Stopped"); } @@ -233,7 +235,7 @@ public class DistributedLinkStore } } - private class RemoteLinkEventHandler extends RemoteEventHandler { + private class RemoteLinkEventHandler extends RemoteCacheEventHandler { public RemoteLinkEventHandler(LoadingCache> cache) { super(cache); } diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java index 2fdad7437a..97b9ebe239 100644 --- a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java +++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java @@ -20,6 +20,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.onlab.onos.net.Device; import org.onlab.onos.net.DeviceId; @@ -35,12 +36,17 @@ import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.store.common.StoreManager; import org.onlab.onos.store.common.StoreService; import org.onlab.onos.store.common.TestStoreManager; +import org.onlab.onos.store.serializers.KryoSerializationManager; +import org.onlab.onos.store.serializers.KryoSerializationService; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.hazelcast.config.Config; import com.hazelcast.core.Hazelcast; +/** + * Test of the Hazelcast based distributed DeviceStore implementation. + */ public class DistributedDeviceStoreTest { private static final ProviderId PID = new ProviderId("of", "foo"); @@ -57,6 +63,7 @@ public class DistributedDeviceStoreTest { private static final PortNumber P3 = PortNumber.portNumber(3); private DistributedDeviceStore deviceStore; + private KryoSerializationManager serializationMgr; private StoreManager storeManager; @@ -78,7 +85,10 @@ public class DistributedDeviceStoreTest { storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); storeManager.activate(); - deviceStore = new TestDistributedDeviceStore(storeManager); + serializationMgr = new KryoSerializationManager(); + serializationMgr.activate(); + + deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr); deviceStore.activate(); } @@ -86,6 +96,8 @@ public class DistributedDeviceStoreTest { public void tearDown() throws Exception { deviceStore.deactivate(); + serializationMgr.deactivate(); + storeManager.deactivate(); } @@ -326,6 +338,7 @@ public class DistributedDeviceStoreTest { } // TODO add test for Port events when we have them + @Ignore("Ignore until Delegate spec. is clear.") @Test public final void testEvents() throws InterruptedException { final CountDownLatch addLatch = new CountDownLatch(1); @@ -379,8 +392,10 @@ public class DistributedDeviceStoreTest { } private class TestDistributedDeviceStore extends DistributedDeviceStore { - public TestDistributedDeviceStore(StoreService storeService) { + public TestDistributedDeviceStore(StoreService storeService, + KryoSerializationService kryoSerializationService) { this.storeService = storeService; + this.kryoSerializationService = kryoSerializationService; } } } diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java index 0f973582db..a76e901ef7 100644 --- a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java +++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java @@ -15,6 +15,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.onlab.onos.net.ConnectPoint; import org.onlab.onos.net.DeviceId; @@ -29,27 +30,28 @@ import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.store.common.StoreManager; import org.onlab.onos.store.common.StoreService; import org.onlab.onos.store.common.TestStoreManager; +import org.onlab.onos.store.serializers.KryoSerializationManager; +import org.onlab.onos.store.serializers.KryoSerializationService; import com.google.common.collect.Iterables; import com.hazelcast.config.Config; import com.hazelcast.core.Hazelcast; +/** + * Test of the Hazelcast based distributed LinkStore implementation. + */ public class DistributedLinkStoreTest { private static final ProviderId PID = new ProviderId("of", "foo"); private static final DeviceId DID1 = deviceId("of:foo"); private static final DeviceId DID2 = deviceId("of:bar"); -// private static final String MFR = "whitebox"; -// private static final String HW = "1.1.x"; -// private static final String SW1 = "3.8.1"; -// private static final String SW2 = "3.9.5"; -// private static final String SN = "43311-12345"; private static final PortNumber P1 = PortNumber.portNumber(1); private static final PortNumber P2 = PortNumber.portNumber(2); private static final PortNumber P3 = PortNumber.portNumber(3); private StoreManager storeManager; + private KryoSerializationManager serializationMgr; private DistributedLinkStore linkStore; @@ -69,13 +71,17 @@ public class DistributedLinkStoreTest { storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config)); storeManager.activate(); - linkStore = new TestDistributedLinkStore(storeManager); + serializationMgr = new KryoSerializationManager(); + serializationMgr.activate(); + + linkStore = new TestDistributedLinkStore(storeManager, serializationMgr); linkStore.activate(); } @After public void tearDown() throws Exception { linkStore.deactivate(); + serializationMgr.deactivate(); storeManager.deactivate(); } @@ -302,6 +308,7 @@ public class DistributedLinkStoreTest { assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1)); } + @Ignore("Ignore until Delegate spec. is clear.") @Test public final void testEvents() throws InterruptedException { @@ -354,8 +361,10 @@ public class DistributedLinkStoreTest { class TestDistributedLinkStore extends DistributedLinkStore { - TestDistributedLinkStore(StoreService storeService) { + TestDistributedLinkStore(StoreService storeService, + KryoSerializationService kryoSerializationService) { this.storeService = storeService; + this.kryoSerializationService = kryoSerializationService; } } } diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java new file mode 100644 index 0000000000..84e1b732f1 --- /dev/null +++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java @@ -0,0 +1,103 @@ +package org.onlab.onos.store.serializers; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.cluster.ControllerNode; +import org.onlab.onos.cluster.DefaultControllerNode; +import org.onlab.onos.cluster.NodeId; +import org.onlab.onos.net.ConnectPoint; +import org.onlab.onos.net.DefaultDevice; +import org.onlab.onos.net.DefaultLink; +import org.onlab.onos.net.DefaultPort; +import org.onlab.onos.net.Device; +import org.onlab.onos.net.DeviceId; +import org.onlab.onos.net.Element; +import org.onlab.onos.net.Link; +import org.onlab.onos.net.LinkKey; +import org.onlab.onos.net.MastershipRole; +import org.onlab.onos.net.Port; +import org.onlab.onos.net.PortNumber; +import org.onlab.onos.net.provider.ProviderId; +import org.onlab.packet.IpPrefix; +import org.onlab.util.KryoPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import de.javakaffee.kryoserializers.URISerializer; + +/** + * Serialization service using Kryo. + */ +@Component(immediate = true) +@Service +public class KryoSerializationManager implements KryoSerializationService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + private KryoPool serializerPool; + + + @Activate + public void activate() { + setupKryoPool(); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + /** + * Sets up the common serialzers pool. + */ + protected void setupKryoPool() { + // FIXME Slice out types used in common to separate pool/namespace. + serializerPool = KryoPool.newBuilder() + .register(ArrayList.class, + HashMap.class, + + ControllerNode.State.class, + Device.Type.class, + + DefaultControllerNode.class, + DefaultDevice.class, + MastershipRole.class, + Port.class, + Element.class, + + Link.Type.class + ) + .register(IpPrefix.class, new IpPrefixSerializer()) + .register(URI.class, new URISerializer()) + .register(NodeId.class, new NodeIdSerializer()) + .register(ProviderId.class, new ProviderIdSerializer()) + .register(DeviceId.class, new DeviceIdSerializer()) + .register(PortNumber.class, new PortNumberSerializer()) + .register(DefaultPort.class, new DefaultPortSerializer()) + .register(LinkKey.class, new LinkKeySerializer()) + .register(ConnectPoint.class, new ConnectPointSerializer()) + .register(DefaultLink.class, new DefaultLinkSerializer()) + .build() + .populate(1); + } + + @Override + public byte[] serialize(final Object obj) { + return serializerPool.serialize(obj); + } + + @Override + public T deserialize(final byte[] bytes) { + if (bytes == null) { + return null; + } + return serializerPool.deserialize(bytes); + } + +} diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java new file mode 100644 index 0000000000..e92cc4bc5a --- /dev/null +++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java @@ -0,0 +1,27 @@ +package org.onlab.onos.store.serializers; + +// TODO: To be replaced with SerializationService from IOLoop activity +/** + * Service to serialize Objects into byte array. + */ +public interface KryoSerializationService { + + /** + * Serializes the specified object into bytes using one of the + * pre-registered serializers. + * + * @param obj object to be serialized + * @return serialized bytes + */ + public byte[] serialize(final Object obj); + + /** + * Deserializes the specified bytes into an object using one of the + * pre-registered serializers. + * + * @param bytes bytes to be deserialized + * @return deserialized object + */ + public T deserialize(final byte[] bytes); + +} diff --git a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleClusterStore.java index d348d2f157..2208c861ed 100644 --- a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleClusterStore.java +++ b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleClusterStore.java @@ -20,7 +20,7 @@ import java.util.Set; import static org.slf4j.LoggerFactory.getLogger; /** - * Manages inventory of infrastructure DEVICES using trivial in-memory + * Manages inventory of infrastructure devices using trivial in-memory * structures implementation. */ @Component(immediate = true) @@ -67,6 +67,11 @@ public class SimpleClusterStore return ControllerNode.State.ACTIVE; } + @Override + public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { + return null; + } + @Override public void removeNode(NodeId nodeId) { } diff --git a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleDeviceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleDeviceStore.java index 15dba066d7..7c7d38f085 100644 --- a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleDeviceStore.java +++ b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleDeviceStore.java @@ -101,9 +101,6 @@ public class SimpleDeviceStore synchronized (this) { devices.put(deviceId, device); availableDevices.add(deviceId); - - // For now claim the device as a master automatically. - // roles.put(deviceId, MastershipRole.MASTER); } return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null); } @@ -189,7 +186,7 @@ public class SimpleDeviceStore new DefaultPort(device, portDescription.portNumber(), portDescription.isEnabled()); ports.put(port.number(), updatedPort); - return new DeviceEvent(PORT_UPDATED, device, port); + return new DeviceEvent(PORT_UPDATED, device, updatedPort); } return null; } diff --git a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleLinkStore.java b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleLinkStore.java index 17bbc8840d..319df89aca 100644 --- a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleLinkStore.java +++ b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleLinkStore.java @@ -51,8 +51,6 @@ public class SimpleLinkStore private final Multimap srcLinks = HashMultimap.create(); private final Multimap dstLinks = HashMultimap.create(); - private static final Set EMPTY = ImmutableSet.of(); - @Activate public void activate() { log.info("Started"); diff --git a/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleDeviceStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleDeviceStoreTest.java new file mode 100644 index 0000000000..f973d9b1d9 --- /dev/null +++ b/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleDeviceStoreTest.java @@ -0,0 +1,376 @@ +/** + * + */ +package org.onlab.onos.net.trivial.impl; + +import static org.junit.Assert.*; +import static org.onlab.onos.net.Device.Type.SWITCH; +import static org.onlab.onos.net.DeviceId.deviceId; +import static org.onlab.onos.net.device.DeviceEvent.Type.*; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.onlab.onos.net.Device; +import org.onlab.onos.net.DeviceId; +import org.onlab.onos.net.Port; +import org.onlab.onos.net.PortNumber; +import org.onlab.onos.net.device.DefaultDeviceDescription; +import org.onlab.onos.net.device.DefaultPortDescription; +import org.onlab.onos.net.device.DeviceDescription; +import org.onlab.onos.net.device.DeviceEvent; +import org.onlab.onos.net.device.DeviceStore; +import org.onlab.onos.net.device.DeviceStoreDelegate; +import org.onlab.onos.net.device.PortDescription; +import org.onlab.onos.net.provider.ProviderId; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +/** + * Test of the simple DeviceStore implementation. + */ +public class SimpleDeviceStoreTest { + + private static final ProviderId PID = new ProviderId("of", "foo"); + private static final DeviceId DID1 = deviceId("of:foo"); + private static final DeviceId DID2 = deviceId("of:bar"); + private static final String MFR = "whitebox"; + private static final String HW = "1.1.x"; + private static final String SW1 = "3.8.1"; + private static final String SW2 = "3.9.5"; + private static final String SN = "43311-12345"; + + private static final PortNumber P1 = PortNumber.portNumber(1); + private static final PortNumber P2 = PortNumber.portNumber(2); + private static final PortNumber P3 = PortNumber.portNumber(3); + + private SimpleDeviceStore simpleDeviceStore; + private DeviceStore deviceStore; + + + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + + @Before + public void setUp() throws Exception { + simpleDeviceStore = new SimpleDeviceStore(); + simpleDeviceStore.activate(); + deviceStore = simpleDeviceStore; + } + + @After + public void tearDown() throws Exception { + simpleDeviceStore.deactivate(); + } + + private void putDevice(DeviceId deviceId, String swVersion) { + DeviceDescription description = + new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR, + HW, swVersion, SN); + deviceStore.createOrUpdateDevice(PID, deviceId, description); + } + + private static void assertDevice(DeviceId id, String swVersion, Device device) { + assertNotNull(device); + assertEquals(id, device.id()); + assertEquals(MFR, device.manufacturer()); + assertEquals(HW, device.hwVersion()); + assertEquals(swVersion, device.swVersion()); + assertEquals(SN, device.serialNumber()); + } + + @Test + public final void testGetDeviceCount() { + assertEquals("initialy empty", 0, deviceStore.getDeviceCount()); + + putDevice(DID1, SW1); + putDevice(DID2, SW2); + putDevice(DID1, SW1); + + assertEquals("expect 2 uniq devices", 2, deviceStore.getDeviceCount()); + } + + @Test + public final void testGetDevices() { + assertEquals("initialy empty", 0, Iterables.size(deviceStore.getDevices())); + + putDevice(DID1, SW1); + putDevice(DID2, SW2); + putDevice(DID1, SW1); + + assertEquals("expect 2 uniq devices", + 2, Iterables.size(deviceStore.getDevices())); + + Map devices = new HashMap<>(); + for (Device device : deviceStore.getDevices()) { + devices.put(device.id(), device); + } + + assertDevice(DID1, SW1, devices.get(DID1)); + assertDevice(DID2, SW2, devices.get(DID2)); + + // add case for new node? + } + + @Test + public final void testGetDevice() { + + putDevice(DID1, SW1); + + assertDevice(DID1, SW1, deviceStore.getDevice(DID1)); + assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2)); + } + + @Test + public final void testCreateOrUpdateDevice() { + DeviceDescription description = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW1, SN); + DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description); + assertEquals(DEVICE_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + + DeviceDescription description2 = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW2, SN); + DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2); + assertEquals(DEVICE_UPDATED, event2.type()); + assertDevice(DID1, SW2, event2.subject()); + + assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2)); + } + + @Test + public final void testMarkOffline() { + + putDevice(DID1, SW1); + assertTrue(deviceStore.isAvailable(DID1)); + + DeviceEvent event = deviceStore.markOffline(DID1); + assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertFalse(deviceStore.isAvailable(DID1)); + + DeviceEvent event2 = deviceStore.markOffline(DID1); + assertNull("No change, no event", event2); +} + + @Test + public final void testUpdatePorts() { + putDevice(DID1, SW1); + List pds = Arrays.asList( + new DefaultPortDescription(P1, true), + new DefaultPortDescription(P2, true) + ); + + List events = deviceStore.updatePorts(DID1, pds); + + Set expectedPorts = Sets.newHashSet(P1, P2); + for (DeviceEvent event : events) { + assertEquals(PORT_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertTrue("PortNumber is one of expected", + expectedPorts.remove(event.port().number())); + assertTrue("Port is enabled", event.port().isEnabled()); + } + assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty()); + + + List pds2 = Arrays.asList( + new DefaultPortDescription(P1, false), + new DefaultPortDescription(P2, true), + new DefaultPortDescription(P3, true) + ); + + events = deviceStore.updatePorts(DID1, pds2); + assertFalse("event should be triggered", events.isEmpty()); + for (DeviceEvent event : events) { + PortNumber num = event.port().number(); + if (P1.equals(num)) { + assertEquals(PORT_UPDATED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertFalse("Port is disabled", event.port().isEnabled()); + } else if (P2.equals(num)) { + fail("P2 event not expected."); + } else if (P3.equals(num)) { + assertEquals(PORT_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertTrue("Port is enabled", event.port().isEnabled()); + } else { + fail("Unknown port number encountered: " + num); + } + } + + List pds3 = Arrays.asList( + new DefaultPortDescription(P1, false), + new DefaultPortDescription(P2, true) + ); + events = deviceStore.updatePorts(DID1, pds3); + assertFalse("event should be triggered", events.isEmpty()); + for (DeviceEvent event : events) { + PortNumber num = event.port().number(); + if (P1.equals(num)) { + fail("P1 event not expected."); + } else if (P2.equals(num)) { + fail("P2 event not expected."); + } else if (P3.equals(num)) { + assertEquals(PORT_REMOVED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertTrue("Port was enabled", event.port().isEnabled()); + } else { + fail("Unknown port number encountered: " + num); + } + } + + } + + @Test + public final void testUpdatePortStatus() { + putDevice(DID1, SW1); + List pds = Arrays.asList( + new DefaultPortDescription(P1, true) + ); + deviceStore.updatePorts(DID1, pds); + + DeviceEvent event = deviceStore.updatePortStatus(DID1, + new DefaultPortDescription(P1, false)); + assertEquals(PORT_UPDATED, event.type()); + assertDevice(DID1, SW1, event.subject()); + assertEquals(P1, event.port().number()); + assertFalse("Port is disabled", event.port().isEnabled()); + } + + @Test + public final void testGetPorts() { + putDevice(DID1, SW1); + putDevice(DID2, SW1); + List pds = Arrays.asList( + new DefaultPortDescription(P1, true), + new DefaultPortDescription(P2, true) + ); + deviceStore.updatePorts(DID1, pds); + + Set expectedPorts = Sets.newHashSet(P1, P2); + List ports = deviceStore.getPorts(DID1); + for (Port port : ports) { + assertTrue("Port is enabled", port.isEnabled()); + assertTrue("PortNumber is one of expected", + expectedPorts.remove(port.number())); + } + assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty()); + + + assertTrue("DID2 has no ports", deviceStore.getPorts(DID2).isEmpty()); + } + + @Test + public final void testGetPort() { + putDevice(DID1, SW1); + putDevice(DID2, SW1); + List pds = Arrays.asList( + new DefaultPortDescription(P1, true), + new DefaultPortDescription(P2, false) + ); + deviceStore.updatePorts(DID1, pds); + + Port port1 = deviceStore.getPort(DID1, P1); + assertEquals(P1, port1.number()); + assertTrue("Port is enabled", port1.isEnabled()); + + Port port2 = deviceStore.getPort(DID1, P2); + assertEquals(P2, port2.number()); + assertFalse("Port is disabled", port2.isEnabled()); + + Port port3 = deviceStore.getPort(DID1, P3); + assertNull("P3 not expected", port3); + } + + @Test + public final void testRemoveDevice() { + putDevice(DID1, SW1); + putDevice(DID2, SW1); + + assertEquals(2, deviceStore.getDeviceCount()); + + DeviceEvent event = deviceStore.removeDevice(DID1); + assertEquals(DEVICE_REMOVED, event.type()); + assertDevice(DID1, SW1, event.subject()); + + assertEquals(1, deviceStore.getDeviceCount()); + } + + // If Delegates should be called only on remote events, + // then Simple* should never call them, thus not test required. + // TODO add test for Port events when we have them + @Ignore("Ignore until Delegate spec. is clear.") + @Test + public final void testEvents() throws InterruptedException { + final CountDownLatch addLatch = new CountDownLatch(1); + DeviceStoreDelegate checkAdd = new DeviceStoreDelegate() { + @Override + public void notify(DeviceEvent event) { + assertEquals(DEVICE_ADDED, event.type()); + assertDevice(DID1, SW1, event.subject()); + addLatch.countDown(); + } + }; + final CountDownLatch updateLatch = new CountDownLatch(1); + DeviceStoreDelegate checkUpdate = new DeviceStoreDelegate() { + @Override + public void notify(DeviceEvent event) { + assertEquals(DEVICE_UPDATED, event.type()); + assertDevice(DID1, SW2, event.subject()); + updateLatch.countDown(); + } + }; + final CountDownLatch removeLatch = new CountDownLatch(1); + DeviceStoreDelegate checkRemove = new DeviceStoreDelegate() { + @Override + public void notify(DeviceEvent event) { + assertEquals(DEVICE_REMOVED, event.type()); + assertDevice(DID1, SW2, event.subject()); + removeLatch.countDown(); + } + }; + + DeviceDescription description = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW1, SN); + deviceStore.setDelegate(checkAdd); + deviceStore.createOrUpdateDevice(PID, DID1, description); + assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS)); + + + DeviceDescription description2 = + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, + HW, SW2, SN); + deviceStore.unsetDelegate(checkAdd); + deviceStore.setDelegate(checkUpdate); + deviceStore.createOrUpdateDevice(PID, DID1, description2); + assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS)); + + deviceStore.unsetDelegate(checkUpdate); + deviceStore.setDelegate(checkRemove); + deviceStore.removeDevice(DID1); + assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS)); + } +} diff --git a/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleLinkStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleLinkStoreTest.java new file mode 100644 index 0000000000..50d0e479ac --- /dev/null +++ b/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleLinkStoreTest.java @@ -0,0 +1,346 @@ +package org.onlab.onos.net.trivial.impl; + +import static org.junit.Assert.*; +import static org.onlab.onos.net.DeviceId.deviceId; +import static org.onlab.onos.net.Link.Type.*; +import static org.onlab.onos.net.link.LinkEvent.Type.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.onlab.onos.net.ConnectPoint; +import org.onlab.onos.net.DeviceId; +import org.onlab.onos.net.Link; +import org.onlab.onos.net.LinkKey; +import org.onlab.onos.net.PortNumber; +import org.onlab.onos.net.Link.Type; +import org.onlab.onos.net.link.DefaultLinkDescription; +import org.onlab.onos.net.link.LinkEvent; +import org.onlab.onos.net.link.LinkStore; +import org.onlab.onos.net.link.LinkStoreDelegate; +import org.onlab.onos.net.provider.ProviderId; + +import com.google.common.collect.Iterables; + +/** + * Test of the simple LinkStore implementation. + */ +public class SimpleLinkStoreTest { + + private static final ProviderId PID = new ProviderId("of", "foo"); + private static final DeviceId DID1 = deviceId("of:foo"); + private static final DeviceId DID2 = deviceId("of:bar"); + + private static final PortNumber P1 = PortNumber.portNumber(1); + private static final PortNumber P2 = PortNumber.portNumber(2); + private static final PortNumber P3 = PortNumber.portNumber(3); + + + private SimpleLinkStore simpleLinkStore; + private LinkStore linkStore; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setUp() throws Exception { + simpleLinkStore = new SimpleLinkStore(); + simpleLinkStore.activate(); + linkStore = simpleLinkStore; + } + + @After + public void tearDown() throws Exception { + simpleLinkStore.deactivate(); + } + + private void putLink(DeviceId srcId, PortNumber srcNum, + DeviceId dstId, PortNumber dstNum, Type type) { + ConnectPoint src = new ConnectPoint(srcId, srcNum); + ConnectPoint dst = new ConnectPoint(dstId, dstNum); + linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type)); + } + + private void putLink(LinkKey key, Type type) { + putLink(key.src().deviceId(), key.src().port(), + key.dst().deviceId(), key.dst().port(), + type); + } + + private static void assertLink(DeviceId srcId, PortNumber srcNum, + DeviceId dstId, PortNumber dstNum, Type type, + Link link) { + assertEquals(srcId, link.src().deviceId()); + assertEquals(srcNum, link.src().port()); + assertEquals(dstId, link.dst().deviceId()); + assertEquals(dstNum, link.dst().port()); + assertEquals(type, link.type()); + } + + private static void assertLink(LinkKey key, Type type, Link link) { + assertLink(key.src().deviceId(), key.src().port(), + key.dst().deviceId(), key.dst().port(), + type, link); + } + + @Test + public final void testGetLinkCount() { + assertEquals("initialy empty", 0, linkStore.getLinkCount()); + + putLink(DID1, P1, DID2, P2, DIRECT); + putLink(DID2, P2, DID1, P1, DIRECT); + putLink(DID1, P1, DID2, P2, DIRECT); + + assertEquals("expecting 2 unique link", 2, linkStore.getLinkCount()); + } + + @Test + public final void testGetLinks() { + assertEquals("initialy empty", 0, + Iterables.size(linkStore.getLinks())); + + LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2)); + LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId1, DIRECT); + + assertEquals("expecting 2 unique link", 2, + Iterables.size(linkStore.getLinks())); + + Map links = new HashMap<>(); + for (Link link : linkStore.getLinks()) { + links.put(new LinkKey(link.src(), link.dst()), link); + } + + assertLink(linkId1, DIRECT, links.get(linkId1)); + assertLink(linkId2, DIRECT, links.get(linkId2)); + } + + @Test + public final void testGetDeviceEgressLinks() { + LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2)); + LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1)); + LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set links1 = linkStore.getDeviceEgressLinks(DID1); + assertEquals(2, links1.size()); + // check + + Set links2 = linkStore.getDeviceEgressLinks(DID2); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testGetDeviceIngressLinks() { + LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2)); + LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1)); + LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set links1 = linkStore.getDeviceIngressLinks(DID2); + assertEquals(2, links1.size()); + // check + + Set links2 = linkStore.getDeviceIngressLinks(DID1); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testGetLink() { + ConnectPoint src = new ConnectPoint(DID1, P1); + ConnectPoint dst = new ConnectPoint(DID2, P2); + LinkKey linkId1 = new LinkKey(src, dst); + + putLink(linkId1, DIRECT); + + Link link = linkStore.getLink(src, dst); + assertLink(linkId1, DIRECT, link); + + assertNull("There shouldn't be reverese link", + linkStore.getLink(dst, src)); + } + + @Test + public final void testGetEgressLinks() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = new LinkKey(d1P1, d2P2); + LinkKey linkId2 = new LinkKey(d2P2, d1P1); + LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set links1 = linkStore.getEgressLinks(d1P1); + assertEquals(1, links1.size()); + assertLink(linkId1, DIRECT, links1.iterator().next()); + + Set links2 = linkStore.getEgressLinks(d2P2); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testGetIngressLinks() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = new LinkKey(d1P1, d2P2); + LinkKey linkId2 = new LinkKey(d2P2, d1P1); + LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3)); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + putLink(linkId3, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + Set links1 = linkStore.getIngressLinks(d2P2); + assertEquals(1, links1.size()); + assertLink(linkId1, DIRECT, links1.iterator().next()); + + Set links2 = linkStore.getIngressLinks(d1P1); + assertEquals(1, links2.size()); + assertLink(linkId2, DIRECT, links2.iterator().next()); + } + + @Test + public final void testCreateOrUpdateLink() { + ConnectPoint src = new ConnectPoint(DID1, P1); + ConnectPoint dst = new ConnectPoint(DID2, P2); + + // add link + LinkEvent event = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, INDIRECT)); + + assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject()); + assertEquals(LINK_ADDED, event.type()); + + // update link type + LinkEvent event2 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, DIRECT)); + + assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject()); + assertEquals(LINK_UPDATED, event2.type()); + + // no change + LinkEvent event3 = linkStore.createOrUpdateLink(PID, + new DefaultLinkDescription(src, dst, DIRECT)); + + assertNull("No change event expected", event3); + } + + @Test + public final void testRemoveLink() { + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + LinkKey linkId1 = new LinkKey(d1P1, d2P2); + LinkKey linkId2 = new LinkKey(d2P2, d1P1); + + putLink(linkId1, DIRECT); + putLink(linkId2, DIRECT); + + // DID1,P1 => DID2,P2 + // DID2,P2 => DID1,P1 + // DID1,P2 => DID2,P3 + + LinkEvent event = linkStore.removeLink(d1P1, d2P2); + assertEquals(LINK_REMOVED, event.type()); + LinkEvent event2 = linkStore.removeLink(d1P1, d2P2); + assertNull(event2); + + assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1)); + } + + // If Delegates should be called only on remote events, + // then Simple* should never call them, thus not test required. + @Ignore("Ignore until Delegate spec. is clear.") + @Test + public final void testEvents() throws InterruptedException { + + final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); + final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); + final LinkKey linkId1 = new LinkKey(d1P1, d2P2); + + final CountDownLatch addLatch = new CountDownLatch(1); + LinkStoreDelegate checkAdd = new LinkStoreDelegate() { + @Override + public void notify(LinkEvent event) { + assertEquals(LINK_ADDED, event.type()); + assertLink(linkId1, INDIRECT, event.subject()); + addLatch.countDown(); + } + }; + final CountDownLatch updateLatch = new CountDownLatch(1); + LinkStoreDelegate checkUpdate = new LinkStoreDelegate() { + @Override + public void notify(LinkEvent event) { + assertEquals(LINK_UPDATED, event.type()); + assertLink(linkId1, DIRECT, event.subject()); + updateLatch.countDown(); + } + }; + final CountDownLatch removeLatch = new CountDownLatch(1); + LinkStoreDelegate checkRemove = new LinkStoreDelegate() { + @Override + public void notify(LinkEvent event) { + assertEquals(LINK_REMOVED, event.type()); + assertLink(linkId1, DIRECT, event.subject()); + removeLatch.countDown(); + } + }; + + linkStore.setDelegate(checkAdd); + putLink(linkId1, INDIRECT); + assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS)); + + linkStore.unsetDelegate(checkAdd); + linkStore.setDelegate(checkUpdate); + putLink(linkId1, DIRECT); + assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS)); + + linkStore.unsetDelegate(checkUpdate); + linkStore.setDelegate(checkRemove); + linkStore.removeLink(d1P1, d2P2); + assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS)); + } +} diff --git a/features/features.xml b/features/features.xml index 2ea8ef85c3..c32bc3dd6c 100644 --- a/features/features.xml +++ b/features/features.xml @@ -48,20 +48,17 @@ description="ONOS core components"> onos-api mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT - mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT - mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT - mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT - mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT + mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT - + onos-api mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT - mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT + mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT /opt/onos/var/stderr.log - sleep 3 + sleep 2 end script script diff --git a/tools/test/bin/onos-config b/tools/test/bin/onos-config index 9f1e3b0ec5..4c4f7e15c2 100755 --- a/tools/test/bin/onos-config +++ b/tools/test/bin/onos-config @@ -8,7 +8,21 @@ remote=$ONOS_USER@${1:-$OCI} +# Generate a cluster.json from the ON* environment variables +CDEF_FILE=/tmp/cluster.json +echo "{ \"nodes\":[" > $CDEF_FILE +for node in $(env | sort | egrep "OC[2-9]+" | cut -d= -f2); do + echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $CDEF_FILE +done +echo " { \"id\": \"$OC1\", \"ip\": \"$OC1\", \"tcpPort\": 9876 }" >> $CDEF_FILE +echo "]}" >> $CDEF_FILE + ssh $remote " sudo perl -pi.bak -e \"s/ .*${ONOS_NIC:-192.168.56.*}> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties +" + +scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/ \ No newline at end of file diff --git a/tools/test/bin/onos-install b/tools/test/bin/onos-install index d594105f1b..a87ff17049 100755 --- a/tools/test/bin/onos-install +++ b/tools/test/bin/onos-install @@ -24,6 +24,7 @@ ssh $remote " # Make a link to the log file directory and make a home for auxiliaries ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log mkdir $ONOS_INSTALL_DIR/var + mkdir $ONOS_INSTALL_DIR/config # Install the upstart configuration file and setup options for debugging sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf diff --git a/tools/test/cells/local b/tools/test/cells/local index b04a5e3787..6b9fea59b6 100644 --- a/tools/test/cells/local +++ b/tools/test/cells/local @@ -1,6 +1,8 @@ # Default virtual box ONOS instances 1,2 & ONOS mininet box . $ONOS_ROOT/tools/test/cells/.reset +export ONOS_NIC=192.168.56.* + export OC1="192.168.56.101" export OC2="192.168.56.102" diff --git a/utils/misc/src/main/java/org/onlab/util/KryoPool.java b/utils/misc/src/main/java/org/onlab/util/KryoPool.java index 58c268ceca..be662a672c 100644 --- a/utils/misc/src/main/java/org/onlab/util/KryoPool.java +++ b/utils/misc/src/main/java/org/onlab/util/KryoPool.java @@ -1,5 +1,6 @@ package org.onlab.util; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -8,6 +9,8 @@ import org.apache.commons.lang3.tuple.Pair; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.ByteBufferInput; +import com.esotericsoftware.kryo.io.ByteBufferOutput; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.google.common.collect.ImmutableList; @@ -173,6 +176,22 @@ public final class KryoPool { } } + /** + * Serializes given object to byte buffer using Kryo instance in pool. + * + * @param obj Object to serialize + * @param buffer to write to + */ + public void serialize(final Object obj, final ByteBuffer buffer) { + ByteBufferOutput out = new ByteBufferOutput(buffer); + Kryo kryo = getKryo(); + try { + kryo.writeClassAndObject(out, obj); + } finally { + putKryo(kryo); + } + } + /** * Deserializes given byte array to Object using Kryo instance in pool. * @@ -192,6 +211,24 @@ public final class KryoPool { } } + /** + * Deserializes given byte buffer to Object using Kryo instance in pool. + * + * @param buffer input with serialized bytes + * @param deserialized Object type + * @return deserialized Object + */ + public T deserialize(final ByteBuffer buffer) { + ByteBufferInput in = new ByteBufferInput(buffer); + Kryo kryo = getKryo(); + try { + @SuppressWarnings("unchecked") + T obj = (T) kryo.readClassAndObject(in); + return obj; + } finally { + putKryo(kryo); + } + } /** * Creates a Kryo instance with {@link #registeredTypes} pre-registered. diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java index 1309330632..dc3ecaf0b6 100644 --- a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java +++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java @@ -53,6 +53,15 @@ public abstract class IOLoop> super(timeout); } + /** + * Returns the number of message stream in custody of the loop. + * + * @return number of message streams + */ + public int streamCount() { + return streams.size(); + } + /** * Creates a new message stream backed by the specified socket channel. * @@ -84,14 +93,9 @@ public abstract class IOLoop> * * @param key selection key holding the pending connect operation. */ - protected void connect(SelectionKey key) { - try { - SocketChannel ch = (SocketChannel) key.channel(); - ch.finishConnect(); - } catch (IOException | IllegalStateException e) { - log.warn("Unable to complete connection", e); - } - + protected void connect(SelectionKey key) throws IOException { + SocketChannel ch = (SocketChannel) key.channel(); + ch.finishConnect(); if (key.isValid()) { key.interestOps(SelectionKey.OP_READ); } @@ -115,7 +119,11 @@ public abstract class IOLoop> // If there is a pending connect operation, complete it. if (key.isConnectable()) { - connect(key); + try { + connect(key); + } catch (IOException | IllegalStateException e) { + log.warn("Unable to complete connection", e); + } } // If there is a read operation, slurp as much data as possible. @@ -182,9 +190,10 @@ public abstract class IOLoop> * with a pending accept operation. * * @param channel backing socket channel + * @return newly accepted message stream */ - public void acceptStream(SocketChannel channel) { - createAndAdmit(channel, SelectionKey.OP_READ); + public S acceptStream(SocketChannel channel) { + return createAndAdmit(channel, SelectionKey.OP_READ); } @@ -193,9 +202,10 @@ public abstract class IOLoop> * with a pending connect operation. * * @param channel backing socket channel + * @return newly connected message stream */ - public void connectStream(SocketChannel channel) { - createAndAdmit(channel, SelectionKey.OP_CONNECT); + public S connectStream(SocketChannel channel) { + return createAndAdmit(channel, SelectionKey.OP_CONNECT); } /** @@ -205,12 +215,14 @@ public abstract class IOLoop> * @param channel socket channel * @param op pending operations mask to be applied to the selection * key as a set of initial interestedOps + * @return newly created message stream */ - private synchronized void createAndAdmit(SocketChannel channel, int op) { + private synchronized S createAndAdmit(SocketChannel channel, int op) { S stream = createStream(channel); streams.add(stream); newStreamRequests.add(new NewStreamRequest(stream, channel, op)); selector.wakeup(); + return stream; } /** diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java index 89107bfc3b..c38f0f514d 100644 --- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java +++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java @@ -10,6 +10,7 @@ import java.nio.channels.ByteChannel; import java.nio.channels.SelectionKey; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -170,7 +171,7 @@ public abstract class MessageStream { } /** - * Reads, withouth blocking, a list of messages from the stream. + * Reads, without blocking, a list of messages from the stream. * The list will be empty if there were not messages pending. * * @return list of messages or null if backing channel has been closed @@ -262,7 +263,7 @@ public abstract class MessageStream { try { channel.write(outbound); } catch (IOException e) { - if (!closed && !e.getMessage().equals("Broken pipe")) { + if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) { log.warn("Unable to write data", e); ioError = e; } diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java index bdcc97a00f..bbeedd00c1 100644 --- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java +++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java @@ -230,7 +230,7 @@ public class IOLoopTestClient { } @Override - protected void connect(SelectionKey key) { + protected void connect(SelectionKey key) throws IOException { super.connect(key); TestMessageStream b = (TestMessageStream) key.attachment(); Worker w = ((CustomIOLoop) b.loop()).worker;