From fdbc4c27b428f3a9dcee905307adbda94e8c391c Mon Sep 17 00:00:00 2001 From: Thomas Vachuska Date: Fri, 29 May 2015 15:53:01 -0700 Subject: [PATCH] Adding multi-instance support for flow stats. Change-Id: I428c5a7cb58f4f9773a125fc94fb368ed846cb0d --- .../net/device/DefaultPortStatistics.java | 16 ++ .../store/device/impl/GossipDeviceStore.java | 189 ++++++++++-------- .../device/impl/GossipDeviceStoreTest.java | 15 +- .../store/serializers/KryoNamespaces.java | 6 +- tools/test/bin/onos-check-apps | 2 +- tools/test/cells/madan3 | 7 + tools/test/cells/tomx | 4 +- .../impl/TopologyViewMessageHandlerBase.java | 2 +- 8 files changed, 156 insertions(+), 85 deletions(-) create mode 100644 tools/test/cells/madan3 diff --git a/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java b/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java index 03828332a6..540a945f03 100644 --- a/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java +++ b/core/api/src/main/java/org/onosproject/net/device/DefaultPortStatistics.java @@ -61,6 +61,22 @@ public final class DefaultPortStatistics implements PortStatistics { this.durationNano = durationNano; } + // Constructor for serializer + private DefaultPortStatistics() { + this.deviceId = null; + this.port = 0; + this.packetsReceived = 0; + this.packetsSent = 0; + this.bytesReceived = 0; + this.bytesSent = 0; + this.packetsRxDropped = 0; + this.packetsTxDropped = 0; + this.packetsRxErrors = 0; + this.packetsTxErrors = 0; + this.durationSec = 0; + this.durationNano = 0; + } + /** * Creates a builder for DefaultPortStatistics object. * diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java index 5ea27af43a..e705567a50 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java @@ -50,6 +50,7 @@ import org.onosproject.net.OduCltPort; import org.onosproject.net.OmsPort; import org.onosproject.net.Port; import org.onosproject.net.PortNumber; +import org.onosproject.net.device.DefaultPortStatistics; import org.onosproject.net.device.DeviceClockService; import org.onosproject.net.device.DeviceDescription; import org.onosproject.net.device.DeviceEvent; @@ -68,8 +69,16 @@ import org.onosproject.store.cluster.messaging.ClusterMessage; import org.onosproject.store.cluster.messaging.ClusterMessageHandler; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.impl.Timestamped; +import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.serializers.custom.DistributedStoreSerializers; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.MultiValuedTimestamp; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.WallClockTimestamp; +import org.onosproject.store.service.WallclockClockManager; import org.slf4j.Logger; import java.io.IOException; @@ -102,6 +111,7 @@ import static org.onosproject.net.DefaultAnnotations.merge; import static org.onosproject.net.device.DeviceEvent.Type.*; import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED; import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*; +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; import static org.slf4j.LoggerFactory.getLogger; /** @@ -123,13 +133,15 @@ public class GossipDeviceStore // innerMap is used to lock a Device, thus instance should never be replaced. // collection of Description given from various providers private final ConcurrentMap> - deviceDescs = Maps.newConcurrentMap(); + deviceDescs = Maps.newConcurrentMap(); // cache of Device and Ports generated by compositing descriptions from providers private final ConcurrentMap devices = Maps.newConcurrentMap(); private final ConcurrentMap> devicePorts = Maps.newConcurrentMap(); - private final ConcurrentMap> - devicePortStats = Maps.newConcurrentMap(); + + private EventuallyConsistentMap> devicePortStats; + private final EventuallyConsistentMapListener> + portStatsListener = new InternalPortStatsListener(); // to be updated under Device lock private final Map offline = Maps.newHashMap(); @@ -141,6 +153,9 @@ public class GossipDeviceStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceClockService deviceClockService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterCommunicationService clusterCommunicator; @@ -182,10 +197,8 @@ public class GossipDeviceStore private long initialDelaySec = 5; private long periodSec = 5; - @Activate public void activate() { - executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d")); backgroundExecutor = @@ -198,8 +211,8 @@ public class GossipDeviceStore new InternalDeviceOfflineEventListener(), executor); clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, - new InternalRemoveRequestListener(), - executor); + new InternalRemoveRequestListener(), + executor); clusterCommunicator.addSubscriber( GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor); clusterCommunicator.addSubscriber( @@ -217,8 +230,24 @@ public class GossipDeviceStore // start anti-entropy thread backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), - initialDelaySec, periodSec, TimeUnit.SECONDS); + initialDelaySec, periodSec, TimeUnit.SECONDS); + // Create a distributed map for port stats. + KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(DefaultPortStatistics.class) + .register(DeviceId.class) + .register(MultiValuedTimestamp.class) + .register(WallClockTimestamp.class); + + devicePortStats = storageService.>eventuallyConsistentMapBuilder() + .withName("port-stats") + .withSerializer(deviceDataSerializer) + .withAntiEntropyPeriod(5, TimeUnit.SECONDS) + .withClockService(new WallclockClockManager<>()) + .withTombstonesDisabled() + .build(); + devicePortStats.addListener(portStatsListener); log.info("Started"); } @@ -272,8 +301,8 @@ public class GossipDeviceStore @Override public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, - DeviceId deviceId, - DeviceDescription deviceDescription) { + DeviceId deviceId, + DeviceDescription deviceDescription) { NodeId localNode = clusterService.getLocalNode().id(); NodeId deviceNode = mastershipService.getMasterFor(deviceId); @@ -294,7 +323,7 @@ public class GossipDeviceStore if (deviceEvent != null) { log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); + providerId, deviceId); notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc)); } @@ -324,12 +353,12 @@ public class GossipDeviceStore } private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, - DeviceId deviceId, - Timestamped deltaDesc) { + DeviceId deviceId, + Timestamped deltaDesc) { // Collection of DeviceDescriptions for a Device Map device - = getOrCreateDeviceDescriptionsMap(deviceId); + = getOrCreateDeviceDescriptionsMap(deviceId); synchronized (device) { // locking per device @@ -345,7 +374,7 @@ public class GossipDeviceStore final Device newDevice; if (deltaDesc == descs.getDeviceDesc() || - deltaDesc.isNewer(descs.getDeviceDesc())) { + deltaDesc.isNewer(descs.getDeviceDesc())) { // on new device or valid update descs.putDeviceDesc(deltaDesc); newDevice = composeDevice(deviceId, device); @@ -371,8 +400,8 @@ public class GossipDeviceStore // update composed device cache Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); verify(oldDevice == null, - "Unexpected Device in cache. PID:%s [old=%s, new=%s]", - providerId, oldDevice, newDevice); + "Unexpected Device in cache. PID:%s [old=%s, new=%s]", + providerId, oldDevice, newDevice); if (!providerId.isAncillary()) { markOnline(newDevice.id(), timestamp); @@ -401,8 +430,8 @@ public class GossipDeviceStore boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice); if (!replaced) { verify(replaced, - "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", - providerId, oldDevice, devices.get(newDevice.id()) + "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", + providerId, oldDevice, devices.get(newDevice.id()) , newDevice); } if (!providerId.isAncillary()) { @@ -424,7 +453,7 @@ public class GossipDeviceStore final DeviceEvent event = markOfflineInternal(deviceId, timestamp); if (event != null) { log.info("Notifying peers of a device offline topology event for deviceId: {} {}", - deviceId, timestamp); + deviceId, timestamp); notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp)); } return event; @@ -433,7 +462,7 @@ public class GossipDeviceStore private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { Map providerDescs - = getOrCreateDeviceDescriptionsMap(deviceId); + = getOrCreateDeviceDescriptionsMap(deviceId); // locking device synchronized (providerDescs) { @@ -465,7 +494,7 @@ public class GossipDeviceStore * Marks the device as available if the given timestamp is not outdated, * compared to the time the device has been marked offline. * - * @param deviceId identifier of the device + * @param deviceId identifier of the device * @param timestamp of the event triggering this change. * @return true if availability change request was accepted and changed the state */ @@ -475,7 +504,7 @@ public class GossipDeviceStore // the latest offline request Timestamp Timestamp offlineTimestamp = offline.get(deviceId); if (offlineTimestamp == null || - offlineTimestamp.compareTo(timestamp) < 0) { + offlineTimestamp.compareTo(timestamp) < 0) { offline.remove(deviceId); return availableDevices.add(deviceId); @@ -485,8 +514,8 @@ public class GossipDeviceStore @Override public synchronized List updatePorts(ProviderId providerId, - DeviceId deviceId, - List portDescriptions) { + DeviceId deviceId, + List portDescriptions) { NodeId localNode = clusterService.getLocalNode().id(); // TODO: It might be negligible, but this will have negative impact to topology discovery performance, @@ -544,7 +573,7 @@ public class GossipDeviceStore if (!deviceEvents.isEmpty()) { log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); + providerId, deviceId); notifyPeers(new InternalPortEvent(providerId, deviceId, merged)); } @@ -572,8 +601,8 @@ public class GossipDeviceStore } private List updatePortsInternal(ProviderId providerId, - DeviceId deviceId, - Timestamped> portDescriptions) { + DeviceId deviceId, + Timestamped> portDescriptions) { Device device = devices.get(deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); @@ -592,8 +621,8 @@ public class GossipDeviceStore DeviceDescriptions descs = descsMap.get(providerId); // every provider must provide DeviceDescription. checkArgument(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); Map ports = getPortMap(deviceId); @@ -611,11 +640,11 @@ public class GossipDeviceStore final Timestamped existingPortDesc = descs.getPortDesc(number); if (existingPortDesc == null || - newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) { + newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) { // on new port or valid update // update description descs.putPortDesc(new Timestamped<>(portDescription, - portDescriptions.timestamp())); + portDescriptions.timestamp())); newPort = composePort(device, number, descsMap); } else { // outdated event, ignored. @@ -680,7 +709,7 @@ public class GossipDeviceStore // exist, it creates and registers a new one. private ConcurrentMap getPortMap(DeviceId deviceId) { return createIfAbsentUnchecked(devicePorts, deviceId, - NewConcurrentHashMap.ifNeeded()); + NewConcurrentHashMap.ifNeeded()); } private Map getOrCreateDeviceDescriptionsMap( @@ -702,7 +731,6 @@ public class GossipDeviceStore private DeviceDescriptions getOrCreateProviderDeviceDescriptions( Map device, ProviderId providerId, Timestamped deltaDesc) { - synchronized (device) { DeviceDescriptions r = device.get(providerId); if (r == null) { @@ -728,26 +756,25 @@ public class GossipDeviceStore return null; } final Timestamped deltaDesc - = new Timestamped<>(portDescription, newTimestamp); + = new Timestamped<>(portDescription, newTimestamp); final DeviceEvent event; final Timestamped mergedDesc; final Map device = getOrCreateDeviceDescriptionsMap(deviceId); synchronized (device) { event = updatePortStatusInternal(providerId, deviceId, deltaDesc); mergedDesc = device.get(providerId) - .getPortDesc(portDescription.portNumber()); + .getPortDesc(portDescription.portNumber()); } if (event != null) { log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}", - providerId, deviceId); + providerId, deviceId); notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc)); } return event; } private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId, - Timestamped deltaDesc) { - + Timestamped deltaDesc) { Device device = devices.get(deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); @@ -764,8 +791,8 @@ public class GossipDeviceStore DeviceDescriptions descs = descsMap.get(providerId); // assuming all providers must to give DeviceDescription verify(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); ConcurrentMap ports = getPortMap(deviceId); final PortNumber number = deltaDesc.value().portNumber(); @@ -774,7 +801,7 @@ public class GossipDeviceStore final Timestamped existingPortDesc = descs.getPortDesc(number); if (existingPortDesc == null || - deltaDesc.isNewer(existingPortDesc)) { + deltaDesc.isNewer(existingPortDesc)) { // on new port or valid update // update description descs.putPortDesc(deltaDesc); @@ -805,24 +832,21 @@ public class GossipDeviceStore @Override public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId, Collection portStats) { - - ConcurrentMap statsMap = devicePortStats.get(deviceId); + Map statsMap = devicePortStats.get(deviceId); if (statsMap == null) { - statsMap = Maps.newConcurrentMap(); - devicePortStats.put(deviceId, statsMap); + statsMap = Maps.newHashMap(); } - for (PortStatistics stat: portStats) { + for (PortStatistics stat : portStats) { PortNumber portNumber = PortNumber.portNumber(stat.port()); statsMap.put(portNumber, stat); } - - return new DeviceEvent(PORT_STATS_UPDATED, devices.get(deviceId), null); + devicePortStats.put(deviceId, statsMap); + return null; // new DeviceEvent(PORT_STATS_UPDATED, devices.get(deviceId), null); } @Override public List getPortStatistics(DeviceId deviceId) { - Map portStats = devicePortStats.get(deviceId); if (portStats == null) { return Collections.emptyList(); @@ -865,7 +889,7 @@ public class GossipDeviceStore if (!myId.equals(master)) { log.debug("{} has control of {}, forwarding remove request", - master, deviceId); + master, deviceId); // TODO check unicast return value clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master); @@ -874,7 +898,7 @@ public class GossipDeviceStore */ // event will be triggered after master processes it. - return null; + return null; } // I have control.. @@ -883,7 +907,7 @@ public class GossipDeviceStore DeviceEvent event = removeDeviceInternal(deviceId, timestamp); if (event != null) { log.debug("Notifying peers of a device removed topology event for deviceId: {}", - deviceId); + deviceId); notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp)); } if (relinquishAtEnd) { @@ -917,7 +941,7 @@ public class GossipDeviceStore markOfflineInternal(deviceId, timestamp); descs.clear(); return device == null ? null : - new DeviceEvent(DEVICE_REMOVED, device, null); + new DeviceEvent(DEVICE_REMOVED, device, null); } } @@ -925,14 +949,14 @@ public class GossipDeviceStore * Checks if given timestamp is superseded by removal request * with more recent timestamp. * - * @param deviceId identifier of a device + * @param deviceId identifier of a device * @param timestampToCheck timestamp of an event to check * @return true if device is already removed */ private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) { Timestamp removalTimestamp = removalRequest.get(deviceId); if (removalTimestamp != null && - removalTimestamp.compareTo(timestampToCheck) >= 0) { + removalTimestamp.compareTo(timestampToCheck) >= 0) { // removalRequest is more recent return true; } @@ -942,12 +966,12 @@ public class GossipDeviceStore /** * Returns a Device, merging description given from multiple Providers. * - * @param deviceId device identifier + * @param deviceId device identifier * @param providerDescs Collection of Descriptions from multiple providers * @return Device instance */ private Device composeDevice(DeviceId deviceId, - Map providerDescs) { + Map providerDescs) { checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied"); @@ -978,21 +1002,21 @@ public class GossipDeviceStore annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations()); } - return new DefaultDevice(primary, deviceId , type, manufacturer, - hwVersion, swVersion, serialNumber, - chassisId, annotations); + return new DefaultDevice(primary, deviceId, type, manufacturer, + hwVersion, swVersion, serialNumber, + chassisId, annotations); } /** * Returns a Port, merging description given from multiple Providers. * - * @param device device the port is on - * @param number port number + * @param device device the port is on + * @param number port number * @param descsMap Collection of Descriptions from multiple providers * @return Port instance */ private Port composePort(Device device, PortNumber number, - Map descsMap) { + Map descsMap) { ProviderId primary = pickPrimaryPID(descsMap); DeviceDescriptions primDescs = descsMap.get(primary); @@ -1028,12 +1052,12 @@ public class GossipDeviceStore case OMS: OmsPortDescription omsPortDesc = (OmsPortDescription) otherPortDesc.value(); updated = new OmsPort(device, number, isEnabled, omsPortDesc.minFrequency(), - omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations); + omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations); break; case OCH: OchPortDescription ochPortDesc = (OchPortDescription) otherPortDesc.value(); updated = new OchPort(device, number, isEnabled, ochPortDesc.signalType(), - ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations); + ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations); break; case ODUCLT: OduCltPortDescription oduCltPortDesc = (OduCltPortDescription) otherPortDesc.value(); @@ -1073,7 +1097,7 @@ public class GossipDeviceStore } private DeviceDescriptions getPrimaryDescriptions( - Map providerDescs) { + Map providerDescs) { ProviderId pid = pickPrimaryPID(providerDescs); return providerDescs.get(pid); } @@ -1174,14 +1198,14 @@ public class GossipDeviceStore final DeviceDescriptions descs = prov.getValue(); adDevices.put(new DeviceFragmentId(deviceId, provId), - descs.getDeviceDesc().timestamp()); + descs.getDeviceDesc().timestamp()); for (Entry> portDesc : descs.getPortDescs().entrySet()) { final PortNumber number = portDesc.getKey(); adPorts.put(new PortFragmentId(deviceId, provId, number), - portDesc.getValue().timestamp()); + portDesc.getValue().timestamp()); } } } @@ -1192,7 +1216,7 @@ public class GossipDeviceStore /** * Responds to anti-entropy advertisement message. - *

