Adding multi-instance support for flow stats.

Change-Id: I428c5a7cb58f4f9773a125fc94fb368ed846cb0d
This commit is contained in:
Thomas Vachuska 2015-05-29 15:53:01 -07:00 committed by Gerrit Code Review
parent 69d6ac7e9f
commit fdbc4c27b4
8 changed files with 156 additions and 85 deletions

View File

@ -61,6 +61,22 @@ public final class DefaultPortStatistics implements PortStatistics {
this.durationNano = durationNano; this.durationNano = durationNano;
} }
// Constructor for serializer
private DefaultPortStatistics() {
this.deviceId = null;
this.port = 0;
this.packetsReceived = 0;
this.packetsSent = 0;
this.bytesReceived = 0;
this.bytesSent = 0;
this.packetsRxDropped = 0;
this.packetsTxDropped = 0;
this.packetsRxErrors = 0;
this.packetsTxErrors = 0;
this.durationSec = 0;
this.durationNano = 0;
}
/** /**
* Creates a builder for DefaultPortStatistics object. * Creates a builder for DefaultPortStatistics object.
* *

View File

@ -50,6 +50,7 @@ import org.onosproject.net.OduCltPort;
import org.onosproject.net.OmsPort; import org.onosproject.net.OmsPort;
import org.onosproject.net.Port; import org.onosproject.net.Port;
import org.onosproject.net.PortNumber; import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.DeviceClockService; import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceDescription; import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceEvent; import org.onosproject.net.device.DeviceEvent;
@ -68,8 +69,16 @@ import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler; import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped; import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer; import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers; import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.store.service.WallclockClockManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.IOException; import java.io.IOException;
@ -102,6 +111,7 @@ import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.device.DeviceEvent.Type.*; import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED; import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*; import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
/** /**
@ -123,13 +133,15 @@ public class GossipDeviceStore
// innerMap is used to lock a Device, thus instance should never be replaced. // innerMap is used to lock a Device, thus instance should never be replaced.
// collection of Description given from various providers // collection of Description given from various providers
private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>> private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
deviceDescs = Maps.newConcurrentMap(); deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers // cache of Device and Ports generated by compositing descriptions from providers
private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap(); private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap(); private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, PortStatistics>>
devicePortStats = Maps.newConcurrentMap(); private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
portStatsListener = new InternalPortStatsListener();
// to be updated under Device lock // to be updated under Device lock
private final Map<DeviceId, Timestamp> offline = Maps.newHashMap(); private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
@ -141,6 +153,9 @@ public class GossipDeviceStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceClockService deviceClockService; protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator; protected ClusterCommunicationService clusterCommunicator;
@ -182,10 +197,8 @@ public class GossipDeviceStore
private long initialDelaySec = 5; private long initialDelaySec = 5;
private long periodSec = 5; private long periodSec = 5;
@Activate @Activate
public void activate() { public void activate() {
executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d")); executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
backgroundExecutor = backgroundExecutor =
@ -198,8 +211,8 @@ public class GossipDeviceStore
new InternalDeviceOfflineEventListener(), new InternalDeviceOfflineEventListener(),
executor); executor);
clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
new InternalRemoveRequestListener(), new InternalRemoveRequestListener(),
executor); executor);
clusterCommunicator.addSubscriber( clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor); GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
clusterCommunicator.addSubscriber( clusterCommunicator.addSubscriber(
@ -217,8 +230,24 @@ public class GossipDeviceStore
// start anti-entropy thread // start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS); initialDelaySec, periodSec, TimeUnit.SECONDS);
// Create a distributed map for port stats.
KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(DefaultPortStatistics.class)
.register(DeviceId.class)
.register(MultiValuedTimestamp.class)
.register(WallClockTimestamp.class);
devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
.withName("port-stats")
.withSerializer(deviceDataSerializer)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
.withClockService(new WallclockClockManager<>())
.withTombstonesDisabled()
.build();
devicePortStats.addListener(portStatsListener);
log.info("Started"); log.info("Started");
} }
@ -272,8 +301,8 @@ public class GossipDeviceStore
@Override @Override
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId, DeviceId deviceId,
DeviceDescription deviceDescription) { DeviceDescription deviceDescription) {
NodeId localNode = clusterService.getLocalNode().id(); NodeId localNode = clusterService.getLocalNode().id();
NodeId deviceNode = mastershipService.getMasterFor(deviceId); NodeId deviceNode = mastershipService.getMasterFor(deviceId);
@ -294,7 +323,7 @@ public class GossipDeviceStore
if (deviceEvent != null) { if (deviceEvent != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}", log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId); providerId, deviceId);
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc)); notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
} }
@ -324,12 +353,12 @@ public class GossipDeviceStore
} }
private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
DeviceId deviceId, DeviceId deviceId,
Timestamped<DeviceDescription> deltaDesc) { Timestamped<DeviceDescription> deltaDesc) {
// Collection of DeviceDescriptions for a Device // Collection of DeviceDescriptions for a Device
Map<ProviderId, DeviceDescriptions> device Map<ProviderId, DeviceDescriptions> device
= getOrCreateDeviceDescriptionsMap(deviceId); = getOrCreateDeviceDescriptionsMap(deviceId);
synchronized (device) { synchronized (device) {
// locking per device // locking per device
@ -345,7 +374,7 @@ public class GossipDeviceStore
final Device newDevice; final Device newDevice;
if (deltaDesc == descs.getDeviceDesc() || if (deltaDesc == descs.getDeviceDesc() ||
deltaDesc.isNewer(descs.getDeviceDesc())) { deltaDesc.isNewer(descs.getDeviceDesc())) {
// on new device or valid update // on new device or valid update
descs.putDeviceDesc(deltaDesc); descs.putDeviceDesc(deltaDesc);
newDevice = composeDevice(deviceId, device); newDevice = composeDevice(deviceId, device);
@ -371,8 +400,8 @@ public class GossipDeviceStore
// update composed device cache // update composed device cache
Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
verify(oldDevice == null, verify(oldDevice == null,
"Unexpected Device in cache. PID:%s [old=%s, new=%s]", "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
providerId, oldDevice, newDevice); providerId, oldDevice, newDevice);
if (!providerId.isAncillary()) { if (!providerId.isAncillary()) {
markOnline(newDevice.id(), timestamp); markOnline(newDevice.id(), timestamp);
@ -401,8 +430,8 @@ public class GossipDeviceStore
boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice); boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
if (!replaced) { if (!replaced) {
verify(replaced, verify(replaced,
"Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
providerId, oldDevice, devices.get(newDevice.id()) providerId, oldDevice, devices.get(newDevice.id())
, newDevice); , newDevice);
} }
if (!providerId.isAncillary()) { if (!providerId.isAncillary()) {
@ -424,7 +453,7 @@ public class GossipDeviceStore
final DeviceEvent event = markOfflineInternal(deviceId, timestamp); final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) { if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {} {}", log.info("Notifying peers of a device offline topology event for deviceId: {} {}",
deviceId, timestamp); deviceId, timestamp);
notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp)); notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
} }
return event; return event;
@ -433,7 +462,7 @@ public class GossipDeviceStore
private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
Map<ProviderId, DeviceDescriptions> providerDescs Map<ProviderId, DeviceDescriptions> providerDescs
= getOrCreateDeviceDescriptionsMap(deviceId); = getOrCreateDeviceDescriptionsMap(deviceId);
// locking device // locking device
synchronized (providerDescs) { synchronized (providerDescs) {
@ -465,7 +494,7 @@ public class GossipDeviceStore
* Marks the device as available if the given timestamp is not outdated, * Marks the device as available if the given timestamp is not outdated,
* compared to the time the device has been marked offline. * compared to the time the device has been marked offline.
* *
* @param deviceId identifier of the device * @param deviceId identifier of the device
* @param timestamp of the event triggering this change. * @param timestamp of the event triggering this change.
* @return true if availability change request was accepted and changed the state * @return true if availability change request was accepted and changed the state
*/ */
@ -475,7 +504,7 @@ public class GossipDeviceStore
// the latest offline request Timestamp // the latest offline request Timestamp
Timestamp offlineTimestamp = offline.get(deviceId); Timestamp offlineTimestamp = offline.get(deviceId);
if (offlineTimestamp == null || if (offlineTimestamp == null ||
offlineTimestamp.compareTo(timestamp) < 0) { offlineTimestamp.compareTo(timestamp) < 0) {
offline.remove(deviceId); offline.remove(deviceId);
return availableDevices.add(deviceId); return availableDevices.add(deviceId);
@ -485,8 +514,8 @@ public class GossipDeviceStore
@Override @Override
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId, DeviceId deviceId,
List<PortDescription> portDescriptions) { List<PortDescription> portDescriptions) {
NodeId localNode = clusterService.getLocalNode().id(); NodeId localNode = clusterService.getLocalNode().id();
// TODO: It might be negligible, but this will have negative impact to topology discovery performance, // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
@ -544,7 +573,7 @@ public class GossipDeviceStore
if (!deviceEvents.isEmpty()) { if (!deviceEvents.isEmpty()) {
log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}", log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
providerId, deviceId); providerId, deviceId);
notifyPeers(new InternalPortEvent(providerId, deviceId, merged)); notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
} }
@ -572,8 +601,8 @@ public class GossipDeviceStore
} }
private List<DeviceEvent> updatePortsInternal(ProviderId providerId, private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
DeviceId deviceId, DeviceId deviceId,
Timestamped<List<PortDescription>> portDescriptions) { Timestamped<List<PortDescription>> portDescriptions) {
Device device = devices.get(deviceId); Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@ -592,8 +621,8 @@ public class GossipDeviceStore
DeviceDescriptions descs = descsMap.get(providerId); DeviceDescriptions descs = descsMap.get(providerId);
// every provider must provide DeviceDescription. // every provider must provide DeviceDescription.
checkArgument(descs != null, checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found", "Device description for Device ID %s from Provider %s was not found",
deviceId, providerId); deviceId, providerId);
Map<PortNumber, Port> ports = getPortMap(deviceId); Map<PortNumber, Port> ports = getPortMap(deviceId);
@ -611,11 +640,11 @@ public class GossipDeviceStore
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null || if (existingPortDesc == null ||
newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) { newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
// on new port or valid update // on new port or valid update
// update description // update description
descs.putPortDesc(new Timestamped<>(portDescription, descs.putPortDesc(new Timestamped<>(portDescription,
portDescriptions.timestamp())); portDescriptions.timestamp()));
newPort = composePort(device, number, descsMap); newPort = composePort(device, number, descsMap);
} else { } else {
// outdated event, ignored. // outdated event, ignored.
@ -680,7 +709,7 @@ public class GossipDeviceStore
// exist, it creates and registers a new one. // exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) { private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
return createIfAbsentUnchecked(devicePorts, deviceId, return createIfAbsentUnchecked(devicePorts, deviceId,
NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
} }
private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap( private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
@ -702,7 +731,6 @@ public class GossipDeviceStore
private DeviceDescriptions getOrCreateProviderDeviceDescriptions( private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
Map<ProviderId, DeviceDescriptions> device, Map<ProviderId, DeviceDescriptions> device,
ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) { ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
synchronized (device) { synchronized (device) {
DeviceDescriptions r = device.get(providerId); DeviceDescriptions r = device.get(providerId);
if (r == null) { if (r == null) {
@ -728,26 +756,25 @@ public class GossipDeviceStore
return null; return null;
} }
final Timestamped<PortDescription> deltaDesc final Timestamped<PortDescription> deltaDesc
= new Timestamped<>(portDescription, newTimestamp); = new Timestamped<>(portDescription, newTimestamp);
final DeviceEvent event; final DeviceEvent event;
final Timestamped<PortDescription> mergedDesc; final Timestamped<PortDescription> mergedDesc;
final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
synchronized (device) { synchronized (device) {
event = updatePortStatusInternal(providerId, deviceId, deltaDesc); event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
mergedDesc = device.get(providerId) mergedDesc = device.get(providerId)
.getPortDesc(portDescription.portNumber()); .getPortDesc(portDescription.portNumber());
} }
if (event != null) { if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}", log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId); providerId, deviceId);
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc)); notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
} }
return event; return event;
} }
private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId, private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
Timestamped<PortDescription> deltaDesc) { Timestamped<PortDescription> deltaDesc) {
Device device = devices.get(deviceId); Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@ -764,8 +791,8 @@ public class GossipDeviceStore
DeviceDescriptions descs = descsMap.get(providerId); DeviceDescriptions descs = descsMap.get(providerId);
// assuming all providers must to give DeviceDescription // assuming all providers must to give DeviceDescription
verify(descs != null, verify(descs != null,
"Device description for Device ID %s from Provider %s was not found", "Device description for Device ID %s from Provider %s was not found",
deviceId, providerId); deviceId, providerId);
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = deltaDesc.value().portNumber(); final PortNumber number = deltaDesc.value().portNumber();
@ -774,7 +801,7 @@ public class GossipDeviceStore
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null || if (existingPortDesc == null ||
deltaDesc.isNewer(existingPortDesc)) { deltaDesc.isNewer(existingPortDesc)) {
// on new port or valid update // on new port or valid update
// update description // update description
descs.putPortDesc(deltaDesc); descs.putPortDesc(deltaDesc);
@ -805,24 +832,21 @@ public class GossipDeviceStore
@Override @Override
public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId, public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
Collection<PortStatistics> portStats) { Collection<PortStatistics> portStats) {
Map<PortNumber, PortStatistics> statsMap = devicePortStats.get(deviceId);
ConcurrentMap<PortNumber, PortStatistics> statsMap = devicePortStats.get(deviceId);
if (statsMap == null) { if (statsMap == null) {
statsMap = Maps.newConcurrentMap(); statsMap = Maps.newHashMap();
devicePortStats.put(deviceId, statsMap);
} }
for (PortStatistics stat: portStats) { for (PortStatistics stat : portStats) {
PortNumber portNumber = PortNumber.portNumber(stat.port()); PortNumber portNumber = PortNumber.portNumber(stat.port());
statsMap.put(portNumber, stat); statsMap.put(portNumber, stat);
} }
devicePortStats.put(deviceId, statsMap);
return new DeviceEvent(PORT_STATS_UPDATED, devices.get(deviceId), null); return null; // new DeviceEvent(PORT_STATS_UPDATED, devices.get(deviceId), null);
} }
@Override @Override
public List<PortStatistics> getPortStatistics(DeviceId deviceId) { public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId); Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
if (portStats == null) { if (portStats == null) {
return Collections.emptyList(); return Collections.emptyList();
@ -865,7 +889,7 @@ public class GossipDeviceStore
if (!myId.equals(master)) { if (!myId.equals(master)) {
log.debug("{} has control of {}, forwarding remove request", log.debug("{} has control of {}, forwarding remove request",
master, deviceId); master, deviceId);
// TODO check unicast return value // TODO check unicast return value
clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master); clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
@ -874,7 +898,7 @@ public class GossipDeviceStore
*/ */
// event will be triggered after master processes it. // event will be triggered after master processes it.
return null; return null;
} }
// I have control.. // I have control..
@ -883,7 +907,7 @@ public class GossipDeviceStore
DeviceEvent event = removeDeviceInternal(deviceId, timestamp); DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
if (event != null) { if (event != null) {
log.debug("Notifying peers of a device removed topology event for deviceId: {}", log.debug("Notifying peers of a device removed topology event for deviceId: {}",
deviceId); deviceId);
notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp)); notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
} }
if (relinquishAtEnd) { if (relinquishAtEnd) {
@ -917,7 +941,7 @@ public class GossipDeviceStore
markOfflineInternal(deviceId, timestamp); markOfflineInternal(deviceId, timestamp);
descs.clear(); descs.clear();
return device == null ? null : return device == null ? null :
new DeviceEvent(DEVICE_REMOVED, device, null); new DeviceEvent(DEVICE_REMOVED, device, null);
} }
} }
@ -925,14 +949,14 @@ public class GossipDeviceStore
* Checks if given timestamp is superseded by removal request * Checks if given timestamp is superseded by removal request
* with more recent timestamp. * with more recent timestamp.
* *
* @param deviceId identifier of a device * @param deviceId identifier of a device
* @param timestampToCheck timestamp of an event to check * @param timestampToCheck timestamp of an event to check
* @return true if device is already removed * @return true if device is already removed
*/ */
private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) { private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
Timestamp removalTimestamp = removalRequest.get(deviceId); Timestamp removalTimestamp = removalRequest.get(deviceId);
if (removalTimestamp != null && if (removalTimestamp != null &&
removalTimestamp.compareTo(timestampToCheck) >= 0) { removalTimestamp.compareTo(timestampToCheck) >= 0) {
// removalRequest is more recent // removalRequest is more recent
return true; return true;
} }
@ -942,12 +966,12 @@ public class GossipDeviceStore
/** /**
* Returns a Device, merging description given from multiple Providers. * Returns a Device, merging description given from multiple Providers.
* *
* @param deviceId device identifier * @param deviceId device identifier
* @param providerDescs Collection of Descriptions from multiple providers * @param providerDescs Collection of Descriptions from multiple providers
* @return Device instance * @return Device instance
*/ */
private Device composeDevice(DeviceId deviceId, private Device composeDevice(DeviceId deviceId,
Map<ProviderId, DeviceDescriptions> providerDescs) { Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied"); checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
@ -978,21 +1002,21 @@ public class GossipDeviceStore
annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations()); annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
} }
return new DefaultDevice(primary, deviceId , type, manufacturer, return new DefaultDevice(primary, deviceId, type, manufacturer,
hwVersion, swVersion, serialNumber, hwVersion, swVersion, serialNumber,
chassisId, annotations); chassisId, annotations);
} }
/** /**
* Returns a Port, merging description given from multiple Providers. * Returns a Port, merging description given from multiple Providers.
* *
* @param device device the port is on * @param device device the port is on
* @param number port number * @param number port number
* @param descsMap Collection of Descriptions from multiple providers * @param descsMap Collection of Descriptions from multiple providers
* @return Port instance * @return Port instance
*/ */
private Port composePort(Device device, PortNumber number, private Port composePort(Device device, PortNumber number,
Map<ProviderId, DeviceDescriptions> descsMap) { Map<ProviderId, DeviceDescriptions> descsMap) {
ProviderId primary = pickPrimaryPID(descsMap); ProviderId primary = pickPrimaryPID(descsMap);
DeviceDescriptions primDescs = descsMap.get(primary); DeviceDescriptions primDescs = descsMap.get(primary);
@ -1028,12 +1052,12 @@ public class GossipDeviceStore
case OMS: case OMS:
OmsPortDescription omsPortDesc = (OmsPortDescription) otherPortDesc.value(); OmsPortDescription omsPortDesc = (OmsPortDescription) otherPortDesc.value();
updated = new OmsPort(device, number, isEnabled, omsPortDesc.minFrequency(), updated = new OmsPort(device, number, isEnabled, omsPortDesc.minFrequency(),
omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations); omsPortDesc.maxFrequency(), omsPortDesc.grid(), annotations);
break; break;
case OCH: case OCH:
OchPortDescription ochPortDesc = (OchPortDescription) otherPortDesc.value(); OchPortDescription ochPortDesc = (OchPortDescription) otherPortDesc.value();
updated = new OchPort(device, number, isEnabled, ochPortDesc.signalType(), updated = new OchPort(device, number, isEnabled, ochPortDesc.signalType(),
ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations); ochPortDesc.isTunable(), ochPortDesc.lambda(), annotations);
break; break;
case ODUCLT: case ODUCLT:
OduCltPortDescription oduCltPortDesc = (OduCltPortDescription) otherPortDesc.value(); OduCltPortDescription oduCltPortDesc = (OduCltPortDescription) otherPortDesc.value();
@ -1073,7 +1097,7 @@ public class GossipDeviceStore
} }
private DeviceDescriptions getPrimaryDescriptions( private DeviceDescriptions getPrimaryDescriptions(
Map<ProviderId, DeviceDescriptions> providerDescs) { Map<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId pid = pickPrimaryPID(providerDescs); ProviderId pid = pickPrimaryPID(providerDescs);
return providerDescs.get(pid); return providerDescs.get(pid);
} }
@ -1174,14 +1198,14 @@ public class GossipDeviceStore
final DeviceDescriptions descs = prov.getValue(); final DeviceDescriptions descs = prov.getValue();
adDevices.put(new DeviceFragmentId(deviceId, provId), adDevices.put(new DeviceFragmentId(deviceId, provId),
descs.getDeviceDesc().timestamp()); descs.getDeviceDesc().timestamp());
for (Entry<PortNumber, Timestamped<PortDescription>> for (Entry<PortNumber, Timestamped<PortDescription>>
portDesc : descs.getPortDescs().entrySet()) { portDesc : descs.getPortDescs().entrySet()) {
final PortNumber number = portDesc.getKey(); final PortNumber number = portDesc.getKey();
adPorts.put(new PortFragmentId(deviceId, provId, number), adPorts.put(new PortFragmentId(deviceId, provId, number),
portDesc.getValue().timestamp()); portDesc.getValue().timestamp());
} }
} }
} }
@ -1192,7 +1216,7 @@ public class GossipDeviceStore
/** /**
* Responds to anti-entropy advertisement message. * Responds to anti-entropy advertisement message.
* <P> * <p/>
* Notify sender about out-dated information using regular replication message. * Notify sender about out-dated information using regular replication message.
* Send back advertisement to sender if not in sync. * Send back advertisement to sender if not in sync.
* *
@ -1269,7 +1293,7 @@ public class GossipDeviceStore
// find latest and update // find latest and update
final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp(); final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
if (localLatest == null || if (localLatest == null ||
providerLatest.compareTo(localLatest) > 0) { providerLatest.compareTo(localLatest) > 0) {
localLatest = providerLatest; localLatest = providerLatest;
} }
} // end local provider loop } // end local provider loop
@ -1277,7 +1301,7 @@ public class GossipDeviceStore
// checking if remote timestamp is more recent. // checking if remote timestamp is more recent.
Timestamp rOffline = offlineAds.get(deviceId); Timestamp rOffline = offlineAds.get(deviceId);
if (rOffline != null && if (rOffline != null &&
rOffline.compareTo(localLatest) > 0) { rOffline.compareTo(localLatest) > 0) {
// remote offline timestamp suggests that the // remote offline timestamp suggests that the
// device is off-line // device is off-line
markOfflineInternal(deviceId, rOffline); markOfflineInternal(deviceId, rOffline);
@ -1386,7 +1410,6 @@ public class GossipDeviceStore
implements ClusterMessageHandler { implements ClusterMessageHandler {
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.debug("Received device update event from peer: {}", message.sender()); log.debug("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = SERIALIZER.decode(message.payload()); InternalDeviceEvent event = SERIALIZER.decode(message.payload());
@ -1406,7 +1429,6 @@ public class GossipDeviceStore
implements ClusterMessageHandler { implements ClusterMessageHandler {
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.debug("Received device offline event from peer: {}", message.sender()); log.debug("Received device offline event from peer: {}", message.sender());
InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload()); InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
@ -1440,7 +1462,6 @@ public class GossipDeviceStore
implements ClusterMessageHandler { implements ClusterMessageHandler {
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.debug("Received device removed event from peer: {}", message.sender()); log.debug("Received device removed event from peer: {}", message.sender());
InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload()); InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
@ -1508,8 +1529,7 @@ public class GossipDeviceStore
} }
private final class InternalDeviceAdvertisementListener private final class InternalDeviceAdvertisementListener
implements ClusterMessageHandler { implements ClusterMessageHandler {
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender()); log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
@ -1526,7 +1546,6 @@ public class GossipDeviceStore
implements ClusterMessageHandler { implements ClusterMessageHandler {
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.debug("Received injected device event from peer: {}", message.sender()); log.debug("Received injected device event from peer: {}", message.sender());
DeviceInjectedEvent event = SERIALIZER.decode(message.payload()); DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
@ -1551,7 +1570,6 @@ public class GossipDeviceStore
implements ClusterMessageHandler { implements ClusterMessageHandler {
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.debug("Received injected port event from peer: {}", message.sender()); log.debug("Received injected port event from peer: {}", message.sender());
PortInjectedEvent event = SERIALIZER.decode(message.payload()); PortInjectedEvent event = SERIALIZER.decode(message.payload());
@ -1571,4 +1589,17 @@ public class GossipDeviceStore
} }
} }
} }
private class InternalPortStatsListener
implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
@Override
public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
if (event.type() == PUT) {
Device device = devices.get(event.key());
if (device != null) {
delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
}
}
}
}
} }

View File

@ -55,6 +55,7 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage; import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler; import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.consistent.impl.DatabaseManager;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -157,7 +158,7 @@ public class GossipDeviceStoreTest {
clusterCommunicator = createNiceMock(ClusterCommunicationService.class); clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class), clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class)); anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
expectLastCall().anyTimes(); expectLastCall().anyTimes();
replay(clusterCommunicator); replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService(); ClusterService clusterService = new TestClusterService();
@ -165,6 +166,10 @@ public class GossipDeviceStoreTest {
testGossipDeviceStore = new TestGossipDeviceStore(deviceClockService, clusterService, clusterCommunicator); testGossipDeviceStore = new TestGossipDeviceStore(deviceClockService, clusterService, clusterCommunicator);
testGossipDeviceStore.mastershipService = new TestMastershipService(); testGossipDeviceStore.mastershipService = new TestMastershipService();
TestDatabaseManager testDatabaseManager = new TestDatabaseManager();
testDatabaseManager.init(clusterService, clusterCommunicator);
testGossipDeviceStore.storageService = testDatabaseManager;
gossipDeviceStore = testGossipDeviceStore; gossipDeviceStore = testGossipDeviceStore;
gossipDeviceStore.activate(); gossipDeviceStore.activate();
deviceStore = gossipDeviceStore; deviceStore = gossipDeviceStore;
@ -885,4 +890,12 @@ public class GossipDeviceStoreTest {
nodeStates.put(NID2, ACTIVE); nodeStates.put(NID2, ACTIVE);
} }
} }
private class TestDatabaseManager extends DatabaseManager {
void init(ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
}
} }

View File

@ -71,9 +71,11 @@ import org.onosproject.net.Port;
import org.onosproject.net.PortNumber; import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultDeviceDescription; import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription; import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.OchPortDescription; import org.onosproject.net.device.OchPortDescription;
import org.onosproject.net.device.OduCltPortDescription; import org.onosproject.net.device.OduCltPortDescription;
import org.onosproject.net.device.OmsPortDescription; import org.onosproject.net.device.OmsPortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.flow.CompletedBatchOperation; import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry; import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultFlowRule; import org.onosproject.net.flow.DefaultFlowRule;
@ -380,7 +382,9 @@ public final class KryoNamespaces {
IntentOperation.class, IntentOperation.class,
FlowRuleExtPayLoad.class, FlowRuleExtPayLoad.class,
Frequency.class, Frequency.class,
DefaultAnnotations.class DefaultAnnotations.class,
PortStatistics.class,
DefaultPortStatistics.class
) )
.register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class) .register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class)
.register(new URISerializer(), URI.class) .register(new URISerializer(), URI.class)

View File

@ -14,7 +14,7 @@ cut -c7- $aux | cut -d\ -f1 | sort > $aux.1
# Normalize the expected apps # Normalize the expected apps
apps=${ONOS_APPS:-drivers,openflow} apps=${ONOS_APPS:-drivers,openflow}
(for app in ${apps/,/ }; do echo org.onosproject.$app; done) | sort > $aux.2 (for app in ${apps//,/ }; do echo org.onosproject.$app; done) | sort > $aux.2
# Check for differences # Check for differences
diff $aux.1 $aux.2 diff $aux.1 $aux.2

7
tools/test/cells/madan3 Normal file
View File

@ -0,0 +1,7 @@
# Madan's ProxMox ONOS instances 1,2,3 & ONOS mininet box
export ONOS_NIC="10.128.4.*"
export OC1="10.128.4.2"
export OC2="10.128.4.3"
export OC3="10.128.4.4"
export OCN="10.128.4.5"

View File

@ -1,6 +1,6 @@
# Office ProxMox ONOS instances 1,2,3 & ONOS mininet box # Tom's ProxMox ONOS instances 1,2,3 & ONOS mininet box
export ONOS_NIC=10.128.11.* export ONOS_NIC="10.128.11.*"
export OC1="10.128.11.1" export OC1="10.128.11.1"
export OC2="10.128.11.2" export OC2="10.128.11.2"
export OC3="10.128.11.3" export OC3="10.128.11.3"

View File

@ -850,7 +850,7 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
if (load != null) { if (load != null) {
this.hasTraffic = hasTraffic || load.rate() > threshold; this.hasTraffic = hasTraffic || load.rate() > threshold;
this.bytes += load.latest(); this.bytes += load.latest();
this.rate = load.rate(); this.rate += load.rate();
} }
} }