From c35efaca4736cc72cba976bc16fb544689b58d7c Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Mon, 6 Oct 2014 14:43:53 -0700 Subject: [PATCH] DeviceStore update - Add off-line/remove handling to Gossip~ - Backport lock scope to Simple~ Change-Id: I5b4c8e12738ef78920341fb8699c4b07bde8712a --- .../store/device/impl/GossipDeviceStore.java | 233 +++++++++++++----- .../store/trivial/impl/SimpleDeviceStore.java | 209 +++++++++------- 2 files changed, 300 insertions(+), 142 deletions(-) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java index 85d9b07887..8316769c86 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java @@ -2,7 +2,8 @@ package org.onlab.onos.store.device.impl; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; - +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.lang3.concurrent.ConcurrentException; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.felix.scr.annotations.Activate; @@ -59,7 +60,7 @@ import static org.onlab.onos.net.DefaultAnnotations.merge; import static org.onlab.onos.net.DefaultAnnotations.union; import static com.google.common.base.Verify.verify; -// TODO: implement remove event handling and call *Internal +// TODO: give me a better name /** * Manages inventory of infrastructure devices using gossip protocol to distribute * information. @@ -79,14 +80,18 @@ public class GossipDeviceStore // collection of Description given from various providers private final ConcurrentMap> - deviceDescs = new ConcurrentHashMap<>(); + deviceDescs = Maps.newConcurrentMap(); // cache of Device and Ports generated by compositing descriptions from providers - private final ConcurrentMap devices = new ConcurrentHashMap<>(); - private final ConcurrentMap> devicePorts = new ConcurrentHashMap<>(); + private final ConcurrentMap devices = Maps.newConcurrentMap(); + private final ConcurrentMap> devicePorts = Maps.newConcurrentMap(); + + // to be updated under Device lock + private final Map offline = Maps.newHashMap(); + private final Map removalRequest = Maps.newHashMap(); // available(=UP) devices - private final Set availableDevices = new HashSet<>(); + private final Set availableDevices = Sets.newConcurrentHashSet(); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClockService clockService; @@ -121,7 +126,8 @@ public class GossipDeviceStore } @Override - public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, + public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, + DeviceId deviceId, DeviceDescription deviceDescription) { Timestamp newTimestamp = clockService.getTimestamp(deviceId); final Timestamped deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); @@ -133,22 +139,26 @@ public class GossipDeviceStore return event; } - private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId, - Timestamped deltaDesc) { + private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, + DeviceId deviceId, + Timestamped deltaDesc) { // Collection of DeviceDescriptions for a Device ConcurrentMap providerDescs = getDeviceDescriptions(deviceId); - - DeviceDescriptions descs - = createIfAbsentUnchecked(providerDescs, providerId, - new InitDeviceDescs(deltaDesc)); - - // update description synchronized (providerDescs) { // locking per device + if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) { + log.debug("Ignoring outdated event: {}", deltaDesc); + return null; + } + + DeviceDescriptions descs + = createIfAbsentUnchecked(providerDescs, providerId, + new InitDeviceDescs(deltaDesc)); + final Device oldDevice = devices.get(deviceId); final Device newDevice; @@ -163,18 +173,18 @@ public class GossipDeviceStore } if (oldDevice == null) { // ADD - return createDevice(providerId, newDevice); + return createDevice(providerId, newDevice, deltaDesc.timestamp()); } else { // UPDATE or ignore (no change or stale) - return updateDevice(providerId, oldDevice, newDevice); + return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp()); } } } // Creates the device and returns the appropriate event if necessary. - // Guarded by deviceDescs value (=locking Device) + // Guarded by deviceDescs value (=Device lock) private DeviceEvent createDevice(ProviderId providerId, - Device newDevice) { + Device newDevice, Timestamp timestamp) { // update composed device cache Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); @@ -183,16 +193,17 @@ public class GossipDeviceStore providerId, oldDevice, newDevice); if (!providerId.isAncillary()) { - availableDevices.add(newDevice.id()); + markOnline(newDevice.id(), timestamp); } return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); } // Updates the device and returns the appropriate event if necessary. - // Guarded by deviceDescs value (=locking Device) + // Guarded by deviceDescs value (=Device lock) private DeviceEvent updateDevice(ProviderId providerId, - Device oldDevice, Device newDevice) { + Device oldDevice, + Device newDevice, Timestamp newTimestamp) { // We allow only certain attributes to trigger update if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || @@ -207,14 +218,14 @@ public class GossipDeviceStore , newDevice); } if (!providerId.isAncillary()) { - availableDevices.add(newDevice.id()); + markOnline(newDevice.id(), newTimestamp); } return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); } // Otherwise merely attempt to change availability if primary provider if (!providerId.isAncillary()) { - boolean added = availableDevices.add(newDevice.id()); + boolean added = markOnline(newDevice.id(), newTimestamp); return !added ? null : new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null); } @@ -223,11 +234,29 @@ public class GossipDeviceStore @Override public DeviceEvent markOffline(DeviceId deviceId) { - ConcurrentMap providerDescs + Timestamp timestamp = clockService.getTimestamp(deviceId); + return markOfflineInternal(deviceId, timestamp); + } + + private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { + + Map providerDescs = getDeviceDescriptions(deviceId); // locking device synchronized (providerDescs) { + + // accept off-line if given timestamp is newer than + // the latest Timestamp from Primary provider + DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs); + Timestamp lastTimestamp = primDescs.getLatestTimestamp(); + if (timestamp.compareTo(lastTimestamp) <= 0) { + // outdated event ignore + return null; + } + + offline.put(deviceId, timestamp); + Device device = devices.get(deviceId); if (device == null) { return null; @@ -236,15 +265,37 @@ public class GossipDeviceStore if (removed) { // TODO: broadcast ... DOWN only? return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); - } return null; } } + /** + * Marks the device as available if the given timestamp is not outdated, + * compared to the time the device has been marked offline. + * + * @param deviceId identifier of the device + * @param timestamp of the event triggering this change. + * @return true if availability change request was accepted and changed the state + */ + // Guarded by deviceDescs value (=Device lock) + private boolean markOnline(DeviceId deviceId, Timestamp timestamp) { + // accept on-line if given timestamp is newer than + // the latest offline request Timestamp + Timestamp offlineTimestamp = offline.get(deviceId); + if (offlineTimestamp == null || + offlineTimestamp.compareTo(timestamp) < 0) { + + offline.remove(deviceId); + return availableDevices.add(deviceId); + } + return false; + } + @Override - public synchronized List updatePorts(ProviderId providerId, DeviceId deviceId, - List portDescriptions) { + public synchronized List updatePorts(ProviderId providerId, + DeviceId deviceId, + List portDescriptions) { Timestamp newTimestamp = clockService.getTimestamp(deviceId); List> deltaDescs = new ArrayList<>(portDescriptions.size()); @@ -252,7 +303,8 @@ public class GossipDeviceStore deltaDescs.add(new Timestamped(e, newTimestamp)); } - List events = updatePortsInternal(providerId, deviceId, deltaDescs); + List events = updatePortsInternal(providerId, deviceId, + new Timestamped<>(portDescriptions, newTimestamp)); if (!events.isEmpty()) { // FIXME: broadcast deltaDesc, UP log.debug("broadcast deltaDesc"); @@ -261,8 +313,9 @@ public class GossipDeviceStore } - private List updatePortsInternal(ProviderId providerId, DeviceId deviceId, - List> deltaDescs) { + private List updatePortsInternal(ProviderId providerId, + DeviceId deviceId, + Timestamped> portDescriptions) { Device device = devices.get(deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); @@ -270,30 +323,41 @@ public class GossipDeviceStore ConcurrentMap descsMap = deviceDescs.get(deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - DeviceDescriptions descs = descsMap.get(providerId); - // every provider must provide DeviceDescription. - checkArgument(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); - List events = new ArrayList<>(); synchronized (descsMap) { + + if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) { + log.debug("Ignoring outdated events: {}", portDescriptions); + return null; + } + + DeviceDescriptions descs = descsMap.get(providerId); + // every provider must provide DeviceDescription. + checkArgument(descs != null, + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); + Map ports = getPortMap(deviceId); + final Timestamp newTimestamp = portDescriptions.timestamp(); + // Add new ports Set processed = new HashSet<>(); - for (Timestamped deltaDesc : deltaDescs) { - final PortNumber number = deltaDesc.value().portNumber(); + for (PortDescription portDescription : portDescriptions.value()) { + final PortNumber number = portDescription.portNumber(); + processed.add(number); + final Port oldPort = ports.get(number); final Port newPort; + final Timestamped existingPortDesc = descs.getPortDesc(number); if (existingPortDesc == null || - deltaDesc == existingPortDesc || - deltaDesc.isNewer(existingPortDesc)) { + newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) { // on new port or valid update // update description - descs.putPortDesc(deltaDesc); + descs.putPortDesc(new Timestamped<>(portDescription, + portDescriptions.timestamp())); newPort = composePort(device, number, descsMap); } else { // outdated event, ignored. @@ -303,7 +367,6 @@ public class GossipDeviceStore events.add(oldPort == null ? createPort(device, newPort, ports) : updatePort(device, oldPort, newPort, ports)); - processed.add(number); } events.addAll(pruneOldPorts(device, ports, processed)); @@ -313,7 +376,7 @@ public class GossipDeviceStore // Creates a new port based on the port description adds it to the map and // Returns corresponding event. - // Guarded by deviceDescs value (=locking Device) + // Guarded by deviceDescs value (=Device lock) private DeviceEvent createPort(Device device, Port newPort, Map ports) { ports.put(newPort.number(), newPort); @@ -322,7 +385,7 @@ public class GossipDeviceStore // Checks if the specified port requires update and if so, it replaces the // existing entry in the map and returns corresponding event. - // Guarded by deviceDescs value (=locking Device) + // Guarded by deviceDescs value (=Device lock) private DeviceEvent updatePort(Device device, Port oldPort, Port newPort, Map ports) { @@ -337,7 +400,7 @@ public class GossipDeviceStore // Prunes the specified list of ports based on which ports are in the // processed list and returns list of corresponding events. - // Guarded by deviceDescs value (=locking Device) + // Guarded by deviceDescs value (=Device lock) private List pruneOldPorts(Device device, Map ports, Set processed) { @@ -389,13 +452,19 @@ public class GossipDeviceStore ConcurrentMap descsMap = deviceDescs.get(deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - DeviceDescriptions descs = descsMap.get(providerId); - // assuming all providers must to give DeviceDescription - checkArgument(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); - synchronized (descsMap) { + + if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) { + log.debug("Ignoring outdated event: {}", deltaDesc); + return null; + } + + DeviceDescriptions descs = descsMap.get(providerId); + // assuming all providers must to give DeviceDescription + checkArgument(descs != null, + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); + ConcurrentMap ports = getPortMap(deviceId); final PortNumber number = deltaDesc.value().portNumber(); final Port oldPort = ports.get(number); @@ -443,19 +512,51 @@ public class GossipDeviceStore } @Override - public DeviceEvent removeDevice(DeviceId deviceId) { - ConcurrentMap descs = getDeviceDescriptions(deviceId); + public synchronized DeviceEvent removeDevice(DeviceId deviceId) { + Timestamp timestamp = clockService.getTimestamp(deviceId); + DeviceEvent event = removeDeviceInternal(deviceId, timestamp); + // TODO: broadcast removal event + return event; + } + + private DeviceEvent removeDeviceInternal(DeviceId deviceId, + Timestamp timestamp) { + + Map descs = getDeviceDescriptions(deviceId); synchronized (descs) { + // accept removal request if given timestamp is newer than + // the latest Timestamp from Primary provider + DeviceDescriptions primDescs = getPrimaryDescriptions(descs); + Timestamp lastTimestamp = primDescs.getLatestTimestamp(); + if (timestamp.compareTo(lastTimestamp) <= 0) { + // outdated event ignore + return null; + } + removalRequest.put(deviceId, timestamp); + Device device = devices.remove(deviceId); // should DEVICE_REMOVED carry removed ports? - devicePorts.get(deviceId).clear(); - availableDevices.remove(deviceId); + Map ports = devicePorts.get(deviceId); + if (ports != null) { + ports.clear(); + } + markOfflineInternal(deviceId, timestamp); descs.clear(); return device == null ? null : new DeviceEvent(DEVICE_REMOVED, device, null); } } + private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) { + Timestamp removalTimestamp = removalRequest.get(deviceId); + if (removalTimestamp != null && + removalTimestamp.compareTo(timestampToCheck) >= 0) { + // removalRequest is more recent + return true; + } + return false; + } + /** * Returns a Device, merging description given from multiple Providers. * @@ -472,7 +573,7 @@ public class GossipDeviceStore DeviceDescriptions desc = providerDescs.get(primary); - DeviceDescription base = desc.getDeviceDesc().value(); + final DeviceDescription base = desc.getDeviceDesc().value(); Type type = base.type(); String manufacturer = base.manufacturer(); String hwVersion = base.hwVersion(); @@ -545,7 +646,7 @@ public class GossipDeviceStore * @return primary ProviderID, or randomly chosen one if none exists */ private ProviderId pickPrimaryPID( - ConcurrentMap providerDescs) { + Map providerDescs) { ProviderId fallBackPrimary = null; for (Entry e : providerDescs.entrySet()) { if (!e.getKey().isAncillary()) { @@ -558,6 +659,12 @@ public class GossipDeviceStore return fallBackPrimary; } + private DeviceDescriptions getPrimaryDescriptions( + Map providerDescs) { + ProviderId pid = pickPrimaryPID(providerDescs); + return providerDescs.get(pid); + } + public static final class InitDeviceDescs implements ConcurrentInitializer { @@ -586,6 +693,16 @@ public class GossipDeviceStore this.portDescs = new ConcurrentHashMap<>(); } + Timestamp getLatestTimestamp() { + Timestamp latest = deviceDesc.get().timestamp(); + for (Timestamped desc : portDescs.values()) { + if (desc.timestamp().compareTo(latest) > 0) { + latest = desc.timestamp(); + } + } + return latest; + } + public Timestamped getDeviceDesc() { return deviceDesc.get(); } diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java index 0880ac9eaa..514a22e89c 100644 --- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java +++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java @@ -2,6 +2,8 @@ package org.onlab.onos.store.trivial.impl; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.lang3.concurrent.ConcurrentException; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; @@ -32,7 +34,6 @@ import org.onlab.util.NewConcurrentHashMap; import org.slf4j.Logger; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Predicates.notNull; +import static com.google.common.base.Verify.verify; import static org.onlab.onos.net.device.DeviceEvent.Type.*; import static org.slf4j.LoggerFactory.getLogger; import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; @@ -71,14 +73,14 @@ public class SimpleDeviceStore // collection of Description given from various providers private final ConcurrentMap> - deviceDescs = new ConcurrentHashMap<>(); + deviceDescs = Maps.newConcurrentMap(); // cache of Device and Ports generated by compositing descriptions from providers - private final ConcurrentMap devices = new ConcurrentHashMap<>(); - private final ConcurrentMap> devicePorts = new ConcurrentHashMap<>(); + private final ConcurrentMap devices = Maps.newConcurrentMap(); + private final ConcurrentMap> devicePorts = Maps.newConcurrentMap(); // available(=UP) devices - private final Set availableDevices = new HashSet<>(); + private final Set availableDevices = Sets.newConcurrentHashSet(); @Activate @@ -88,6 +90,10 @@ public class SimpleDeviceStore @Deactivate public void deactivate() { + deviceDescs.clear(); + devices.clear(); + devicePorts.clear(); + availableDevices.clear(); log.info("Stopped"); } @@ -107,45 +113,54 @@ public class SimpleDeviceStore } @Override - public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, + public DeviceEvent createOrUpdateDevice(ProviderId providerId, + DeviceId deviceId, DeviceDescription deviceDescription) { + ConcurrentMap providerDescs = getDeviceDescriptions(deviceId); - Device oldDevice = devices.get(deviceId); + synchronized (providerDescs) { + // locking per device - DeviceDescriptions descs - = createIfAbsentUnchecked(providerDescs, providerId, - new InitDeviceDescs(deviceDescription)); + DeviceDescriptions descs + = createIfAbsentUnchecked(providerDescs, providerId, + new InitDeviceDescs(deviceDescription)); - // update description - descs.putDeviceDesc(deviceDescription); - Device newDevice = composeDevice(deviceId, providerDescs); + Device oldDevice = devices.get(deviceId); + // update description + descs.putDeviceDesc(deviceDescription); + Device newDevice = composeDevice(deviceId, providerDescs); - if (oldDevice == null) { - // ADD - return createDevice(providerId, newDevice); - } else { - // UPDATE or ignore (no change or stale) - return updateDevice(providerId, oldDevice, newDevice); + if (oldDevice == null) { + // ADD + return createDevice(providerId, newDevice); + } else { + // UPDATE or ignore (no change or stale) + return updateDevice(providerId, oldDevice, newDevice); + } } } // Creates the device and returns the appropriate event if necessary. + // Guarded by deviceDescs value (=Device lock) private DeviceEvent createDevice(ProviderId providerId, Device newDevice) { // update composed device cache - synchronized (this) { - devices.putIfAbsent(newDevice.id(), newDevice); - if (!providerId.isAncillary()) { - availableDevices.add(newDevice.id()); - } + Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); + verify(oldDevice == null, + "Unexpected Device in cache. PID:%s [old=%s, new=%s]", + providerId, oldDevice, newDevice); + + if (!providerId.isAncillary()) { + availableDevices.add(newDevice.id()); } return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); } // Updates the device and returns the appropriate event if necessary. + // Guarded by deviceDescs value (=Device lock) private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) { // We allow only certain attributes to trigger update @@ -153,70 +168,87 @@ public class SimpleDeviceStore !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) || !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) { - synchronized (this) { - devices.replace(newDevice.id(), oldDevice, newDevice); - if (!providerId.isAncillary()) { - availableDevices.add(newDevice.id()); - } + boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice); + if (!replaced) { + verify(replaced, + "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]", + providerId, oldDevice, devices.get(newDevice.id()) + , newDevice); + } + if (!providerId.isAncillary()) { + availableDevices.add(newDevice.id()); } return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); } // Otherwise merely attempt to change availability if primary provider if (!providerId.isAncillary()) { - synchronized (this) { boolean added = availableDevices.add(newDevice.id()); return !added ? null : new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null); - } } return null; } @Override public DeviceEvent markOffline(DeviceId deviceId) { - synchronized (this) { + ConcurrentMap providerDescs + = getDeviceDescriptions(deviceId); + + // locking device + synchronized (providerDescs) { Device device = devices.get(deviceId); - boolean removed = (device != null) && availableDevices.remove(deviceId); - return !removed ? null : - new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); + if (device == null) { + return null; + } + boolean removed = availableDevices.remove(deviceId); + if (removed) { + // TODO: broadcast ... DOWN only? + return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); + } + return null; } } @Override - public synchronized List updatePorts(ProviderId providerId, DeviceId deviceId, - List portDescriptions) { + public List updatePorts(ProviderId providerId, + DeviceId deviceId, + List portDescriptions) { - // TODO: implement multi-provider Device device = devices.get(deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); ConcurrentMap descsMap = deviceDescs.get(deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - DeviceDescriptions descs = descsMap.get(providerId); - checkArgument(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); - - List events = new ArrayList<>(); - synchronized (this) { - ConcurrentMap ports = getPortMap(deviceId); + synchronized (descsMap) { + DeviceDescriptions descs = descsMap.get(providerId); + // every provider must provide DeviceDescription. + checkArgument(descs != null, + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); + + Map ports = getPortMap(deviceId); // Add new ports Set processed = new HashSet<>(); for (PortDescription portDescription : portDescriptions) { - PortNumber number = portDescription.portNumber(); - Port oldPort = ports.get(number); + final PortNumber number = portDescription.portNumber(); + processed.add(portDescription.portNumber()); + + final Port oldPort = ports.get(number); + final Port newPort; + +// event suppression hook? + // update description descs.putPortDesc(portDescription); - Port newPort = composePort(device, number, descsMap); + newPort = composePort(device, number, descsMap); events.add(oldPort == null ? - createPort(device, newPort, ports) : - updatePort(device, oldPort, newPort, ports)); - processed.add(portDescription.portNumber()); + createPort(device, newPort, ports) : + updatePort(device, oldPort, newPort, ports)); } events.addAll(pruneOldPorts(device, ports, processed)); @@ -226,17 +258,19 @@ public class SimpleDeviceStore // Creates a new port based on the port description adds it to the map and // Returns corresponding event. + // Guarded by deviceDescs value (=Device lock) private DeviceEvent createPort(Device device, Port newPort, - ConcurrentMap ports) { + Map ports) { ports.put(newPort.number(), newPort); return new DeviceEvent(PORT_ADDED, device, newPort); } // Checks if the specified port requires update and if so, it replaces the // existing entry in the map and returns corresponding event. + // Guarded by deviceDescs value (=Device lock) private DeviceEvent updatePort(Device device, Port oldPort, Port newPort, - ConcurrentMap ports) { + Map ports) { if (oldPort.isEnabled() != newPort.isEnabled() || !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) { @@ -248,6 +282,7 @@ public class SimpleDeviceStore // Prunes the specified list of ports based on which ports are in the // processed list and returns list of corresponding events. + // Guarded by deviceDescs value (=Device lock) private List pruneOldPorts(Device device, Map ports, Set processed) { @@ -264,12 +299,6 @@ public class SimpleDeviceStore return events; } - private ConcurrentMap getDeviceDescriptions( - DeviceId deviceId) { - return createIfAbsentUnchecked(deviceDescs, deviceId, - NewConcurrentHashMap.ifNeeded()); - } - // Gets the map of ports for the specified device; if one does not already // exist, it creates and registers a new one. private ConcurrentMap getPortMap(DeviceId deviceId) { @@ -277,8 +306,14 @@ public class SimpleDeviceStore NewConcurrentHashMap.ifNeeded()); } + private ConcurrentMap getDeviceDescriptions( + DeviceId deviceId) { + return createIfAbsentUnchecked(deviceDescs, deviceId, + NewConcurrentHashMap.ifNeeded()); + } + @Override - public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId, + public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId, PortDescription portDescription) { Device device = devices.get(deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); @@ -286,19 +321,22 @@ public class SimpleDeviceStore ConcurrentMap descsMap = deviceDescs.get(deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); - DeviceDescriptions descs = descsMap.get(providerId); - // assuming all providers must to give DeviceDescription - checkArgument(descs != null, - "Device description for Device ID %s from Provider %s was not found", - deviceId, providerId); + synchronized (descsMap) { + DeviceDescriptions descs = descsMap.get(providerId); + // assuming all providers must to give DeviceDescription + checkArgument(descs != null, + "Device description for Device ID %s from Provider %s was not found", + deviceId, providerId); - synchronized (this) { ConcurrentMap ports = getPortMap(deviceId); final PortNumber number = portDescription.portNumber(); - Port oldPort = ports.get(number); + final Port oldPort = ports.get(number); + final Port newPort; + // update description descs.putPortDesc(portDescription); - Port newPort = composePort(device, number, descsMap); + newPort = composePort(device, number, descsMap); + if (oldPort == null) { return createPort(device, newPort, ports); } else { @@ -333,7 +371,7 @@ public class SimpleDeviceStore synchronized (descs) { Device device = devices.remove(deviceId); // should DEVICE_REMOVED carry removed ports? - ConcurrentMap ports = devicePorts.get(deviceId); + Map ports = devicePorts.get(deviceId); if (ports != null) { ports.clear(); } @@ -360,14 +398,14 @@ public class SimpleDeviceStore DeviceDescriptions desc = providerDescs.get(primary); - // base - Type type = desc.getDeviceDesc().type(); - String manufacturer = desc.getDeviceDesc().manufacturer(); - String hwVersion = desc.getDeviceDesc().hwVersion(); - String swVersion = desc.getDeviceDesc().swVersion(); - String serialNumber = desc.getDeviceDesc().serialNumber(); + final DeviceDescription base = desc.getDeviceDesc(); + Type type = base.type(); + String manufacturer = base.manufacturer(); + String hwVersion = base.hwVersion(); + String swVersion = base.swVersion(); + String serialNumber = base.serialNumber(); DefaultAnnotations annotations = DefaultAnnotations.builder().build(); - annotations = merge(annotations, desc.getDeviceDesc().annotations()); + annotations = merge(annotations, base.annotations()); for (Entry e : providerDescs.entrySet()) { if (e.getKey().equals(primary)) { @@ -386,7 +424,14 @@ public class SimpleDeviceStore hwVersion, swVersion, serialNumber, annotations); } - // probably want composePort"s" also + /** + * Returns a Port, merging description given from multiple Providers. + * + * @param device device the port is on + * @param number port number + * @param providerDescs Collection of Descriptions from multiple providers + * @return Port instance + */ private Port composePort(Device device, PortNumber number, ConcurrentMap providerDescs) { @@ -441,7 +486,9 @@ public class SimpleDeviceStore public static final class InitDeviceDescs implements ConcurrentInitializer { + private final DeviceDescription deviceDesc; + public InitDeviceDescs(DeviceDescription deviceDesc) { this.deviceDesc = checkNotNull(deviceDesc); } @@ -456,8 +503,6 @@ public class SimpleDeviceStore * Collection of Description of a Device and it's Ports given from a Provider. */ private static class DeviceDescriptions { - // private final DeviceId id; - // private final ProviderId pid; private final AtomicReference deviceDesc; private final ConcurrentMap portDescs; @@ -475,10 +520,6 @@ public class SimpleDeviceStore return portDescs.get(number); } - public Collection getPortDescs() { - return Collections.unmodifiableCollection(portDescs.values()); - } - /** * Puts DeviceDescription, merging annotations as necessary. *