From 8b5051d20241cf2393a26258848da5607498e32b Mon Sep 17 00:00:00 2001 From: Sho SHIMIZU Date: Wed, 5 Nov 2014 11:24:13 -0800 Subject: [PATCH 01/26] Make logger static final variable Change-Id: I1ea7f3bb9a74a1b1f139512f3d253a1916b30c3e --- .../main/java/org/onlab/onos/net/intent/impl/IntentManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java index 31df87aa52..af2af248d5 100644 --- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java +++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java @@ -77,7 +77,7 @@ import com.google.common.collect.Lists; @Service public class IntentManager implements IntentService, IntentExtensionService { - private final Logger log = getLogger(getClass()); + private static final Logger log = getLogger(IntentManager.class); public static final String INTENT_NULL = "Intent cannot be null"; public static final String INTENT_ID_NULL = "Intent ID cannot be null"; From 5027b6b2d52fbef45c743be5ab00ae062f9bfad7 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 16:23:26 -0800 Subject: [PATCH 02/26] CopyCat: Dynamic cluster support Change-Id: I887c52b35811abf37a2b59db034b07ccf01eed2c --- .../store/service/impl/DatabaseManager.java | 80 +++++++++++++++---- .../impl/TcpClusterConfigSerializer.java | 30 +++++++ .../service/impl/TcpMemberSerializer.java | 24 ++++++ 3 files changed, 120 insertions(+), 14 deletions(-) create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java index 183a6dbb78..19ee882b60 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java @@ -8,17 +8,21 @@ import java.util.List; import net.kuujo.copycat.Copycat; import net.kuujo.copycat.StateMachine; +import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.cluster.TcpCluster; import net.kuujo.copycat.cluster.TcpClusterConfig; import net.kuujo.copycat.cluster.TcpMember; import net.kuujo.copycat.log.InMemoryLog; import net.kuujo.copycat.log.Log; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.cluster.ClusterEvent; +import org.onlab.onos.cluster.ClusterEventListener; import org.onlab.onos.cluster.ClusterService; import org.onlab.onos.cluster.ControllerNode; import org.onlab.onos.store.service.DatabaseAdminService; @@ -35,8 +39,6 @@ import org.onlab.onos.store.service.WriteRequest; import org.onlab.onos.store.service.WriteResult; import org.slf4j.Logger; -import com.google.common.collect.Lists; - /** * Strongly consistent and durable state management service based on * Copycat implementation of Raft consensus protocol. @@ -58,17 +60,29 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { private Copycat copycat; private DatabaseClient client; + // TODO: check if synchronization is required to read/modify this + private ClusterConfig clusterConfig; + + private ClusterEventListener clusterEventListener; + @Activate public void activate() { - log.info("Starting."); - // TODO: Not every node can be part of the consensus ring. + // TODO: Not every node should be part of the consensus ring. + final ControllerNode localNode = clusterService.getLocalNode(); TcpMember localMember = new TcpMember( - clusterService.getLocalNode().ip().toString(), - clusterService.getLocalNode().tcpPort()); - List remoteMembers = Lists.newArrayList(); + localNode.ip().toString(), + localNode.tcpPort()); + + clusterConfig = new TcpClusterConfig(); + clusterConfig.setLocalMember(localMember); + + List remoteMembers = new ArrayList<>(clusterService.getNodes().size()); + + clusterEventListener = new InternalClusterEventListener(); + clusterService.addListener(clusterEventListener); for (ControllerNode node : clusterService.getNodes()) { TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort()); @@ -76,21 +90,18 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { remoteMembers.add(member); } } + clusterConfig.addRemoteMembers(remoteMembers); - // Configure the cluster. - TcpClusterConfig config = new TcpClusterConfig(); + log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers); - config.setLocalMember(localMember); - config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{})); // Create the cluster. - TcpCluster cluster = new TcpCluster(config); + TcpCluster cluster = new TcpCluster(clusterConfig); StateMachine stateMachine = new DatabaseStateMachine(); - ControllerNode thisNode = clusterService.getLocalNode(); // FIXME resolve Chronicle + OSGi issue //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id()); - Log consensusLog = new InMemoryLog(); + Log consensusLog = new KryoRegisteredInMemoryLog(); copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol); copycat.start(); @@ -102,6 +113,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { @Deactivate public void deactivate() { + clusterService.removeListener(clusterEventListener); copycat.stop(); log.info("Stopped."); } @@ -179,6 +191,46 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { } + private final class InternalClusterEventListener + implements ClusterEventListener { + + @Override + public void event(ClusterEvent event) { + // TODO: Not every node should be part of the consensus ring. + + final ControllerNode node = event.subject(); + final TcpMember tcpMember = new TcpMember(node.ip().toString(), + node.tcpPort()); + + log.trace("{}", event); + switch (event.type()) { + case INSTANCE_ACTIVATED: + case INSTANCE_ADDED: + log.info("{} was added to the cluster", tcpMember); + clusterConfig.addRemoteMember(tcpMember); + break; + case INSTANCE_DEACTIVATED: + case INSTANCE_REMOVED: + log.info("{} was removed from the cluster", tcpMember); + clusterConfig.removeRemoteMember(tcpMember); + break; + default: + break; + } + log.info("Current cluster: {}", clusterConfig.getMembers()); + } + + } + + public static final class KryoRegisteredInMemoryLog extends InMemoryLog { + public KryoRegisteredInMemoryLog() { + super(); + // required to deserialize object across bundles + super.kryo.register(TcpMember.class, new TcpMemberSerializer()); + super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer()); + } + } + private class DatabaseOperationResult implements OptionalResult { private final R result; diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java new file mode 100644 index 0000000000..48887b93f1 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java @@ -0,0 +1,30 @@ +package org.onlab.onos.store.service.impl; + +import java.util.Collection; + +import net.kuujo.copycat.cluster.TcpClusterConfig; +import net.kuujo.copycat.cluster.TcpMember; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class TcpClusterConfigSerializer extends Serializer { + + @Override + public void write(Kryo kryo, Output output, TcpClusterConfig object) { + kryo.writeClassAndObject(output, object.getLocalMember()); + kryo.writeClassAndObject(output, object.getRemoteMembers()); + } + + @Override + public TcpClusterConfig read(Kryo kryo, Input input, + Class type) { + TcpMember localMember = (TcpMember) kryo.readClassAndObject(input); + @SuppressWarnings("unchecked") + Collection remoteMembers = (Collection) kryo.readClassAndObject(input); + return new TcpClusterConfig(localMember, remoteMembers); + } + +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java new file mode 100644 index 0000000000..e729f9b29c --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java @@ -0,0 +1,24 @@ +package org.onlab.onos.store.service.impl; + +import net.kuujo.copycat.cluster.TcpMember; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class TcpMemberSerializer extends Serializer { + + @Override + public void write(Kryo kryo, Output output, TcpMember object) { + output.writeString(object.host()); + output.writeInt(object.port()); + } + + @Override + public TcpMember read(Kryo kryo, Input input, Class type) { + String host = input.readString(); + int port = input.readInt(); + return new TcpMember(host, port); + } +} From caa14d6ff13080486f692bbd25314bc81b32d83d Mon Sep 17 00:00:00 2001 From: Thomas Vachuska Date: Wed, 5 Nov 2014 16:28:33 -0800 Subject: [PATCH 03/26] Changed LOCAL port number in the CLI to "local". Change-Id: Ic24442446a225bbc52613ce6c26b5ecaf7697193 --- .../org/onlab/onos/cli/net/DevicePortsListCommand.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cli/src/main/java/org/onlab/onos/cli/net/DevicePortsListCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/DevicePortsListCommand.java index 1d32aa57cd..29452f7676 100644 --- a/cli/src/main/java/org/onlab/onos/cli/net/DevicePortsListCommand.java +++ b/cli/src/main/java/org/onlab/onos/cli/net/DevicePortsListCommand.java @@ -25,6 +25,7 @@ import org.apache.karaf.shell.commands.Option; import org.onlab.onos.cli.Comparators; import org.onlab.onos.net.Device; import org.onlab.onos.net.Port; +import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.device.DeviceService; import java.util.ArrayList; @@ -108,7 +109,7 @@ public class DevicePortsListCommand extends DevicesListCommand { for (Port port : service.getPorts(device.id())) { if (isIncluded(port)) { ports.add(mapper.createObjectNode() - .put("port", port.number().toString()) + .put("port", portName(port.number())) .put("isEnabled", port.isEnabled()) .put("type", port.type().toString().toLowerCase()) .put("portSpeed", port.portSpeed()) @@ -120,6 +121,10 @@ public class DevicePortsListCommand extends DevicesListCommand { return result; } + private String portName(PortNumber port) { + return port.equals(PortNumber.LOCAL) ? "local" : port.toString(); + } + // Determines if a port should be included in output. private boolean isIncluded(Port port) { return enabled && port.isEnabled() || disabled && !port.isEnabled() || @@ -133,7 +138,8 @@ public class DevicePortsListCommand extends DevicesListCommand { Collections.sort(ports, Comparators.PORT_COMPARATOR); for (Port port : ports) { if (isIncluded(port)) { - print(FMT, port.number(), port.isEnabled() ? "enabled" : "disabled", + print(FMT, portName(port.number()), + port.isEnabled() ? "enabled" : "disabled", port.type().toString().toLowerCase(), port.portSpeed(), annotations(port.annotations())); } From 661b9b2754933456d0b120c57d074a731b6346f4 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 16:36:30 -0800 Subject: [PATCH 04/26] fix log Change-Id: I35cab6921be907963355dfb7bcbeeb68d0da91f2 --- .../java/org/onlab/onos/store/service/impl/DatabaseManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java index 19ee882b60..b1e1949705 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java @@ -217,7 +217,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { default: break; } - log.info("Current cluster: {}", clusterConfig.getMembers()); + log.debug("Current cluster: {}", copycat.cluster()); } } From 39ae550bacf8057443cd8083c327c2e5d4b71486 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 16:42:12 -0800 Subject: [PATCH 05/26] log related fixes Change-Id: Ie83feb7b135c046319ef76e9204b07ecc25caf02 --- .../messaging/impl/ClusterCommunicationManager.java | 2 +- .../onos/store/service/impl/DatabaseStateMachine.java | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java index 38e1322f0e..849ad175dd 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java @@ -159,7 +159,7 @@ public class ClusterCommunicationManager return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message)); } catch (IOException e) { - log.error("Failed interaction with remote nodeId: " + toNodeId, e); + log.trace("Failed interaction with remote nodeId: " + toNodeId, e); throw e; } } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java index ad6773ebee..2822f25251 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java @@ -1,5 +1,7 @@ package org.onlab.onos.store.service.impl; +import static org.slf4j.LoggerFactory.getLogger; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -16,6 +18,7 @@ import org.onlab.onos.store.service.VersionedValue; import org.onlab.onos.store.service.WriteRequest; import org.onlab.onos.store.service.WriteResult; import org.onlab.util.KryoNamespace; +import org.slf4j.Logger; import com.google.common.collect.Maps; @@ -28,6 +31,8 @@ import com.google.common.collect.Maps; */ public class DatabaseStateMachine implements StateMachine { + private final Logger log = getLogger(getClass()); + public static final KryoSerializer SERIALIZER = new KryoSerializer() { @Override protected void setupKryoPool() { @@ -161,7 +166,7 @@ public class DatabaseStateMachine implements StateMachine { try { return SERIALIZER.encode(state); } catch (Exception e) { - e.printStackTrace(); + log.error("Snapshot serialization error", e); return null; } } @@ -171,7 +176,7 @@ public class DatabaseStateMachine implements StateMachine { try { this.state = SERIALIZER.decode(data); } catch (Exception e) { - e.printStackTrace(); + log.error("Snapshot deserialization error", e); } } } From 460f40251c835a9528e979ed53f9ec7870c86b77 Mon Sep 17 00:00:00 2001 From: Ray Milkey Date: Wed, 5 Nov 2014 15:41:43 -0800 Subject: [PATCH 06/26] ONOS-22 - Add Constraints to CLI Commands Added optional --lambda and --bandwidth operations to command line for host to host, point to point, and multi point to single point intents Fixed intent compilers to add constraints to the PathIntents they create Change-Id: I25510d401118feba493f51ecddc72d770d8ae3e3 --- .../cli/net/AddHostToHostIntentCommand.java | 10 +++-- ...dMultiPointToSinglePointIntentCommand.java | 6 ++- .../cli/net/AddPointToPointIntentCommand.java | 7 ++- .../cli/net/ConnectivityIntentCommand.java | 45 +++++++++++++++++-- .../onos/net/intent/HostToHostIntent.java | 1 + .../intent/MultiPointToSinglePointIntent.java | 34 ++++++++++++++ .../org/onlab/onos/net/intent/PathIntent.java | 21 +++++++++ .../impl/ConnectivityIntentCompiler.java | 4 +- .../intent/impl/HostToHostIntentCompiler.java | 3 +- .../impl/PointToPointIntentCompiler.java | 3 +- 10 files changed, 122 insertions(+), 12 deletions(-) diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java index 7d3bfc29c2..13f3000ae2 100644 --- a/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java +++ b/cli/src/main/java/org/onlab/onos/cli/net/AddHostToHostIntentCommand.java @@ -15,14 +15,16 @@ */ package org.onlab.onos.cli.net; +import java.util.List; + import org.apache.karaf.shell.commands.Argument; import org.apache.karaf.shell.commands.Command; -import org.onlab.onos.cli.AbstractShellCommand; import org.onlab.onos.net.HostId; import org.onlab.onos.net.flow.DefaultTrafficSelector; import org.onlab.onos.net.flow.DefaultTrafficTreatment; import org.onlab.onos.net.flow.TrafficSelector; import org.onlab.onos.net.flow.TrafficTreatment; +import org.onlab.onos.net.intent.Constraint; import org.onlab.onos.net.intent.HostToHostIntent; import org.onlab.onos.net.intent.IntentService; @@ -31,7 +33,7 @@ import org.onlab.onos.net.intent.IntentService; */ @Command(scope = "onos", name = "add-host-intent", description = "Installs host-to-host connectivity intent") -public class AddHostToHostIntentCommand extends AbstractShellCommand { +public class AddHostToHostIntentCommand extends ConnectivityIntentCommand { @Argument(index = 0, name = "one", description = "One host ID", required = true, multiValued = false) @@ -50,9 +52,11 @@ public class AddHostToHostIntentCommand extends AbstractShellCommand { TrafficSelector selector = DefaultTrafficSelector.builder().build(); TrafficTreatment treatment = DefaultTrafficTreatment.builder().build(); + List constraints = buildConstraints(); HostToHostIntent intent = new HostToHostIntent(appId(), oneId, twoId, - selector, treatment); + selector, treatment, + constraints); service.submit(intent); } diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java index 9e8e2fc474..1cd695ed22 100644 --- a/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java +++ b/cli/src/main/java/org/onlab/onos/cli/net/AddMultiPointToSinglePointIntentCommand.java @@ -23,11 +23,13 @@ import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.flow.DefaultTrafficTreatment; import org.onlab.onos.net.flow.TrafficSelector; import org.onlab.onos.net.flow.TrafficTreatment; +import org.onlab.onos.net.intent.Constraint; import org.onlab.onos.net.intent.Intent; import org.onlab.onos.net.intent.IntentService; import org.onlab.onos.net.intent.MultiPointToSinglePointIntent; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.onlab.onos.net.DeviceId.deviceId; @@ -69,9 +71,11 @@ public class AddMultiPointToSinglePointIntentCommand extends ConnectivityIntentC TrafficSelector selector = buildTrafficSelector(); TrafficTreatment treatment = DefaultTrafficTreatment.builder().build(); + List constraints = buildConstraints(); Intent intent = new MultiPointToSinglePointIntent(appId(), selector, treatment, - ingressPoints, egress); + ingressPoints, egress, + constraints); service.submit(intent); } diff --git a/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java index ed98c7e84f..26bb1c0ced 100644 --- a/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java +++ b/cli/src/main/java/org/onlab/onos/cli/net/AddPointToPointIntentCommand.java @@ -15,6 +15,8 @@ */ package org.onlab.onos.cli.net; +import java.util.List; + import org.apache.karaf.shell.commands.Argument; import org.apache.karaf.shell.commands.Command; import org.onlab.onos.net.ConnectPoint; @@ -22,6 +24,7 @@ import org.onlab.onos.net.DeviceId; import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.flow.TrafficSelector; import org.onlab.onos.net.flow.TrafficTreatment; +import org.onlab.onos.net.intent.Constraint; import org.onlab.onos.net.intent.Intent; import org.onlab.onos.net.intent.IntentService; import org.onlab.onos.net.intent.PointToPointIntent; @@ -63,8 +66,10 @@ public class AddPointToPointIntentCommand extends ConnectivityIntentCommand { TrafficSelector selector = buildTrafficSelector(); TrafficTreatment treatment = builder().build(); + List constraints = buildConstraints(); + Intent intent = new PointToPointIntent(appId(), selector, treatment, - ingress, egress); + ingress, egress, constraints); service.submit(intent); } diff --git a/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java index d8ec3c7573..e4fc5aa602 100644 --- a/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java +++ b/cli/src/main/java/org/onlab/onos/cli/net/ConnectivityIntentCommand.java @@ -15,14 +15,21 @@ */ package org.onlab.onos.cli.net; +import java.util.LinkedList; +import java.util.List; + import org.apache.karaf.shell.commands.Option; import org.onlab.onos.cli.AbstractShellCommand; import org.onlab.onos.net.flow.DefaultTrafficSelector; import org.onlab.onos.net.flow.TrafficSelector; +import org.onlab.onos.net.intent.Constraint; +import org.onlab.onos.net.intent.constraint.BandwidthConstraint; +import org.onlab.onos.net.intent.constraint.LambdaConstraint; +import org.onlab.onos.net.resource.Bandwidth; import org.onlab.packet.Ethernet; import org.onlab.packet.MacAddress; -import com.google.common.base.Strings; +import static com.google.common.base.Strings.isNullOrEmpty; /** * Base class for command line operations for connectivity based intents. @@ -41,6 +48,14 @@ public abstract class ConnectivityIntentCommand extends AbstractShellCommand { required = false, multiValued = false) private String ethTypeString = ""; + @Option(name = "-b", aliases = "--bandwidth", description = "Bandwidth", + required = false, multiValued = false) + private String bandwidthString = ""; + + @Option(name = "-l", aliases = "--lambda", description = "Lambda", + required = false, multiValued = false) + private boolean lambda = false; + /** * Constructs a traffic selector based on the command line arguments * presented to the command. @@ -50,21 +65,43 @@ public abstract class ConnectivityIntentCommand extends AbstractShellCommand { TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder(); Short ethType = Ethernet.TYPE_IPV4; - if (!Strings.isNullOrEmpty(ethTypeString)) { + if (!isNullOrEmpty(ethTypeString)) { EthType ethTypeParameter = EthType.valueOf(ethTypeString); ethType = ethTypeParameter.value(); } selectorBuilder.matchEthType(ethType); - if (!Strings.isNullOrEmpty(srcMacString)) { + if (!isNullOrEmpty(srcMacString)) { selectorBuilder.matchEthSrc(MacAddress.valueOf(srcMacString)); } - if (!Strings.isNullOrEmpty(dstMacString)) { + if (!isNullOrEmpty(dstMacString)) { selectorBuilder.matchEthDst(MacAddress.valueOf(dstMacString)); } return selectorBuilder.build(); } + /** + * Builds the constraint list for this command based on the command line + * parameters. + * + * @return List of constraint objects describing the constraints requested + */ + protected List buildConstraints() { + final List constraints = new LinkedList<>(); + + // Check for a bandwidth specification + if (!isNullOrEmpty(bandwidthString)) { + final double bandwidthValue = Double.parseDouble(bandwidthString); + constraints.add(new BandwidthConstraint(Bandwidth.valueOf(bandwidthValue))); + } + + // Check for a lambda specification + if (lambda) { + constraints.add(new LambdaConstraint(null)); + } + + return constraints; + } } diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java index 3fad93ddf9..893270a850 100644 --- a/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java +++ b/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java @@ -105,6 +105,7 @@ public final class HostToHostIntent extends ConnectivityIntent { .add("appId", appId()) .add("selector", selector()) .add("treatment", treatment()) + .add("constraints", constraints()) .add("one", one) .add("two", two) .toString(); diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java index 0c47aaddb2..90907fbccd 100644 --- a/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java +++ b/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java @@ -22,6 +22,7 @@ import org.onlab.onos.net.ConnectPoint; import org.onlab.onos.net.flow.TrafficSelector; import org.onlab.onos.net.flow.TrafficTreatment; +import java.util.List; import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; @@ -64,6 +65,38 @@ public final class MultiPointToSinglePointIntent extends ConnectivityIntent { this.egressPoint = checkNotNull(egressPoint); } + /** + * Creates a new multi-to-single point connectivity intent for the specified + * traffic selector and treatment. + * + * @param appId application identifier + * @param selector traffic selector + * @param treatment treatment + * @param ingressPoints set of ports from which ingress traffic originates + * @param egressPoint port to which traffic will egress + * @param constraints constraints to apply to the intent + * @throws NullPointerException if {@code ingressPoints} or + * {@code egressPoint} is null. + * @throws IllegalArgumentException if the size of {@code ingressPoints} is + * not more than 1 + */ + public MultiPointToSinglePointIntent(ApplicationId appId, + TrafficSelector selector, + TrafficTreatment treatment, + Set ingressPoints, + ConnectPoint egressPoint, + List constraints) { + super(id(MultiPointToSinglePointIntent.class, selector, treatment, + ingressPoints, egressPoint), appId, null, selector, treatment, + constraints); + + checkNotNull(ingressPoints); + checkArgument(!ingressPoints.isEmpty(), "Ingress point set cannot be empty"); + + this.ingressPoints = Sets.newHashSet(ingressPoints); + this.egressPoint = checkNotNull(egressPoint); + } + /** * Constructor for serializer. */ @@ -101,6 +134,7 @@ public final class MultiPointToSinglePointIntent extends ConnectivityIntent { .add("treatment", treatment()) .add("ingress", ingressPoints()) .add("egress", egressPoint()) + .add("constraints", constraints()) .toString(); } } diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java index 9189bae0c5..9f8816d706 100644 --- a/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java +++ b/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java @@ -15,6 +15,8 @@ */ package org.onlab.onos.net.intent; +import java.util.List; + import com.google.common.base.MoreObjects; import org.onlab.onos.core.ApplicationId; import org.onlab.onos.net.Path; @@ -45,6 +47,24 @@ public class PathIntent extends ConnectivityIntent { this.path = path; } + /** + * Creates a new point-to-point intent with the supplied ingress/egress + * ports and using the specified explicit path. + * + * @param appId application identifier + * @param selector traffic selector + * @param treatment treatment + * @param path traversed links + * @param constraints optional list of constraints + * @throws NullPointerException {@code path} is null + */ + public PathIntent(ApplicationId appId, TrafficSelector selector, + TrafficTreatment treatment, Path path, List constraints) { + super(id(PathIntent.class, selector, treatment, path, constraints), appId, + resources(path.links()), selector, treatment, constraints); + this.path = path; + } + /** * Constructor for serializer. */ @@ -75,6 +95,7 @@ public class PathIntent extends ConnectivityIntent { .add("appId", appId()) .add("selector", selector()) .add("treatment", treatment()) + .add("constraints", constraints()) .add("path", path) .toString(); } diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java index 4cf1830327..8a319007ff 100644 --- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java +++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ConnectivityIntentCompiler.java @@ -118,13 +118,14 @@ public abstract class ConnectivityIntentCompiler @Override public double weight(TopologyEdge edge) { - if (constraints == null) { + if (constraints == null || !constraints.iterator().hasNext()) { return 1.0; } // iterate over all constraints in order and return the weight of // the first one with fast fail over the first failure Iterator it = constraints.iterator(); + double cost = it.next().cost(edge.link(), resourceService); while (it.hasNext() && cost > 0) { if (it.next().cost(edge.link(), resourceService) < 0) { @@ -132,6 +133,7 @@ public abstract class ConnectivityIntentCompiler } } return cost; + } } diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java index 605d3c7aef..4d989ac251 100644 --- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java +++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java @@ -70,7 +70,8 @@ public class HostToHostIntentCompiler HostToHostIntent intent) { TrafficSelector selector = builder(intent.selector()) .matchEthSrc(src.mac()).matchEthDst(dst.mac()).build(); - return new PathIntent(intent.appId(), selector, intent.treatment(), path); + return new PathIntent(intent.appId(), selector, intent.treatment(), + path, intent.constraints()); } } diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java index c32c8ee009..5c8eb94d35 100644 --- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java +++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PointToPointIntentCompiler.java @@ -77,7 +77,8 @@ public class PointToPointIntentCompiler private Intent createPathIntent(Path path, PointToPointIntent intent) { return new PathIntent(intent.appId(), - intent.selector(), intent.treatment(), path); + intent.selector(), intent.treatment(), path, + intent.constraints()); } } From c7ee066102824f6f997131472a03d30ec287b45a Mon Sep 17 00:00:00 2001 From: Simon Hunt Date: Wed, 5 Nov 2014 16:44:37 -0800 Subject: [PATCH 07/26] GUI -- Major Work-In-Progress - Added dataLoadError to view token. - Restructured code, viz: (1) svg and force layout initialized in preload callback (2) load callback initializes topo rendering (3) subsequent data loads modify topo rendering --- web/gui/src/main/webapp/onos2.js | 13 +- web/gui/src/main/webapp/topo2.css | 3 + web/gui/src/main/webapp/topo2.js | 193 +++++++++++++++++++++++++++--- 3 files changed, 192 insertions(+), 17 deletions(-) diff --git a/web/gui/src/main/webapp/onos2.js b/web/gui/src/main/webapp/onos2.js index 427a23ff89..375fe6b00a 100644 --- a/web/gui/src/main/webapp/onos2.js +++ b/web/gui/src/main/webapp/onos2.js @@ -407,7 +407,8 @@ height: this.height, uid: this.uid, setRadio: this.setRadio, - setKeys: this.setKeys + setKeys: this.setKeys, + dataLoadError: this.dataLoadError } }, @@ -498,6 +499,16 @@ uid: function (id) { return makeUid(this, id); + }, + + // TODO : implement custom dialogs (don't use alerts) + + dataLoadError: function (err, url) { + var msg = 'Data Load Error\n\n' + + err.status + ' -- ' + err.statusText + '\n\n' + + 'relative-url: "' + url + '"\n\n' + + 'complete-url: "' + err.responseURL + '"'; + alert(msg); } // TODO: consider schedule, clearTimer, etc. diff --git a/web/gui/src/main/webapp/topo2.css b/web/gui/src/main/webapp/topo2.css index 88fcd94194..eee92441af 100644 --- a/web/gui/src/main/webapp/topo2.css +++ b/web/gui/src/main/webapp/topo2.css @@ -24,3 +24,6 @@ svg #topo-bg { opacity: 0.5; } +svg .node { + fill: #03c; +} \ No newline at end of file diff --git a/web/gui/src/main/webapp/topo2.js b/web/gui/src/main/webapp/topo2.js index 7ac9adc650..f5e1792ec5 100644 --- a/web/gui/src/main/webapp/topo2.js +++ b/web/gui/src/main/webapp/topo2.js @@ -56,12 +56,24 @@ opt: 'img/opt.png' }, force: { - marginLR: 20, - marginTB: 20, + note: 'node.class or link.class is used to differentiate', + linkDistance: { + infra: 200, + host: 40 + }, + linkStrength: { + infra: 1.0, + host: 1.0 + }, + charge: { + device: -400, + host: -100 + }, + pad: 20, translate: function() { return 'translate(' + - config.force.marginLR + ',' + - config.force.marginTB + ')'; + config.force.pad + ',' + + config.force.pad + ')'; } } }; @@ -94,7 +106,11 @@ // D3 selections var svg, bgImg, - topoG; + topoG, + nodeG, + linkG, + node, + link; // ============================== // For Debugging / Development @@ -175,22 +191,145 @@ // ============================== // Private functions - // set the size of the given element to that of the view - function setSize(el, view) { + // set the size of the given element to that of the view (reduced if padded) + function setSize(el, view, pad) { + var padding = pad ? pad * 2 : 0; el.attr({ - width: view.width(), - height: view.height() + width: view.width() - padding, + height: view.height() - padding }); } - function getNetworkData(view) { var url = getTopoUrl(); - // TODO ... - + console.log('Fetching JSON: ' + url); + d3.json(url, function(err, data) { + if (err) { + view.dataLoadError(err, url); + } else { + network.data = data; + drawNetwork(view); + } + }); } + function drawNetwork(view) { + preprocessData(view); + updateLayout(view); + } + + function preprocessData(view) { + var w = view.width(), + h = view.height(), + hDevice = h * 0.6, + hHost = h * 0.3, + data = network.data, + deviceLayout = computeInitLayout(w, hDevice, data.devices.length), + hostLayout = computeInitLayout(w, hHost, data.hosts.length); + + network.lookup = {}; + network.nodes = []; + network.links = []; + // we created new arrays, so need to set the refs in the force layout + network.force.nodes(network.nodes); + network.force.links(network.links); + + // let's just start with the nodes + + // note that both 'devices' and 'hosts' get mapped into the nodes array + function makeNode(d, cls, layout) { + var node = { + id: d.id, + labels: d.labels, + class: cls, + icon: cls, + type: d.type, + x: layout.x(), + y: layout.y() + }; + network.lookup[d.id] = node; + network.nodes.push(node); + } + + // first the devices... + network.data.devices.forEach(function (d) { + makeNode(d, 'device', deviceLayout); + }); + + // then the hosts... + network.data.hosts.forEach(function (d) { + makeNode(d, 'host', hostLayout); + }); + + // TODO: process links + } + + function computeInitLayout(w, h, n) { + var maxdw = 60, + compdw, dw, ox, layout; + + if (n < 2) { + layout = { ox: w/2, dw: 0 } + } else { + compdw = (0.8 * w) / (n - 1); + dw = Math.min(maxdw, compdw); + ox = w/2 - ((n - 1)/2 * dw); + layout = { ox: ox, dw: dw } + } + + layout.i = 0; + + layout.x = function () { + var x = layout.ox + layout.i*layout.dw; + layout.i++; + return x; + }; + + layout.y = function () { + return h; + }; + + return layout; + } + + function linkId(d) { + return d.source.id + '~' + d.target.id; + } + + function nodeId(d) { + return d.id; + } + + function updateLayout(view) { + link = link.data(network.force.links(), linkId); + link.enter().append('line') + .attr('class', 'link'); + link.exit().remove(); + + node = node.data(network.force.nodes(), nodeId); + node.enter().append('circle') + .attr('id', function (d) { return 'nodeId-' + d.id; }) + .attr('class', function (d) { return 'node'; }) + .attr('r', 12); + + network.force.start(); + } + + + function tick() { + node.attr({ + cx: function(d) { return d.x; }, + cy: function(d) { return d.y; } + }); + + link.attr({ + x1: function (d) { return d.source.x; }, + y1: function (d) { return d.source.y; }, + x2: function (d) { return d.target.x; }, + y2: function (d) { return d.target.y; } + }); + } // ============================== // View life-cycle callbacks @@ -199,15 +338,15 @@ var w = view.width(), h = view.height(), idBg = view.uid('bg'), - showBg = config.options.showBackground ? 'visible' : 'hidden'; + showBg = config.options.showBackground ? 'visible' : 'hidden', + fcfg = config.force, + fpad = fcfg.pad, + forceDim = [w - 2*fpad, h - 2*fpad]; // NOTE: view.$div is a D3 selection of the view's div svg = view.$div.append('svg'); setSize(svg, view); - topoG = svg.append('g') - .attr('transform', config.force.translate()); - // load the background image bgImg = svg.append('svg:image') .attr({ @@ -219,6 +358,28 @@ .style({ visibility: showBg }); + + // group for the topology + topoG = svg.append('g') + .attr('transform', fcfg.translate()); + + // subgroups for links and nodes + linkG = topoG.append('g').attr('id', 'links'); + nodeG = topoG.append('g').attr('id', 'nodes'); + + // selection of nodes and links + link = linkG.selectAll('.link'); + node = nodeG.selectAll('.node'); + + // set up the force layout + network.force = d3.layout.force() + .size(forceDim) + .nodes(network.nodes) + .links(network.links) + .charge(function (d) { return fcfg.charge[d.class]; }) + .linkDistance(function (d) { return fcfg.linkDistance[d.class]; }) + .linkStrength(function (d) { return fcfg.linkStrength[d.class]; }) + .on('tick', tick); } From 1aa249ca74b8c738ba493ecb6c5035590bb7eb05 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 17:40:28 -0800 Subject: [PATCH 08/26] separate basic classes from API Change-Id: I110ebc7da395f60d8c95d8bb09c00bcc2a4f0547 --- .../store/serializers/KryoNamespaces.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java index e0348ff2ca..4cba9f0254 100644 --- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java +++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java @@ -103,6 +103,20 @@ import com.google.common.collect.ImmutableSet; public final class KryoNamespaces { + public static final KryoNamespace BASIC = KryoNamespace.newBuilder() + .register(ImmutableMap.class, new ImmutableMapSerializer()) + .register(ImmutableList.class, new ImmutableListSerializer()) + .register(ImmutableSet.class, new ImmutableSetSerializer()) + .register( + ArrayList.class, + Arrays.asList().getClass(), + HashMap.class, + HashSet.class, + LinkedList.class, + byte[].class + ) + .build(); + /** * KryoNamespace which can serialize ON.lab misc classes. */ @@ -123,19 +137,8 @@ public final class KryoNamespaces { */ public static final KryoNamespace API = KryoNamespace.newBuilder() .register(MISC) - .register(ImmutableMap.class, new ImmutableMapSerializer()) - .register(ImmutableList.class, new ImmutableListSerializer()) - .register(ImmutableSet.class, new ImmutableSetSerializer()) + .register(BASIC) .register( - // - ArrayList.class, - Arrays.asList().getClass(), - HashMap.class, - HashSet.class, - LinkedList.class, - byte[].class, - // - // ControllerNode.State.class, Device.Type.class, Port.Type.class, From 79a1e5e121262e85808621f51ef7d1334987aba4 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 17:42:01 -0800 Subject: [PATCH 09/26] DatabaseManager: try to wait for others on start up Change-Id: I90acfa10be7430509a459b456658dc8838d4e44b --- .../impl/ClusterMessagingProtocol.java | 4 +- .../store/service/impl/DatabaseManager.java | 50 ++++++++++++++++--- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java index c561221cc2..56dba79127 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java @@ -149,12 +149,12 @@ public class ClusterMessagingProtocol @Activate public void activate() { - log.info("Started."); + log.info("Started"); } @Deactivate public void deactivate() { - log.info("Stopped."); + log.info("Stopped"); } @Override diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java index b1e1949705..2779b352c4 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java @@ -5,6 +5,8 @@ import static org.slf4j.LoggerFactory.getLogger; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import net.kuujo.copycat.Copycat; import net.kuujo.copycat.StateMachine; @@ -60,9 +62,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { private Copycat copycat; private DatabaseClient client; - // TODO: check if synchronization is required to read/modify this + // guarded by synchronized block private ClusterConfig clusterConfig; + private CountDownLatch clusterEventLatch; + private ClusterEventListener clusterEventListener; @Activate @@ -81,22 +85,45 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { List remoteMembers = new ArrayList<>(clusterService.getNodes().size()); + clusterEventLatch = new CountDownLatch(1); clusterEventListener = new InternalClusterEventListener(); clusterService.addListener(clusterEventListener); + // note: from this point beyond, clusterConfig requires synchronization + for (ControllerNode node : clusterService.getNodes()) { TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort()); if (!member.equals(localMember)) { remoteMembers.add(member); } } - clusterConfig.addRemoteMembers(remoteMembers); - log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers); + if (remoteMembers.isEmpty()) { + log.info("This node is the only node in the cluster. " + + "Waiting for others to show up."); + // FIXME: hack trying to relax cases forming multiple consensus rings. + // add seed node configuration to avoid this + // If the node is alone on it's own, wait some time + // hoping other will come up soon + try { + if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) { + log.info("Starting as single node cluster"); + } + } catch (InterruptedException e) { + log.info("Interrupted waiting for others", e); + } + } + + final TcpCluster cluster; + synchronized (clusterConfig) { + clusterConfig.addRemoteMembers(remoteMembers); + + // Create the cluster. + cluster = new TcpCluster(clusterConfig); + } + log.info("Starting cluster: {}", cluster); - // Create the cluster. - TcpCluster cluster = new TcpCluster(clusterConfig); StateMachine stateMachine = new DatabaseStateMachine(); // FIXME resolve Chronicle + OSGi issue @@ -207,17 +234,24 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { case INSTANCE_ACTIVATED: case INSTANCE_ADDED: log.info("{} was added to the cluster", tcpMember); - clusterConfig.addRemoteMember(tcpMember); + synchronized (clusterConfig) { + clusterConfig.addRemoteMember(tcpMember); + } break; case INSTANCE_DEACTIVATED: case INSTANCE_REMOVED: log.info("{} was removed from the cluster", tcpMember); - clusterConfig.removeRemoteMember(tcpMember); + synchronized (clusterConfig) { + clusterConfig.removeRemoteMember(tcpMember); + } break; default: break; } - log.debug("Current cluster: {}", copycat.cluster()); + if (copycat != null) { + log.debug("Current cluster: {}", copycat.cluster()); + } + clusterEventLatch.countDown(); } } From 0e51fe55c4d4057642ab3deeb42a3be5631a777c Mon Sep 17 00:00:00 2001 From: Sho SHIMIZU Date: Wed, 5 Nov 2014 12:04:23 -0800 Subject: [PATCH 10/26] Add constraint for waypoints Change-Id: I8156920d4d3d85aee060b9d0fe925f43f9b15ff1 --- core/api/pom.xml | 4 + .../intent/constraint/WaypointConstraint.java | 105 ++++++++++++++++++ .../constraint/WaypointConstraintTest.java | 87 +++++++++++++++ 3 files changed, 196 insertions(+) create mode 100644 core/api/src/main/java/org/onlab/onos/net/intent/constraint/WaypointConstraint.java create mode 100644 core/api/src/test/java/org/onlab/onos/net/intent/constraint/WaypointConstraintTest.java diff --git a/core/api/pom.xml b/core/api/pom.xml index 100e63f8fc..ff94f1a6a6 100644 --- a/core/api/pom.xml +++ b/core/api/pom.xml @@ -40,6 +40,10 @@ joda-time joda-time + + org.easymock + easymock + diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/constraint/WaypointConstraint.java b/core/api/src/main/java/org/onlab/onos/net/intent/constraint/WaypointConstraint.java new file mode 100644 index 0000000000..fffb66c3ef --- /dev/null +++ b/core/api/src/main/java/org/onlab/onos/net/intent/constraint/WaypointConstraint.java @@ -0,0 +1,105 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.onos.net.intent.constraint; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; +import org.onlab.onos.net.ElementId; +import org.onlab.onos.net.Link; +import org.onlab.onos.net.Path; +import org.onlab.onos.net.intent.Constraint; +import org.onlab.onos.net.resource.LinkResourceService; + +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Constraint that evaluates elements passed through in order. + */ +public class WaypointConstraint implements Constraint { + + private final List waypoints; + + /** + * Creates a new waypoint constraint. + * + * @param waypoints waypoints + */ + public WaypointConstraint(ElementId... waypoints) { + checkNotNull(waypoints, "waypoints cannot be null"); + checkArgument(waypoints.length > 0, "length of waypoints should be more than 0"); + this.waypoints = ImmutableList.copyOf(waypoints); + } + + @Override + public double cost(Link link, LinkResourceService resourceService) { + // Always consider the number of hops + return 1; + } + + @Override + public boolean validate(Path path, LinkResourceService resourceService) { + LinkedList waypoints = new LinkedList<>(this.waypoints); + ElementId current = waypoints.poll(); + // This is safe because Path class ensures the number of links are more than 0 + Link firstLink = path.links().get(0); + if (firstLink.src().elementId().equals(current)) { + current = waypoints.poll(); + } + + for (Link link : path.links()) { + if (link.dst().elementId().equals(current)) { + current = waypoints.poll(); + // Empty waypoints means passing through all waypoints in the specified order + if (current == null) { + return true; + } + } + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash(waypoints); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof WaypointConstraint)) { + return false; + } + + final WaypointConstraint that = (WaypointConstraint) obj; + return Objects.equals(this.waypoints, that.waypoints); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("waypoints", waypoints) + .toString(); + } +} diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/constraint/WaypointConstraintTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/constraint/WaypointConstraintTest.java new file mode 100644 index 0000000000..7b8089141a --- /dev/null +++ b/core/api/src/test/java/org/onlab/onos/net/intent/constraint/WaypointConstraintTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2014 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.onos.net.intent.constraint; + +import org.junit.Before; +import org.junit.Test; +import org.onlab.onos.net.DefaultLink; +import org.onlab.onos.net.DefaultPath; +import org.onlab.onos.net.DeviceId; +import org.onlab.onos.net.Path; +import org.onlab.onos.net.PortNumber; +import org.onlab.onos.net.provider.ProviderId; +import org.onlab.onos.net.resource.LinkResourceService; + +import java.util.Arrays; + +import static org.easymock.EasyMock.createMock; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.onlab.onos.net.DefaultLinkTest.cp; +import static org.onlab.onos.net.DeviceId.deviceId; +import static org.onlab.onos.net.Link.Type.DIRECT; + +/** + * Test for constraint of intermediate elements. + */ +public class WaypointConstraintTest { + + public static final DeviceId DID1 = deviceId("of:1"); + public static final DeviceId DID2 = deviceId("of:2"); + public static final DeviceId DID3 = deviceId("of:3"); + public static final DeviceId DID4 = deviceId("of:4"); + public static final PortNumber PN1 = PortNumber.portNumber(1); + public static final PortNumber PN2 = PortNumber.portNumber(2); + public static final PortNumber PN3 = PortNumber.portNumber(3); + public static final PortNumber PN4 = PortNumber.portNumber(4); + public static final ProviderId PROVIDER_ID = new ProviderId("of", "foo"); + + private WaypointConstraint sut; + private LinkResourceService linkResourceService; + + private Path path; + private DefaultLink link2; + private DefaultLink link1; + + @Before + public void setUp() { + linkResourceService = createMock(LinkResourceService.class); + + link1 = new DefaultLink(PROVIDER_ID, cp(DID1, PN1), cp(DID2, PN2), DIRECT); + link2 = new DefaultLink(PROVIDER_ID, cp(DID2, PN3), cp(DID3, PN4), DIRECT); + path = new DefaultPath(PROVIDER_ID, Arrays.asList(link1, link2), 10); + } + + /** + * Tests that all of the specified waypoints are included in the specified path in order. + */ + @Test + public void testSatisfyWaypoints() { + sut = new WaypointConstraint(DID1, DID2, DID3); + + assertThat(sut.validate(path, linkResourceService), is(true)); + } + + /** + * Tests that the specified path does not includes the specified waypoint. + */ + @Test + public void testNotSatisfyWaypoint() { + sut = new WaypointConstraint(DID4); + + assertThat(sut.validate(path, linkResourceService), is(false)); + } +} From 4248bee8a11992654830e5f967c71981406b2501 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 18:12:46 -0800 Subject: [PATCH 11/26] add more info to copycat timeout log Change-Id: I713caf37b3ca99c3b7319be35cb8eb03fb3c27d4 --- .../store/service/impl/ClusterMessagingProtocolClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java index 0c2aacdaaa..582e00cc29 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java @@ -132,8 +132,8 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) || message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) { - log.warn("Request to {} failed. Will retry " - + "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS); + log.warn("{} Request to {} failed. Will retry in {} ms", + message.subject(), remoteNode, RETRY_INTERVAL_MILLIS); THREAD_POOL.schedule( this, RETRY_INTERVAL_MILLIS, From cea3ba18dc9eb17aa4d6a0dcc9ffeb2eb6439916 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 18:18:08 -0800 Subject: [PATCH 12/26] add Factory method to ReadRequest Change-Id: I713b25ff6165b072e647be0dfa63eab97dc5ca85 --- .../onlab/onos/store/service/ReadRequest.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java index a22464ad4f..8bcf8b4170 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java @@ -1,5 +1,7 @@ package org.onlab.onos.store.service; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.MoreObjects; /** @@ -10,9 +12,22 @@ public class ReadRequest { private final String tableName; private final String key; + + /** + * Creates a read request, + * which will retrieve the specified key from the table. + * + * @param tableName name of the table + * @param key key in the table + * @return ReadRequest + */ + public static ReadRequest get(String tableName, String key) { + return new ReadRequest(tableName, key); + } + public ReadRequest(String tableName, String key) { - this.tableName = tableName; - this.key = key; + this.tableName = checkNotNull(tableName); + this.key = checkNotNull(key); } /** From 4ee9ddbaa18045a8d34a426d207996107f7f2a64 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 18:28:57 -0800 Subject: [PATCH 13/26] add hashCode to ReadRequest Change-Id: I2e74047ee65bd2122214eeb582efa70a28b1a1f5 --- .../onlab/onos/store/service/ReadRequest.java | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java index 8bcf8b4170..9f7af4a5ee 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java @@ -2,6 +2,8 @@ package org.onlab.onos.store.service; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Objects; + import com.google.common.base.MoreObjects; /** @@ -12,7 +14,6 @@ public class ReadRequest { private final String tableName; private final String key; - /** * Creates a read request, * which will retrieve the specified key from the table. @@ -53,4 +54,26 @@ public class ReadRequest { .add("key", key) .toString(); } + + @Override + public int hashCode() { + return Objects.hash(key, tableName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ReadRequest other = (ReadRequest) obj; + return Objects.equals(this.key, other.key) && + Objects.equals(this.tableName, other.tableName); + } + } \ No newline at end of file From 1838f88d2b0ad447bd8d1323ef01c22ce91e141d Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 18:42:00 -0800 Subject: [PATCH 14/26] DatabaseStateMachine: fix type mismatch Change-Id: I267c7390e46b503d8bfb5ffcb2e09df3738ff5b0 --- .../onos/store/service/impl/DatabaseStateMachine.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java index 2822f25251..891de418a6 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java @@ -5,8 +5,6 @@ import static org.slf4j.LoggerFactory.getLogger; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; - import net.kuujo.copycat.Command; import net.kuujo.copycat.Query; import net.kuujo.copycat.StateMachine; @@ -20,6 +18,7 @@ import org.onlab.onos.store.service.WriteResult; import org.onlab.util.KryoNamespace; import org.slf4j.Logger; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; /** @@ -64,8 +63,8 @@ public class DatabaseStateMachine implements StateMachine { } @Query - public Set listTables() { - return state.getTables().keySet(); + public List listTables() { + return ImmutableList.copyOf(state.getTables().keySet()); } @Query From bddc81c4ec3e6a000d97a07cd43d95a18bf4cff6 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 18:53:09 -0800 Subject: [PATCH 15/26] add comments to DatabaseStateMachine Change-Id: I775e2f6c0250ac158e301408e423ebdb29d75668 --- .../onos/store/service/impl/DatabaseStateMachine.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java index 891de418a6..6d8d5bdb00 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java @@ -89,6 +89,8 @@ public class DatabaseStateMachine implements StateMachine { @Command public List write(List requests) { + + // applicability check boolean abort = false; List validationResults = new ArrayList<>(requests.size()); for (WriteRequest request : requests) { @@ -132,8 +134,13 @@ public class DatabaseStateMachine implements StateMachine { return results; } + // apply changes for (WriteRequest request : requests) { Map table = state.getTables().get(request.tableName()); + // FIXME: If this method could be called by multiple thread, + // synchronization scope is wrong. + // Whole function including applicability check needs to be protected. + // Confirm copycat's thread safety requirement for StateMachine synchronized (table) { VersionedValue previousValue = table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion())); From 3bd8cdcfa124c7545e1ce629f28b6292d8514886 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 19:11:44 -0800 Subject: [PATCH 16/26] copy VersionedValue before returning to protect internal state Change-Id: If56c512488b90bbfbabe32434e567b463d4acf1a --- .../onos/store/service/VersionedValue.java | 22 +++++++++++++++++++ .../service/impl/DatabaseStateMachine.java | 2 +- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java index 852fb07055..ae6969ce4a 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java @@ -38,6 +38,28 @@ public class VersionedValue { return version; } + /** + * Creates a copy of given VersionedValue. + * + * @param original VersionedValue to create a copy + * @return same as original if original or it's value is null, + * otherwise creates a copy. + */ + public static VersionedValue copy(VersionedValue original) { + if (original == null) { + return null; + } + if (original.value == null) { + // immutable, no need to copy + return original; + } else { + return new VersionedValue( + Arrays.copyOf(original.value, + original.value.length), + original.version); + } + } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java index 6d8d5bdb00..d132b7cfc1 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java @@ -76,7 +76,7 @@ public class DatabaseStateMachine implements StateMachine { results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null)); continue; } - VersionedValue value = table.get(request.key()); + VersionedValue value = VersionedValue.copy(table.get(request.key())); results.add(new InternalReadResult( InternalReadResult.Status.OK, new ReadResult( From 778f7ad29b74de12f26de12797a44f8d5ec731ae Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Wed, 5 Nov 2014 22:46:15 -0800 Subject: [PATCH 17/26] MapDB backed Copycat log implementation --- core/store/dist/pom.xml | 6 + .../impl/ClusterMessagingProtocolServer.java | 53 ++-- .../service/impl/DatabaseStateMachine.java | 7 +- .../onos/store/service/impl/MapDBLog.java | 280 ++++++++++++++++++ .../store/service/impl/SnapshotException.java | 13 + .../onos/store/service/impl/MapDBLogTest.java | 193 ++++++++++++ 6 files changed, 525 insertions(+), 27 deletions(-) create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java create mode 100644 core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml index 248e7636d6..f0f37c7518 100644 --- a/core/store/dist/pom.xml +++ b/core/store/dist/pom.xml @@ -63,6 +63,12 @@ --> + + org.mapdb + mapdb + 1.0.6 + + com.fasterxml.jackson.core jackson-databind diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java index 7d94847771..b3eaeb4e90 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java @@ -3,12 +3,17 @@ package org.onlab.onos.store.service.impl; import static org.slf4j.LoggerFactory.getLogger; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import net.kuujo.copycat.protocol.PingRequest; +import net.kuujo.copycat.protocol.PingResponse; import net.kuujo.copycat.protocol.PollRequest; +import net.kuujo.copycat.protocol.PollResponse; import net.kuujo.copycat.protocol.RequestHandler; import net.kuujo.copycat.protocol.SubmitRequest; +import net.kuujo.copycat.protocol.SubmitResponse; import net.kuujo.copycat.protocol.SyncRequest; +import net.kuujo.copycat.protocol.SyncResponse; import net.kuujo.copycat.spi.protocol.ProtocolServer; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; @@ -57,37 +62,37 @@ public class ClusterMessagingProtocolServer implements ProtocolServer { public void handle(ClusterMessage message) { T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload()); if (request.getClass().equals(PingRequest.class)) { - handler.ping((PingRequest) request).whenComplete((response, error) -> { - try { - message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); - } catch (Exception e) { - log.error("Failed to respond to ping request", e); - } - }); + handler.ping((PingRequest) request).whenComplete(new PostExecutionTask(message)); } else if (request.getClass().equals(PollRequest.class)) { - handler.poll((PollRequest) request).whenComplete((response, error) -> { - try { - message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); - } catch (Exception e) { - log.error("Failed to respond to poll request", e); - } - }); + handler.poll((PollRequest) request).whenComplete(new PostExecutionTask(message)); } else if (request.getClass().equals(SyncRequest.class)) { - handler.sync((SyncRequest) request).whenComplete((response, error) -> { - try { - message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); - } catch (Exception e) { - log.error("Failed to respond to sync request", e); - } - }); + handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask(message)); } else if (request.getClass().equals(SubmitRequest.class)) { - handler.submit((SubmitRequest) request).whenComplete((response, error) -> { + handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask(message)); + } else { + throw new IllegalStateException("Unknown request type: " + request.getClass().getName()); + } + } + + private class PostExecutionTask implements BiConsumer { + + private final ClusterMessage message; + + public PostExecutionTask(ClusterMessage message) { + this.message = message; + } + + @Override + public void accept(R response, Throwable t) { + if (t != null) { + log.error("Processing for " + message.subject() + " failed.", t); + } else { try { message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response)); } catch (Exception e) { - log.error("Failed to respond to submit request", e); + log.error("Failed to respond to " + response.getClass().getName(), e); } - }); + } } } } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java index d132b7cfc1..96629766b6 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java @@ -172,8 +172,8 @@ public class DatabaseStateMachine implements StateMachine { try { return SERIALIZER.encode(state); } catch (Exception e) { - log.error("Snapshot serialization error", e); - return null; + log.error("Failed to take snapshot", e); + throw new SnapshotException(e); } } @@ -182,7 +182,8 @@ public class DatabaseStateMachine implements StateMachine { try { this.state = SERIALIZER.decode(data); } catch (Exception e) { - log.error("Snapshot deserialization error", e); + log.error("Failed to install from snapshot", e); + throw new SnapshotException(e); } } } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java new file mode 100644 index 0000000000..893c31179a --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java @@ -0,0 +1,280 @@ +package org.onlab.onos.store.service.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentNavigableMap; + +import net.kuujo.copycat.log.Entry; +import net.kuujo.copycat.log.Log; +import net.kuujo.copycat.log.LogIndexOutOfBoundsException; + +import org.mapdb.Atomic; +import org.mapdb.BTreeMap; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.TxBlock; +import org.mapdb.TxMaker; +import org.onlab.onos.store.serializers.StoreSerializer; + +import com.google.common.collect.Lists; + +/** + * MapDB based log implementation. + */ +public class MapDBLog implements Log { + + private final File dbFile; + private TxMaker txMaker; + private final StoreSerializer serializer; + private static final String LOG_NAME = "log"; + private static final String SIZE_FIELD_NAME = "size"; + + public MapDBLog(File dbFile, StoreSerializer serializer) { + this.dbFile = dbFile; + this.serializer = serializer; + } + + @Override + public void open() throws IOException { + txMaker = DBMaker + .newFileDB(dbFile) + .makeTxMaker(); + } + + @Override + public void close() throws IOException { + assertIsOpen(); + txMaker.close(); + txMaker = null; + } + + @Override + public boolean isOpen() { + return txMaker != null; + } + + protected void assertIsOpen() { + checkState(isOpen(), "The log is not currently open."); + } + + @Override + public long appendEntry(Entry entry) { + checkArgument(entry != null, "expecting non-null entry"); + return appendEntries(entry).get(0); + } + + @Override + public List appendEntries(Entry... entries) { + checkArgument(entries != null, "expecting non-null entries"); + return appendEntries(Arrays.asList(entries)); + } + + @Override + public List appendEntries(List entries) { + assertIsOpen(); + checkArgument(entries != null, "expecting non-null entries"); + final List indices = Lists.newArrayList(); + + txMaker.execute(new TxBlock() { + @Override + public void tx(DB db) { + BTreeMap log = db.getTreeMap(LOG_NAME); + Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME); + long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1; + for (Entry entry : entries) { + byte[] entryBytes = serializer.encode(entry); + log.put(nextIndex, entryBytes); + size.addAndGet(entryBytes.length); + indices.add(nextIndex); + nextIndex++; + } + } + }); + + return indices; + } + + @Override + public boolean containsEntry(long index) { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + return log.containsKey(index); + } finally { + db.close(); + } + } + + @Override + public void delete() throws IOException { + assertIsOpen(); + txMaker.execute(new TxBlock() { + @Override + public void tx(DB db) { + BTreeMap log = db.getTreeMap(LOG_NAME); + Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME); + log.clear(); + size.set(0); + } + }); + } + + @Override + public T firstEntry() { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue()); + } finally { + db.close(); + } + } + + @Override + public long firstIndex() { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + return log.isEmpty() ? 0 : log.firstKey(); + } finally { + db.close(); + } + } + + @Override + public List getEntries(long from, long to) { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + if (log.isEmpty()) { + throw new LogIndexOutOfBoundsException("Log is empty"); + } else if (from < log.firstKey()) { + throw new LogIndexOutOfBoundsException("From index out of bounds."); + } else if (to > log.lastKey()) { + throw new LogIndexOutOfBoundsException("To index out of bounds."); + } + List entries = new ArrayList<>((int) (to - from + 1)); + for (long i = from; i <= to; i++) { + T entry = serializer.decode(log.get(i)); + entries.add(entry); + } + return entries; + } finally { + db.close(); + } + } + + @Override + public T getEntry(long index) { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + byte[] entryBytes = log.get(index); + return entryBytes == null ? null : serializer.decode(entryBytes); + } finally { + db.close(); + } + } + + @Override + public boolean isEmpty() { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + return log.isEmpty(); + } finally { + db.close(); + } + } + + @Override + public T lastEntry() { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue()); + } finally { + db.close(); + } + } + + @Override + public long lastIndex() { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + BTreeMap log = db.getTreeMap(LOG_NAME); + return log.isEmpty() ? 0 : log.lastKey(); + } finally { + db.close(); + } + } + + @Override + public void removeAfter(long index) { + assertIsOpen(); + txMaker.execute(new TxBlock() { + @Override + public void tx(DB db) { + BTreeMap log = db.getTreeMap(LOG_NAME); + Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME); + long startIndex = index + 1; + long endIndex = log.lastKey(); + for (long i = startIndex; i <= endIndex; ++i) { + byte[] entryBytes = log.remove(i); + size.addAndGet(-1L * entryBytes.length); + } + } + }); + } + + @Override + public long size() { + assertIsOpen(); + DB db = txMaker.makeTx(); + try { + Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME); + return size.get(); + } finally { + db.close(); + } + } + + @Override + public void sync() throws IOException { + assertIsOpen(); + } + + @Override + public void compact(long index, Entry entry) throws IOException { + + assertIsOpen(); + txMaker.execute(new TxBlock() { + @Override + public void tx(DB db) { + BTreeMap log = db.getTreeMap(LOG_NAME); + Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME); + ConcurrentNavigableMap headMap = log.headMap(index); + long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum(); + size.addAndGet(-1 * deletedBytes); + byte[] entryBytes = serializer.encode(entry); + byte[] existingEntry = log.put(index, entryBytes); + size.addAndGet(entryBytes.length - existingEntry.length); + db.compact(); + } + }); + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java new file mode 100644 index 0000000000..4cfc13b618 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java @@ -0,0 +1,13 @@ +package org.onlab.onos.store.service.impl; + +import org.onlab.onos.store.service.DatabaseException; + +/** + * Exception that indicates a problem with the state machine snapshotting. + */ +@SuppressWarnings("serial") +public class SnapshotException extends DatabaseException { + public SnapshotException(Throwable t) { + super(t); + } +} diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java new file mode 100644 index 0000000000..75beefdeb8 --- /dev/null +++ b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java @@ -0,0 +1,193 @@ +package org.onlab.onos.store.service.impl; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +import net.kuujo.copycat.internal.log.OperationEntry; +import net.kuujo.copycat.log.Entry; +import net.kuujo.copycat.log.Log; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.onlab.onos.store.serializers.StoreSerializer; + +import com.google.common.testing.EqualsTester; + +/** + * Test the MapDBLog implementation. + */ +public class MapDBLogTest { + + private static final String DB_FILE_NAME = "mapdbTest"; + private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER; + private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1"); + private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12"); + private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123"); + private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234"); + + private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot"); + + private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length; + private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length; + private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length; + private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length; + + private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length; + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + Files.deleteIfExists(new File(DB_FILE_NAME).toPath()); + } + + @Test(expected = IllegalStateException.class) + public void testAssertOpen() { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.size(); + } + + @Test + public void testAppendEntry() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntry(TEST_ENTRY1); + OperationEntry first = log.firstEntry(); + OperationEntry last = log.lastEntry(); + new EqualsTester() + .addEqualityGroup(first, last, TEST_ENTRY1) + .testEquals(); + Assert.assertEquals(TEST_ENTRY1_SIZE, log.size()); + Assert.assertEquals(1, log.firstIndex()); + Assert.assertEquals(1, log.lastIndex()); + } + + @Test + public void testAppendEntries() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3); + OperationEntry first = log.firstEntry(); + OperationEntry last = log.lastEntry(); + new EqualsTester() + .addEqualityGroup(first, TEST_ENTRY1) + .addEqualityGroup(last, TEST_ENTRY3) + .testEquals(); + Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE, TEST_ENTRY3_SIZE, log.size()); + Assert.assertEquals(1, log.firstIndex()); + Assert.assertEquals(3, log.lastIndex()); + Assert.assertTrue(log.containsEntry(1)); + Assert.assertTrue(log.containsEntry(2)); + } + + @Test + public void testDelete() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntries(TEST_ENTRY1, TEST_ENTRY2); + log.delete(); + Assert.assertEquals(0, log.size()); + Assert.assertTrue(log.isEmpty()); + Assert.assertEquals(0, log.firstIndex()); + Assert.assertNull(log.firstEntry()); + Assert.assertEquals(0, log.lastIndex()); + Assert.assertNull(log.lastEntry()); + } + + @Test + public void testGetEntries() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4); + Assert.assertEquals( + TEST_ENTRY1_SIZE + + TEST_ENTRY2_SIZE + + TEST_ENTRY3_SIZE + + TEST_ENTRY4_SIZE, log.size()); + + List entries = log.getEntries(2, 3); + new EqualsTester() + .addEqualityGroup(log.getEntry(4), TEST_ENTRY4) + .addEqualityGroup(entries.get(0), TEST_ENTRY2) + .addEqualityGroup(entries.get(1), TEST_ENTRY3) + .testEquals(); + } + + @Test + public void testRemoveAfter() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4); + log.removeAfter(1); + Assert.assertEquals(TEST_ENTRY1_SIZE, log.size()); + new EqualsTester() + .addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1) + .testEquals(); + } + + @Test + public void testAddAfterRemove() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4); + log.removeAfter(1); + log.appendEntry(TEST_ENTRY4); + Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size()); + new EqualsTester() + .addEqualityGroup(log.firstEntry(), TEST_ENTRY1) + .addEqualityGroup(log.lastEntry(), TEST_ENTRY4) + .addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE) + .testEquals(); + } + + @Test + public void testClose() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + Assert.assertFalse(log.isOpen()); + log.open(); + Assert.assertTrue(log.isOpen()); + log.close(); + Assert.assertFalse(log.isOpen()); + } + + @Test + public void testReopen() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4); + log.close(); + log.open(); + + new EqualsTester() + .addEqualityGroup(log.firstEntry(), TEST_ENTRY1) + .addEqualityGroup(log.getEntry(2), TEST_ENTRY2) + .addEqualityGroup(log.lastEntry(), TEST_ENTRY4) + .addEqualityGroup(log.size(), + TEST_ENTRY1_SIZE + + TEST_ENTRY2_SIZE + + TEST_ENTRY3_SIZE + + TEST_ENTRY4_SIZE) + .testEquals(); + } + + @Test + public void testCompact() throws IOException { + Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER); + log.open(); + log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4); + log.compact(3, TEST_SNAPSHOT_ENTRY); + new EqualsTester() + .addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY) + .addEqualityGroup(log.lastEntry(), TEST_ENTRY4) + .addEqualityGroup(log.size(), + TEST_SNAPSHOT_ENTRY_SIZE + + TEST_ENTRY4_SIZE) + .testEquals(); + } +} From c195732463ef167700e866a9b8b9532f434f966d Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Wed, 5 Nov 2014 23:45:00 -0800 Subject: [PATCH 18/26] cleaning up the mess Change-Id: I2688b9ff9c83d86fda924531becd033f28873b09 --- features/features.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/features/features.xml b/features/features.xml index 0e7dfbf5e7..b8ef86f5b4 100644 --- a/features/features.xml +++ b/features/features.xml @@ -56,6 +56,9 @@ mvn:org.codehaus.jackson/jackson-mapper-asl/1.9.13 mvn:org.onlab.onos/onlab-thirdparty/1.0.0-SNAPSHOT + + mvn:org.mapdb/mapdb/1.0.6 +