+ *

* Notify sender about out-dated information using regular replication message. * Send back advertisement to sender if not in sync. * @@ -1269,7 +1293,7 @@ public class GossipDeviceStore // find latest and update final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp(); if (localLatest == null || - providerLatest.compareTo(localLatest) > 0) { + providerLatest.compareTo(localLatest) > 0) { localLatest = providerLatest; } } // end local provider loop @@ -1277,7 +1301,7 @@ public class GossipDeviceStore // checking if remote timestamp is more recent. Timestamp rOffline = offlineAds.get(deviceId); if (rOffline != null && - rOffline.compareTo(localLatest) > 0) { + rOffline.compareTo(localLatest) > 0) { // remote offline timestamp suggests that the // device is off-line markOfflineInternal(deviceId, rOffline); @@ -1386,7 +1410,6 @@ public class GossipDeviceStore implements ClusterMessageHandler { @Override public void handle(ClusterMessage message) { - log.debug("Received device update event from peer: {}", message.sender()); InternalDeviceEvent event = SERIALIZER.decode(message.payload()); @@ -1406,7 +1429,6 @@ public class GossipDeviceStore implements ClusterMessageHandler { @Override public void handle(ClusterMessage message) { - log.debug("Received device offline event from peer: {}", message.sender()); InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload()); @@ -1440,7 +1462,6 @@ public class GossipDeviceStore implements ClusterMessageHandler { @Override public void handle(ClusterMessage message) { - log.debug("Received device removed event from peer: {}", message.sender()); InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload()); @@ -1508,8 +1529,7 @@ public class GossipDeviceStore } private final class InternalDeviceAdvertisementListener - implements ClusterMessageHandler { - + implements ClusterMessageHandler { @Override public void handle(ClusterMessage message) { log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender()); @@ -1526,7 +1546,6 @@ public class GossipDeviceStore implements ClusterMessageHandler { @Override public void handle(ClusterMessage message) { - log.debug("Received injected device event from peer: {}", message.sender()); DeviceInjectedEvent event = SERIALIZER.decode(message.payload()); @@ -1551,7 +1570,6 @@ public class GossipDeviceStore implements ClusterMessageHandler { @Override public void handle(ClusterMessage message) { - log.debug("Received injected port event from peer: {}", message.sender()); PortInjectedEvent event = SERIALIZER.decode(message.payload()); @@ -1571,4 +1589,17 @@ public class GossipDeviceStore } } } + + private class InternalPortStatsListener + implements EventuallyConsistentMapListener> { + @Override + public void event(EventuallyConsistentMapEvent> event) { + if (event.type() == PUT) { + Device device = devices.get(event.key()); + if (device != null) { + delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device)); + } + } + } + } } diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java index dc08e7644a..5d4a91f404 100644 --- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java +++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java @@ -55,6 +55,7 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.ClusterMessage; import org.onosproject.store.cluster.messaging.ClusterMessageHandler; import org.onosproject.store.cluster.messaging.MessageSubject; +import org.onosproject.store.consistent.impl.DatabaseManager; import java.io.IOException; import java.util.Arrays; @@ -157,7 +158,7 @@ public class GossipDeviceStoreTest { clusterCommunicator = createNiceMock(ClusterCommunicationService.class); clusterCommunicator.addSubscriber(anyObject(MessageSubject.class), - anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class)); + anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class)); expectLastCall().anyTimes(); replay(clusterCommunicator); ClusterService clusterService = new TestClusterService(); @@ -165,6 +166,10 @@ public class GossipDeviceStoreTest { testGossipDeviceStore = new TestGossipDeviceStore(deviceClockService, clusterService, clusterCommunicator); testGossipDeviceStore.mastershipService = new TestMastershipService(); + TestDatabaseManager testDatabaseManager = new TestDatabaseManager(); + testDatabaseManager.init(clusterService, clusterCommunicator); + testGossipDeviceStore.storageService = testDatabaseManager; + gossipDeviceStore = testGossipDeviceStore; gossipDeviceStore.activate(); deviceStore = gossipDeviceStore; @@ -885,4 +890,12 @@ public class GossipDeviceStoreTest { nodeStates.put(NID2, ACTIVE); } } + + private class TestDatabaseManager extends DatabaseManager { + void init(ClusterService clusterService, + ClusterCommunicationService clusterCommunicator) { + this.clusterService = clusterService; + this.clusterCommunicator = clusterCommunicator; + } + } } diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java index 9356a29d08..b3e7a75b9b 100644 --- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java +++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java @@ -71,9 +71,11 @@ import org.onosproject.net.Port; import org.onosproject.net.PortNumber; import org.onosproject.net.device.DefaultDeviceDescription; import org.onosproject.net.device.DefaultPortDescription; +import org.onosproject.net.device.DefaultPortStatistics; import org.onosproject.net.device.OchPortDescription; import org.onosproject.net.device.OduCltPortDescription; import org.onosproject.net.device.OmsPortDescription; +import org.onosproject.net.device.PortStatistics; import org.onosproject.net.flow.CompletedBatchOperation; import org.onosproject.net.flow.DefaultFlowEntry; import org.onosproject.net.flow.DefaultFlowRule; @@ -380,7 +382,9 @@ public final class KryoNamespaces { IntentOperation.class, FlowRuleExtPayLoad.class, Frequency.class, - DefaultAnnotations.class + DefaultAnnotations.class, + PortStatistics.class, + DefaultPortStatistics.class ) .register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class) .register(new URISerializer(), URI.class) diff --git a/tools/test/bin/onos-check-apps b/tools/test/bin/onos-check-apps index 03df7dc055..9760a41d55 100755 --- a/tools/test/bin/onos-check-apps +++ b/tools/test/bin/onos-check-apps @@ -14,7 +14,7 @@ cut -c7- $aux | cut -d\ -f1 | sort > $aux.1 # Normalize the expected apps apps=${ONOS_APPS:-drivers,openflow} -(for app in ${apps/,/ }; do echo org.onosproject.$app; done) | sort > $aux.2 +(for app in ${apps//,/ }; do echo org.onosproject.$app; done) | sort > $aux.2 # Check for differences diff $aux.1 $aux.2 diff --git a/tools/test/cells/madan3 b/tools/test/cells/madan3 new file mode 100644 index 0000000000..857d096e98 --- /dev/null +++ b/tools/test/cells/madan3 @@ -0,0 +1,7 @@ +# Madan's ProxMox ONOS instances 1,2,3 & ONOS mininet box + +export ONOS_NIC="10.128.4.*" +export OC1="10.128.4.2" +export OC2="10.128.4.3" +export OC3="10.128.4.4" +export OCN="10.128.4.5" diff --git a/tools/test/cells/tomx b/tools/test/cells/tomx index 48018dea93..b05544953b 100644 --- a/tools/test/cells/tomx +++ b/tools/test/cells/tomx @@ -1,6 +1,6 @@ -# Office ProxMox ONOS instances 1,2,3 & ONOS mininet box +# Tom's ProxMox ONOS instances 1,2,3 & ONOS mininet box -export ONOS_NIC=10.128.11.* +export ONOS_NIC="10.128.11.*" export OC1="10.128.11.1" export OC2="10.128.11.2" export OC3="10.128.11.3" diff --git a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java index 71eb1fcd71..c19e8bcb32 100644 --- a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java +++ b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandlerBase.java @@ -850,7 +850,7 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler { if (load != null) { this.hasTraffic = hasTraffic || load.rate() > threshold; this.bytes += load.latest(); - this.rate = load.rate(); + this.rate += load.rate(); } }