From 3d643ecc8d04abf946638b2c0a65d59b9ba62c2c Mon Sep 17 00:00:00 2001 From: alshabib Date: Wed, 22 Oct 2014 18:33:00 -0700 Subject: [PATCH] functional stats service Change-Id: I90de3aa5d7721db8ef6a154e122af8b446243f60 --- .../onlab/onos/net/statistic/DefaultLoad.java | 56 ++++ .../org/onlab/onos/net/statistic/Load.java | 16 +- .../onos/net/flow/impl/FlowRuleManager.java | 5 + .../onos/net/packet/impl/PacketManager.java | 4 +- .../net/statistic/impl/StatisticManager.java | 76 +++-- .../net/flow/impl/FlowRuleManagerTest.java | 25 +- .../impl/DistributedStatisticStore.java | 289 ++++++++++++++++++ .../impl/StatisticStoreMessageSubjects.java | 15 + .../store/statistic/impl/package-info.java | 4 + .../store/serializers/KryoNamespaces.java | 4 + .../provider/lldp/impl/LinkDiscovery.java | 11 +- .../of/flow/impl/OpenFlowRuleProvider.java | 12 +- 12 files changed, 483 insertions(+), 34 deletions(-) create mode 100644 core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java create mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java diff --git a/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java b/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java new file mode 100644 index 0000000000..1826dded31 --- /dev/null +++ b/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java @@ -0,0 +1,56 @@ +package org.onlab.onos.net.statistic; + +import org.onlab.onos.net.flow.FlowRuleProvider; + +/** + * Implementation of a load. + */ +public class DefaultLoad implements Load { + + private final boolean isValid; + private final long current; + private final long previous; + private final long time; + + /** + * Creates an invalid load. + */ + public DefaultLoad() { + this.isValid = false; + this.time = System.currentTimeMillis(); + this.current = -1; + this.previous = -1; + } + + /** + * Creates a load value from the parameters. + * @param current the current value + * @param previous the previous value + */ + public DefaultLoad(long current, long previous) { + this.current = current; + this.previous = previous; + this.time = System.currentTimeMillis(); + this.isValid = true; + } + + @Override + public long rate() { + return (current - previous) / FlowRuleProvider.POLL_INTERVAL; + } + + @Override + public long latest() { + return current; + } + + @Override + public boolean isValid() { + return isValid; + } + + @Override + public long time() { + return time; + } +} diff --git a/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java b/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java index 534b10c923..b609f2b4a2 100644 --- a/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java +++ b/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java @@ -6,15 +6,27 @@ package org.onlab.onos.net.statistic; public interface Load { /** - * Obtain the current observed rate on a link. + * Obtain the current observed rate (in bytes/s) on a link. * @return long value */ long rate(); /** - * Obtain the latest counter viewed on that link. + * Obtain the latest bytes counter viewed on that link. * @return long value */ long latest(); + /** + * Indicates whether this load was built on valid values. + * @return boolean + */ + boolean isValid(); + + /** + * Returns when this value was seen. + * @return epoch time + */ + long time(); + } diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java index 01514d4eef..d8f89ae7b0 100644 --- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java +++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java @@ -108,6 +108,9 @@ public class FlowRuleManager if (local) { // TODO: aggregate all local rules and push down once? applyFlowRulesToProviders(f); + eventDispatcher.post( + new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f)); + } } } @@ -136,6 +139,8 @@ public class FlowRuleManager if (local) { // TODO: aggregate all local rules and push down once? removeFlowRulesFromProviders(f); + eventDispatcher.post( + new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f)); } } } diff --git a/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java index e682c499b2..49d21e0021 100644 --- a/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java +++ b/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java @@ -68,7 +68,9 @@ implements PacketService, PacketProviderRegistry { checkNotNull(packet, "Packet cannot be null"); final Device device = deviceService.getDevice(packet.sendThrough()); final PacketProvider packetProvider = getProvider(device.providerId()); - packetProvider.emit(packet); + if (packetProvider != null) { + packetProvider.emit(packet); + } } @Override diff --git a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java index 9b1a2e0a68..90db729416 100644 --- a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java +++ b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java @@ -10,15 +10,19 @@ import org.onlab.onos.net.ConnectPoint; import org.onlab.onos.net.Link; import org.onlab.onos.net.Path; +import org.onlab.onos.net.flow.FlowEntry; import org.onlab.onos.net.flow.FlowRule; import org.onlab.onos.net.flow.FlowRuleEvent; import org.onlab.onos.net.flow.FlowRuleListener; import org.onlab.onos.net.flow.FlowRuleService; +import org.onlab.onos.net.statistic.DefaultLoad; import org.onlab.onos.net.statistic.Load; import org.onlab.onos.net.statistic.StatisticService; import org.onlab.onos.net.statistic.StatisticStore; import org.slf4j.Logger; +import java.util.Set; + import static org.slf4j.LoggerFactory.getLogger; /** @@ -54,12 +58,12 @@ public class StatisticManager implements StatisticService { @Override public Load load(Link link) { - return null; + return load(link.src()); } @Override public Load load(ConnectPoint connectPoint) { - return null; + return loadInternal(connectPoint); } @Override @@ -77,6 +81,35 @@ public class StatisticManager implements StatisticService { return null; } + private Load loadInternal(ConnectPoint connectPoint) { + Set current; + Set previous; + synchronized (statisticStore) { + current = statisticStore.getCurrentStatistic(connectPoint); + previous = statisticStore.getPreviousStatistic(connectPoint); + } + if (current == null || previous == null) { + return new DefaultLoad(); + } + long currentAggregate = aggregate(current); + long previousAggregate = aggregate(previous); + + return new DefaultLoad(currentAggregate, previousAggregate); + } + + /** + * Aggregates a set of values. + * @param values the values to aggregate + * @return a long value + */ + private long aggregate(Set values) { + long sum = 0; + for (FlowEntry f : values) { + sum += f.bytes(); + } + return sum; + } + /** * Internal flow rule event listener. */ @@ -84,22 +117,29 @@ public class StatisticManager implements StatisticService { @Override public void event(FlowRuleEvent event) { -// FlowRule rule = event.subject(); -// switch (event.type()) { -// case RULE_ADDED: -// case RULE_UPDATED: -// if (rule instanceof FlowEntry) { -// statisticStore.addOrUpdateStatistic((FlowEntry) rule); -// } -// break; -// case RULE_ADD_REQUESTED: -// statisticStore.prepareForStatistics(rule); -// break; -// case RULE_REMOVE_REQUESTED: -// case RULE_REMOVED: -// statisticStore.removeFromStatistics(rule); -// break; -// } + FlowRule rule = event.subject(); + switch (event.type()) { + case RULE_ADDED: + case RULE_UPDATED: + if (rule instanceof FlowEntry) { + statisticStore.addOrUpdateStatistic((FlowEntry) rule); + } else { + log.warn("IT AIN'T A FLOWENTRY"); + } + break; + case RULE_ADD_REQUESTED: + log.info("Preparing for stats"); + statisticStore.prepareForStatistics(rule); + break; + case RULE_REMOVE_REQUESTED: + log.info("Removing stats"); + statisticStore.removeFromStatistics(rule); + break; + case RULE_REMOVED: + break; + default: + log.warn("Unknown flow rule event {}", event); + } } } diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java index ca7cc077f0..cb966dcef8 100644 --- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java +++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java @@ -6,9 +6,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED; -import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; -import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED; +import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*; import java.util.ArrayList; import java.util.Collections; @@ -164,7 +162,8 @@ public class FlowRuleManagerTest { assertEquals("2 rules should exist", 2, flowCount()); providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2)); - validateEvents(RULE_ADDED, RULE_ADDED); + validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, + RULE_ADDED, RULE_ADDED); addFlowRule(1); assertEquals("should still be 2 rules", 2, flowCount()); @@ -217,11 +216,12 @@ public class FlowRuleManagerTest { FlowEntry fe2 = new DefaultFlowEntry(f2); FlowEntry fe3 = new DefaultFlowEntry(f3); providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2, fe3)); - validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED); + validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, + RULE_ADDED, RULE_ADDED, RULE_ADDED); mgr.removeFlowRules(f1, f2); //removing from north, so no events generated - validateEvents(); + validateEvents(RULE_REMOVE_REQUESTED, RULE_REMOVE_REQUESTED); assertEquals("3 rule should exist", 3, flowCount()); assertTrue("Entries should be pending remove.", validateState(ImmutableMap.of( @@ -243,7 +243,8 @@ public class FlowRuleManagerTest { service.removeFlowRules(f1); fe1.setState(FlowEntryState.REMOVED); providerService.flowRemoved(fe1); - validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED); + validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED, + RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED); providerService.flowRemoved(fe1); validateEvents(); @@ -252,7 +253,7 @@ public class FlowRuleManagerTest { FlowEntry fe3 = new DefaultFlowEntry(f3); service.applyFlowRules(f3); providerService.pushFlowMetrics(DID, Collections.singletonList(fe3)); - validateEvents(RULE_ADDED); + validateEvents(RULE_ADD_REQUESTED, RULE_ADDED); providerService.flowRemoved(fe3); validateEvents(); @@ -281,7 +282,8 @@ public class FlowRuleManagerTest { f2, FlowEntryState.ADDED, f3, FlowEntryState.PENDING_ADD))); - validateEvents(RULE_ADDED, RULE_ADDED); + validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, + RULE_ADDED, RULE_ADDED); } @Test @@ -301,7 +303,7 @@ public class FlowRuleManagerTest { providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2, fe3)); - validateEvents(RULE_ADDED, RULE_ADDED); + validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED, RULE_ADDED); } @@ -326,7 +328,8 @@ public class FlowRuleManagerTest { providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2)); - validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED); + validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, + RULE_REMOVE_REQUESTED, RULE_ADDED, RULE_ADDED, RULE_REMOVED); } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java new file mode 100644 index 0000000000..d89ce252d6 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java @@ -0,0 +1,289 @@ +package org.onlab.onos.store.statistic.impl; + +import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*; +import static org.slf4j.LoggerFactory.getLogger; + +import com.google.common.collect.ImmutableSet; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.cluster.ClusterService; +import org.onlab.onos.net.ConnectPoint; +import org.onlab.onos.net.PortNumber; +import org.onlab.onos.net.flow.FlowEntry; +import org.onlab.onos.net.flow.FlowRule; +import org.onlab.onos.net.flow.instructions.Instruction; +import org.onlab.onos.net.flow.instructions.Instructions; +import org.onlab.onos.net.statistic.StatisticStore; +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; +import org.onlab.onos.store.cluster.messaging.ClusterMessage; +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; +import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse; +import org.onlab.onos.store.flow.ReplicaInfo; +import org.onlab.onos.store.flow.ReplicaInfoService; +import org.onlab.onos.store.serializers.KryoNamespaces; +import org.onlab.onos.store.serializers.KryoSerializer; +import org.slf4j.Logger; + + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Maintains statistics using RPC calls to collect stats from remote instances + * on demand. + */ +@Component(immediate = true) +@Service +public class DistributedStatisticStore implements StatisticStore { + + private final Logger log = getLogger(getClass()); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + private ReplicaInfoService replicaInfoManager; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + private ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + private ClusterService clusterService; + + private Map representations = + new ConcurrentHashMap<>(); + + private Map> previous = + new ConcurrentHashMap<>(); + + private Map> current = + new ConcurrentHashMap<>(); + + protected static final KryoSerializer SERIALIZER = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoNamespaces.API.newBuilder() + .build() + .populate(1); + } + };; + + private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000; + + @Activate + public void activate() { + clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + ConnectPoint cp = SERIALIZER.decode(message.payload()); + try { + message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp))); + } catch (IOException e) { + log.error("Failed to respond back", e); + } + } + }); + + clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() { + + @Override + public void handle(ClusterMessage message) { + ConnectPoint cp = SERIALIZER.decode(message.payload()); + try { + message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp))); + } catch (IOException e) { + log.error("Failed to respond back", e); + } + } + }); + log.info("Started"); + } + + @Deactivate + public void deactivate() { + log.info("Stopped"); + } + + @Override + public void prepareForStatistics(FlowRule rule) { + ConnectPoint cp = buildConnectPoint(rule); + if (cp == null) { + return; + } + InternalStatisticRepresentation rep; + synchronized (representations) { + rep = getOrCreateRepresentation(cp); + } + rep.prepare(); + } + + @Override + public void removeFromStatistics(FlowRule rule) { + ConnectPoint cp = buildConnectPoint(rule); + if (cp == null) { + return; + } + InternalStatisticRepresentation rep = representations.get(cp); + if (rep != null) { + rep.remove(rule); + } + } + + @Override + public void addOrUpdateStatistic(FlowEntry rule) { + ConnectPoint cp = buildConnectPoint(rule); + if (cp == null) { + return; + } + InternalStatisticRepresentation rep = representations.get(cp); + if (rep != null && rep.submit(rule)) { + updatePublishedStats(cp, rep.get()); + } + } + + private synchronized void updatePublishedStats(ConnectPoint cp, + Set flowEntries) { + Set curr = current.get(cp); + if (curr == null) { + curr = new HashSet<>(); + } + previous.put(cp, curr); + current.put(cp, flowEntries); + + } + + @Override + public Set getCurrentStatistic(ConnectPoint connectPoint) { + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId()); + if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { + return getCurrentStatisticInternal(connectPoint); + } else { + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + GET_CURRENT, + SERIALIZER.encode(connectPoint)); + + try { + ClusterMessageResponse response = + clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); + return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS, + TimeUnit.MILLISECONDS)); + } catch (IOException | TimeoutException e) { + // FIXME: throw a FlowStoreException + throw new RuntimeException(e); + } + } + + } + + private synchronized Set getCurrentStatisticInternal(ConnectPoint connectPoint) { + return current.get(connectPoint); + } + + @Override + public Set getPreviousStatistic(ConnectPoint connectPoint) { + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId()); + if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { + return getPreviousStatisticInternal(connectPoint); + } else { + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + GET_CURRENT, + SERIALIZER.encode(connectPoint)); + + try { + ClusterMessageResponse response = + clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); + return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS, + TimeUnit.MILLISECONDS)); + } catch (IOException | TimeoutException e) { + // FIXME: throw a FlowStoreException + throw new RuntimeException(e); + } + } + + } + + private synchronized Set getPreviousStatisticInternal(ConnectPoint connectPoint) { + return previous.get(connectPoint); + } + + private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) { + + if (representations.containsKey(cp)) { + return representations.get(cp); + } else { + InternalStatisticRepresentation rep = new InternalStatisticRepresentation(); + representations.put(cp, rep); + return rep; + } + + } + + private ConnectPoint buildConnectPoint(FlowRule rule) { + PortNumber port = getOutput(rule); + if (port == null) { + log.warn("Rule {} has no output.", rule); + return null; + } + ConnectPoint cp = new ConnectPoint(rule.deviceId(), port); + return cp; + } + + private PortNumber getOutput(FlowRule rule) { + for (Instruction i : rule.treatment().instructions()) { + if (i.type() == Instruction.Type.OUTPUT) { + Instructions.OutputInstruction out = (Instructions.OutputInstruction) i; + return out.port(); + } + if (i.type() == Instruction.Type.DROP) { + return PortNumber.P0; + } + } + return null; + } + + private class InternalStatisticRepresentation { + + private final AtomicInteger counter = new AtomicInteger(0); + private final Set rules = new HashSet<>(); + + public void prepare() { + counter.incrementAndGet(); + } + + public synchronized void remove(FlowRule rule) { + rules.remove(rule); + counter.decrementAndGet(); + } + + public synchronized boolean submit(FlowEntry rule) { + if (rules.contains(rule)) { + rules.remove(rule); + } + rules.add(rule); + if (counter.get() == 0) { + return true; + } else { + return counter.decrementAndGet() == 0; + } + } + + public synchronized Set get() { + counter.set(rules.size()); + return ImmutableSet.copyOf(rules); + } + + + } + +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java new file mode 100644 index 0000000000..a096a3d4a0 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java @@ -0,0 +1,15 @@ +package org.onlab.onos.store.statistic.impl; + +import org.onlab.onos.store.cluster.messaging.MessageSubject; + +/** + * MessageSubjects used by DistributedStatisticStore peer-peer communication. + */ +public final class StatisticStoreMessageSubjects { + private StatisticStoreMessageSubjects() {} + public static final MessageSubject GET_CURRENT = + new MessageSubject("peer-return-current"); + public static final MessageSubject GET_PREVIOUS = + new MessageSubject("peer-return-previous"); + +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java new file mode 100644 index 0000000000..122d2be81a --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java @@ -0,0 +1,4 @@ +/** + * Implementation of the statistic store. + */ +package org.onlab.onos.store.statistic.impl; \ No newline at end of file diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java index 9de6d8cde5..dc0eaa80c9 100644 --- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java +++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java @@ -4,6 +4,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import org.onlab.onos.cluster.ControllerNode; import org.onlab.onos.cluster.DefaultControllerNode; @@ -26,6 +27,7 @@ import org.onlab.onos.net.Port; import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.device.DefaultDeviceDescription; import org.onlab.onos.net.device.DefaultPortDescription; +import org.onlab.onos.net.flow.DefaultFlowEntry; import org.onlab.onos.net.flow.DefaultFlowRule; import org.onlab.onos.net.flow.DefaultTrafficSelector; import org.onlab.onos.net.flow.DefaultTrafficTreatment; @@ -75,6 +77,7 @@ public final class KryoNamespaces { ArrayList.class, Arrays.asList().getClass(), HashMap.class, + HashSet.class, // // ControllerNode.State.class, @@ -94,6 +97,7 @@ public final class KryoNamespaces { HostDescription.class, DefaultHostDescription.class, DefaultFlowRule.class, + DefaultFlowEntry.class, FlowId.class, DefaultTrafficSelector.class, Criteria.PortCriterion.class, diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java index 95cd619f33..bf4fee035f 100644 --- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java +++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java @@ -16,6 +16,7 @@ package org.onlab.onos.provider.lldp.impl; +import static com.google.common.base.Preconditions.checkNotNull; import static org.slf4j.LoggerFactory.getLogger; import java.nio.ByteBuffer; @@ -95,11 +96,13 @@ public class LinkDiscovery implements TimerTask { */ public LinkDiscovery(Device device, PacketService pktService, MastershipService masterService, LinkProviderService providerService, Boolean... useBDDP) { + this.device = device; this.probeRate = 3000; this.linkProvider = providerService; this.pktService = pktService; - this.mastershipService = masterService; + + this.mastershipService = checkNotNull(masterService, "WTF!"); this.slowPorts = Collections.synchronizedSet(new HashSet()); this.fastPorts = Collections.synchronizedSet(new HashSet()); this.portProbeCount = new HashMap<>(); @@ -344,6 +347,12 @@ public class LinkDiscovery implements TimerTask { } private void sendProbes(Long portNumber) { + if (device == null) { + log.warn("CRAZY SHIT"); + } + if (mastershipService == null) { + log.warn("INSANE"); + } if (device.type() != Device.Type.ROADM && mastershipService.getLocalRole(this.device.id()) == MastershipRole.MASTER) { diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java index 54265ba685..4a21add456 100644 --- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java +++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java @@ -103,6 +103,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr private final Map pendingFMs = new ConcurrentHashMap(); + private final Map collectors = Maps.newHashMap(); + /** * Creates an OpenFlow host provider. */ @@ -115,6 +117,14 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr providerService = providerRegistry.register(this); controller.addListener(listener); controller.addEventListener(listener); + + for (OpenFlowSwitch sw : controller.getSwitches()) { + FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL); + fsc.start(); + collectors.put(new Dpid(sw.getId()), fsc); + } + + log.info("Started"); } @@ -213,7 +223,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr private class InternalFlowProvider implements OpenFlowSwitchListener, OpenFlowEventListener { - private final Map collectors = Maps.newHashMap(); + private final Multimap completeEntries = ArrayListMultimap.create();