DeviceStore update

- Add off-line/remove handling to Gossip~
- Backport lock scope to Simple~

Change-Id: I5b4c8e12738ef78920341fb8699c4b07bde8712a
This commit is contained in:
Yuta HIGUCHI 2014-10-06 14:43:53 -07:00
parent d2266a70f7
commit c35efaca47
2 changed files with 300 additions and 142 deletions

View File

@ -2,7 +2,8 @@ package org.onlab.onos.store.device.impl;
import com.google.common.collect.FluentIterable; import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException; import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
@ -59,7 +60,7 @@ import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.DefaultAnnotations.union; import static org.onlab.onos.net.DefaultAnnotations.union;
import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verify;
// TODO: implement remove event handling and call *Internal // TODO: give me a better name
/** /**
* Manages inventory of infrastructure devices using gossip protocol to distribute * Manages inventory of infrastructure devices using gossip protocol to distribute
* information. * information.
@ -79,14 +80,18 @@ public class GossipDeviceStore
// collection of Description given from various providers // collection of Description given from various providers
private final ConcurrentMap<DeviceId, private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>> ConcurrentMap<ProviderId, DeviceDescriptions>>
deviceDescs = new ConcurrentHashMap<>(); deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers // cache of Device and Ports generated by compositing descriptions from providers
private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>(); private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>(); private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
// to be updated under Device lock
private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
// available(=UP) devices // available(=UP) devices
private final Set<DeviceId> availableDevices = new HashSet<>(); private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService; protected ClockService clockService;
@ -121,7 +126,8 @@ public class GossipDeviceStore
} }
@Override @Override
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) { DeviceDescription deviceDescription) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId); Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
@ -133,22 +139,26 @@ public class GossipDeviceStore
return event; return event;
} }
private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId, private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
Timestamped<DeviceDescription> deltaDesc) { DeviceId deviceId,
Timestamped<DeviceDescription> deltaDesc) {
// Collection of DeviceDescriptions for a Device // Collection of DeviceDescriptions for a Device
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId); = getDeviceDescriptions(deviceId);
DeviceDescriptions descs
= createIfAbsentUnchecked(providerDescs, providerId,
new InitDeviceDescs(deltaDesc));
// update description
synchronized (providerDescs) { synchronized (providerDescs) {
// locking per device // locking per device
if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
log.debug("Ignoring outdated event: {}", deltaDesc);
return null;
}
DeviceDescriptions descs
= createIfAbsentUnchecked(providerDescs, providerId,
new InitDeviceDescs(deltaDesc));
final Device oldDevice = devices.get(deviceId); final Device oldDevice = devices.get(deviceId);
final Device newDevice; final Device newDevice;
@ -163,18 +173,18 @@ public class GossipDeviceStore
} }
if (oldDevice == null) { if (oldDevice == null) {
// ADD // ADD
return createDevice(providerId, newDevice); return createDevice(providerId, newDevice, deltaDesc.timestamp());
} else { } else {
// UPDATE or ignore (no change or stale) // UPDATE or ignore (no change or stale)
return updateDevice(providerId, oldDevice, newDevice); return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
} }
} }
} }
// Creates the device and returns the appropriate event if necessary. // Creates the device and returns the appropriate event if necessary.
// Guarded by deviceDescs value (=locking Device) // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createDevice(ProviderId providerId, private DeviceEvent createDevice(ProviderId providerId,
Device newDevice) { Device newDevice, Timestamp timestamp) {
// update composed device cache // update composed device cache
Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
@ -183,16 +193,17 @@ public class GossipDeviceStore
providerId, oldDevice, newDevice); providerId, oldDevice, newDevice);
if (!providerId.isAncillary()) { if (!providerId.isAncillary()) {
availableDevices.add(newDevice.id()); markOnline(newDevice.id(), timestamp);
} }
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
} }
// Updates the device and returns the appropriate event if necessary. // Updates the device and returns the appropriate event if necessary.
// Guarded by deviceDescs value (=locking Device) // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updateDevice(ProviderId providerId, private DeviceEvent updateDevice(ProviderId providerId,
Device oldDevice, Device newDevice) { Device oldDevice,
Device newDevice, Timestamp newTimestamp) {
// We allow only certain attributes to trigger update // We allow only certain attributes to trigger update
if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
@ -207,14 +218,14 @@ public class GossipDeviceStore
, newDevice); , newDevice);
} }
if (!providerId.isAncillary()) { if (!providerId.isAncillary()) {
availableDevices.add(newDevice.id()); markOnline(newDevice.id(), newTimestamp);
} }
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
} }
// Otherwise merely attempt to change availability if primary provider // Otherwise merely attempt to change availability if primary provider
if (!providerId.isAncillary()) { if (!providerId.isAncillary()) {
boolean added = availableDevices.add(newDevice.id()); boolean added = markOnline(newDevice.id(), newTimestamp);
return !added ? null : return !added ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null); new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
} }
@ -223,11 +234,29 @@ public class GossipDeviceStore
@Override @Override
public DeviceEvent markOffline(DeviceId deviceId) { public DeviceEvent markOffline(DeviceId deviceId) {
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs Timestamp timestamp = clockService.getTimestamp(deviceId);
return markOfflineInternal(deviceId, timestamp);
}
private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
Map<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId); = getDeviceDescriptions(deviceId);
// locking device // locking device
synchronized (providerDescs) { synchronized (providerDescs) {
// accept off-line if given timestamp is newer than
// the latest Timestamp from Primary provider
DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
Timestamp lastTimestamp = primDescs.getLatestTimestamp();
if (timestamp.compareTo(lastTimestamp) <= 0) {
// outdated event ignore
return null;
}
offline.put(deviceId, timestamp);
Device device = devices.get(deviceId); Device device = devices.get(deviceId);
if (device == null) { if (device == null) {
return null; return null;
@ -236,15 +265,37 @@ public class GossipDeviceStore
if (removed) { if (removed) {
// TODO: broadcast ... DOWN only? // TODO: broadcast ... DOWN only?
return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
} }
return null; return null;
} }
} }
/**
* Marks the device as available if the given timestamp is not outdated,
* compared to the time the device has been marked offline.
*
* @param deviceId identifier of the device
* @param timestamp of the event triggering this change.
* @return true if availability change request was accepted and changed the state
*/
// Guarded by deviceDescs value (=Device lock)
private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
// accept on-line if given timestamp is newer than
// the latest offline request Timestamp
Timestamp offlineTimestamp = offline.get(deviceId);
if (offlineTimestamp == null ||
offlineTimestamp.compareTo(timestamp) < 0) {
offline.remove(deviceId);
return availableDevices.add(deviceId);
}
return false;
}
@Override @Override
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId, public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
List<PortDescription> portDescriptions) { DeviceId deviceId,
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId); Timestamp newTimestamp = clockService.getTimestamp(deviceId);
List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size()); List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
@ -252,7 +303,8 @@ public class GossipDeviceStore
deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp)); deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
} }
List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, deltaDescs); List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
new Timestamped<>(portDescriptions, newTimestamp));
if (!events.isEmpty()) { if (!events.isEmpty()) {
// FIXME: broadcast deltaDesc, UP // FIXME: broadcast deltaDesc, UP
log.debug("broadcast deltaDesc"); log.debug("broadcast deltaDesc");
@ -261,8 +313,9 @@ public class GossipDeviceStore
} }
private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId, private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
List<Timestamped<PortDescription>> deltaDescs) { DeviceId deviceId,
Timestamped<List<PortDescription>> portDescriptions) {
Device device = devices.get(deviceId); Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@ -270,30 +323,41 @@ public class GossipDeviceStore
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
DeviceDescriptions descs = descsMap.get(providerId);
// every provider must provide DeviceDescription.
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
List<DeviceEvent> events = new ArrayList<>(); List<DeviceEvent> events = new ArrayList<>();
synchronized (descsMap) { synchronized (descsMap) {
if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
log.debug("Ignoring outdated events: {}", portDescriptions);
return null;
}
DeviceDescriptions descs = descsMap.get(providerId);
// every provider must provide DeviceDescription.
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
Map<PortNumber, Port> ports = getPortMap(deviceId); Map<PortNumber, Port> ports = getPortMap(deviceId);
final Timestamp newTimestamp = portDescriptions.timestamp();
// Add new ports // Add new ports
Set<PortNumber> processed = new HashSet<>(); Set<PortNumber> processed = new HashSet<>();
for (Timestamped<PortDescription> deltaDesc : deltaDescs) { for (PortDescription portDescription : portDescriptions.value()) {
final PortNumber number = deltaDesc.value().portNumber(); final PortNumber number = portDescription.portNumber();
processed.add(number);
final Port oldPort = ports.get(number); final Port oldPort = ports.get(number);
final Port newPort; final Port newPort;
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null || if (existingPortDesc == null ||
deltaDesc == existingPortDesc || newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
deltaDesc.isNewer(existingPortDesc)) {
// on new port or valid update // on new port or valid update
// update description // update description
descs.putPortDesc(deltaDesc); descs.putPortDesc(new Timestamped<>(portDescription,
portDescriptions.timestamp()));
newPort = composePort(device, number, descsMap); newPort = composePort(device, number, descsMap);
} else { } else {
// outdated event, ignored. // outdated event, ignored.
@ -303,7 +367,6 @@ public class GossipDeviceStore
events.add(oldPort == null ? events.add(oldPort == null ?
createPort(device, newPort, ports) : createPort(device, newPort, ports) :
updatePort(device, oldPort, newPort, ports)); updatePort(device, oldPort, newPort, ports));
processed.add(number);
} }
events.addAll(pruneOldPorts(device, ports, processed)); events.addAll(pruneOldPorts(device, ports, processed));
@ -313,7 +376,7 @@ public class GossipDeviceStore
// Creates a new port based on the port description adds it to the map and // Creates a new port based on the port description adds it to the map and
// Returns corresponding event. // Returns corresponding event.
// Guarded by deviceDescs value (=locking Device) // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createPort(Device device, Port newPort, private DeviceEvent createPort(Device device, Port newPort,
Map<PortNumber, Port> ports) { Map<PortNumber, Port> ports) {
ports.put(newPort.number(), newPort); ports.put(newPort.number(), newPort);
@ -322,7 +385,7 @@ public class GossipDeviceStore
// Checks if the specified port requires update and if so, it replaces the // Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event. // existing entry in the map and returns corresponding event.
// Guarded by deviceDescs value (=locking Device) // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updatePort(Device device, Port oldPort, private DeviceEvent updatePort(Device device, Port oldPort,
Port newPort, Port newPort,
Map<PortNumber, Port> ports) { Map<PortNumber, Port> ports) {
@ -337,7 +400,7 @@ public class GossipDeviceStore
// Prunes the specified list of ports based on which ports are in the // Prunes the specified list of ports based on which ports are in the
// processed list and returns list of corresponding events. // processed list and returns list of corresponding events.
// Guarded by deviceDescs value (=locking Device) // Guarded by deviceDescs value (=Device lock)
private List<DeviceEvent> pruneOldPorts(Device device, private List<DeviceEvent> pruneOldPorts(Device device,
Map<PortNumber, Port> ports, Map<PortNumber, Port> ports,
Set<PortNumber> processed) { Set<PortNumber> processed) {
@ -389,13 +452,19 @@ public class GossipDeviceStore
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
DeviceDescriptions descs = descsMap.get(providerId);
// assuming all providers must to give DeviceDescription
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
synchronized (descsMap) { synchronized (descsMap) {
if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
log.debug("Ignoring outdated event: {}", deltaDesc);
return null;
}
DeviceDescriptions descs = descsMap.get(providerId);
// assuming all providers must to give DeviceDescription
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = deltaDesc.value().portNumber(); final PortNumber number = deltaDesc.value().portNumber();
final Port oldPort = ports.get(number); final Port oldPort = ports.get(number);
@ -443,19 +512,51 @@ public class GossipDeviceStore
} }
@Override @Override
public DeviceEvent removeDevice(DeviceId deviceId) { public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId); Timestamp timestamp = clockService.getTimestamp(deviceId);
DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
// TODO: broadcast removal event
return event;
}
private DeviceEvent removeDeviceInternal(DeviceId deviceId,
Timestamp timestamp) {
Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
synchronized (descs) { synchronized (descs) {
// accept removal request if given timestamp is newer than
// the latest Timestamp from Primary provider
DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
Timestamp lastTimestamp = primDescs.getLatestTimestamp();
if (timestamp.compareTo(lastTimestamp) <= 0) {
// outdated event ignore
return null;
}
removalRequest.put(deviceId, timestamp);
Device device = devices.remove(deviceId); Device device = devices.remove(deviceId);
// should DEVICE_REMOVED carry removed ports? // should DEVICE_REMOVED carry removed ports?
devicePorts.get(deviceId).clear(); Map<PortNumber, Port> ports = devicePorts.get(deviceId);
availableDevices.remove(deviceId); if (ports != null) {
ports.clear();
}
markOfflineInternal(deviceId, timestamp);
descs.clear(); descs.clear();
return device == null ? null : return device == null ? null :
new DeviceEvent(DEVICE_REMOVED, device, null); new DeviceEvent(DEVICE_REMOVED, device, null);
} }
} }
private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
Timestamp removalTimestamp = removalRequest.get(deviceId);
if (removalTimestamp != null &&
removalTimestamp.compareTo(timestampToCheck) >= 0) {
// removalRequest is more recent
return true;
}
return false;
}
/** /**
* Returns a Device, merging description given from multiple Providers. * Returns a Device, merging description given from multiple Providers.
* *
@ -472,7 +573,7 @@ public class GossipDeviceStore
DeviceDescriptions desc = providerDescs.get(primary); DeviceDescriptions desc = providerDescs.get(primary);
DeviceDescription base = desc.getDeviceDesc().value(); final DeviceDescription base = desc.getDeviceDesc().value();
Type type = base.type(); Type type = base.type();
String manufacturer = base.manufacturer(); String manufacturer = base.manufacturer();
String hwVersion = base.hwVersion(); String hwVersion = base.hwVersion();
@ -545,7 +646,7 @@ public class GossipDeviceStore
* @return primary ProviderID, or randomly chosen one if none exists * @return primary ProviderID, or randomly chosen one if none exists
*/ */
private ProviderId pickPrimaryPID( private ProviderId pickPrimaryPID(
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) { Map<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId fallBackPrimary = null; ProviderId fallBackPrimary = null;
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (!e.getKey().isAncillary()) { if (!e.getKey().isAncillary()) {
@ -558,6 +659,12 @@ public class GossipDeviceStore
return fallBackPrimary; return fallBackPrimary;
} }
private DeviceDescriptions getPrimaryDescriptions(
Map<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId pid = pickPrimaryPID(providerDescs);
return providerDescs.get(pid);
}
public static final class InitDeviceDescs public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> { implements ConcurrentInitializer<DeviceDescriptions> {
@ -586,6 +693,16 @@ public class GossipDeviceStore
this.portDescs = new ConcurrentHashMap<>(); this.portDescs = new ConcurrentHashMap<>();
} }
Timestamp getLatestTimestamp() {
Timestamp latest = deviceDesc.get().timestamp();
for (Timestamped<PortDescription> desc : portDescs.values()) {
if (desc.timestamp().compareTo(latest) > 0) {
latest = desc.timestamp();
}
}
return latest;
}
public Timestamped<DeviceDescription> getDeviceDesc() { public Timestamped<DeviceDescription> getDeviceDesc() {
return deviceDesc.get(); return deviceDesc.get();
} }

View File

@ -2,6 +2,8 @@ package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.FluentIterable; import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException; import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@ -32,7 +34,6 @@ import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull; import static com.google.common.base.Predicates.notNull;
import static com.google.common.base.Verify.verify;
import static org.onlab.onos.net.device.DeviceEvent.Type.*; import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
@ -71,14 +73,14 @@ public class SimpleDeviceStore
// collection of Description given from various providers // collection of Description given from various providers
private final ConcurrentMap<DeviceId, private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>> ConcurrentMap<ProviderId, DeviceDescriptions>>
deviceDescs = new ConcurrentHashMap<>(); deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers // cache of Device and Ports generated by compositing descriptions from providers
private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>(); private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>(); private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
// available(=UP) devices // available(=UP) devices
private final Set<DeviceId> availableDevices = new HashSet<>(); private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
@Activate @Activate
@ -88,6 +90,10 @@ public class SimpleDeviceStore
@Deactivate @Deactivate
public void deactivate() { public void deactivate() {
deviceDescs.clear();
devices.clear();
devicePorts.clear();
availableDevices.clear();
log.info("Stopped"); log.info("Stopped");
} }
@ -107,45 +113,54 @@ public class SimpleDeviceStore
} }
@Override @Override
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, public DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) { DeviceDescription deviceDescription) {
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId); = getDeviceDescriptions(deviceId);
Device oldDevice = devices.get(deviceId); synchronized (providerDescs) {
// locking per device
DeviceDescriptions descs DeviceDescriptions descs
= createIfAbsentUnchecked(providerDescs, providerId, = createIfAbsentUnchecked(providerDescs, providerId,
new InitDeviceDescs(deviceDescription)); new InitDeviceDescs(deviceDescription));
// update description Device oldDevice = devices.get(deviceId);
descs.putDeviceDesc(deviceDescription); // update description
Device newDevice = composeDevice(deviceId, providerDescs); descs.putDeviceDesc(deviceDescription);
Device newDevice = composeDevice(deviceId, providerDescs);
if (oldDevice == null) { if (oldDevice == null) {
// ADD // ADD
return createDevice(providerId, newDevice); return createDevice(providerId, newDevice);
} else { } else {
// UPDATE or ignore (no change or stale) // UPDATE or ignore (no change or stale)
return updateDevice(providerId, oldDevice, newDevice); return updateDevice(providerId, oldDevice, newDevice);
}
} }
} }
// Creates the device and returns the appropriate event if necessary. // Creates the device and returns the appropriate event if necessary.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent createDevice(ProviderId providerId, Device newDevice) { private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
// update composed device cache // update composed device cache
synchronized (this) { Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
devices.putIfAbsent(newDevice.id(), newDevice); verify(oldDevice == null,
if (!providerId.isAncillary()) { "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
availableDevices.add(newDevice.id()); providerId, oldDevice, newDevice);
}
if (!providerId.isAncillary()) {
availableDevices.add(newDevice.id());
} }
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
} }
// Updates the device and returns the appropriate event if necessary. // Updates the device and returns the appropriate event if necessary.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) { private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
// We allow only certain attributes to trigger update // We allow only certain attributes to trigger update
@ -153,70 +168,87 @@ public class SimpleDeviceStore
!Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) || !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
!AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) { !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
synchronized (this) { boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
devices.replace(newDevice.id(), oldDevice, newDevice); if (!replaced) {
if (!providerId.isAncillary()) { verify(replaced,
availableDevices.add(newDevice.id()); "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
} providerId, oldDevice, devices.get(newDevice.id())
, newDevice);
}
if (!providerId.isAncillary()) {
availableDevices.add(newDevice.id());
} }
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
} }
// Otherwise merely attempt to change availability if primary provider // Otherwise merely attempt to change availability if primary provider
if (!providerId.isAncillary()) { if (!providerId.isAncillary()) {
synchronized (this) {
boolean added = availableDevices.add(newDevice.id()); boolean added = availableDevices.add(newDevice.id());
return !added ? null : return !added ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null); new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
}
} }
return null; return null;
} }
@Override @Override
public DeviceEvent markOffline(DeviceId deviceId) { public DeviceEvent markOffline(DeviceId deviceId) {
synchronized (this) { ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId);
// locking device
synchronized (providerDescs) {
Device device = devices.get(deviceId); Device device = devices.get(deviceId);
boolean removed = (device != null) && availableDevices.remove(deviceId); if (device == null) {
return !removed ? null : return null;
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); }
boolean removed = availableDevices.remove(deviceId);
if (removed) {
// TODO: broadcast ... DOWN only?
return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
}
return null;
} }
} }
@Override @Override
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId, public List<DeviceEvent> updatePorts(ProviderId providerId,
List<PortDescription> portDescriptions) { DeviceId deviceId,
List<PortDescription> portDescriptions) {
// TODO: implement multi-provider
Device device = devices.get(deviceId); Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
DeviceDescriptions descs = descsMap.get(providerId);
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
List<DeviceEvent> events = new ArrayList<>(); List<DeviceEvent> events = new ArrayList<>();
synchronized (this) { synchronized (descsMap) {
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); DeviceDescriptions descs = descsMap.get(providerId);
// every provider must provide DeviceDescription.
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
Map<PortNumber, Port> ports = getPortMap(deviceId);
// Add new ports // Add new ports
Set<PortNumber> processed = new HashSet<>(); Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) { for (PortDescription portDescription : portDescriptions) {
PortNumber number = portDescription.portNumber(); final PortNumber number = portDescription.portNumber();
Port oldPort = ports.get(number); processed.add(portDescription.portNumber());
final Port oldPort = ports.get(number);
final Port newPort;
// event suppression hook?
// update description // update description
descs.putPortDesc(portDescription); descs.putPortDesc(portDescription);
Port newPort = composePort(device, number, descsMap); newPort = composePort(device, number, descsMap);
events.add(oldPort == null ? events.add(oldPort == null ?
createPort(device, newPort, ports) : createPort(device, newPort, ports) :
updatePort(device, oldPort, newPort, ports)); updatePort(device, oldPort, newPort, ports));
processed.add(portDescription.portNumber());
} }
events.addAll(pruneOldPorts(device, ports, processed)); events.addAll(pruneOldPorts(device, ports, processed));
@ -226,17 +258,19 @@ public class SimpleDeviceStore
// Creates a new port based on the port description adds it to the map and // Creates a new port based on the port description adds it to the map and
// Returns corresponding event. // Returns corresponding event.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent createPort(Device device, Port newPort, private DeviceEvent createPort(Device device, Port newPort,
ConcurrentMap<PortNumber, Port> ports) { Map<PortNumber, Port> ports) {
ports.put(newPort.number(), newPort); ports.put(newPort.number(), newPort);
return new DeviceEvent(PORT_ADDED, device, newPort); return new DeviceEvent(PORT_ADDED, device, newPort);
} }
// Checks if the specified port requires update and if so, it replaces the // Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event. // existing entry in the map and returns corresponding event.
// Guarded by deviceDescs value (=Device lock)
private DeviceEvent updatePort(Device device, Port oldPort, private DeviceEvent updatePort(Device device, Port oldPort,
Port newPort, Port newPort,
ConcurrentMap<PortNumber, Port> ports) { Map<PortNumber, Port> ports) {
if (oldPort.isEnabled() != newPort.isEnabled() || if (oldPort.isEnabled() != newPort.isEnabled() ||
!AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) { !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
@ -248,6 +282,7 @@ public class SimpleDeviceStore
// Prunes the specified list of ports based on which ports are in the // Prunes the specified list of ports based on which ports are in the
// processed list and returns list of corresponding events. // processed list and returns list of corresponding events.
// Guarded by deviceDescs value (=Device lock)
private List<DeviceEvent> pruneOldPorts(Device device, private List<DeviceEvent> pruneOldPorts(Device device,
Map<PortNumber, Port> ports, Map<PortNumber, Port> ports,
Set<PortNumber> processed) { Set<PortNumber> processed) {
@ -264,12 +299,6 @@ public class SimpleDeviceStore
return events; return events;
} }
private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
DeviceId deviceId) {
return createIfAbsentUnchecked(deviceDescs, deviceId,
NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
}
// Gets the map of ports for the specified device; if one does not already // Gets the map of ports for the specified device; if one does not already
// exist, it creates and registers a new one. // exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) { private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
@ -277,8 +306,14 @@ public class SimpleDeviceStore
NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
} }
private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
DeviceId deviceId) {
return createIfAbsentUnchecked(deviceDescs, deviceId,
NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
}
@Override @Override
public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId, public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) { PortDescription portDescription) {
Device device = devices.get(deviceId); Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@ -286,19 +321,22 @@ public class SimpleDeviceStore
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
DeviceDescriptions descs = descsMap.get(providerId); synchronized (descsMap) {
// assuming all providers must to give DeviceDescription DeviceDescriptions descs = descsMap.get(providerId);
checkArgument(descs != null, // assuming all providers must to give DeviceDescription
"Device description for Device ID %s from Provider %s was not found", checkArgument(descs != null,
deviceId, providerId); "Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
synchronized (this) {
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = portDescription.portNumber(); final PortNumber number = portDescription.portNumber();
Port oldPort = ports.get(number); final Port oldPort = ports.get(number);
final Port newPort;
// update description // update description
descs.putPortDesc(portDescription); descs.putPortDesc(portDescription);
Port newPort = composePort(device, number, descsMap); newPort = composePort(device, number, descsMap);
if (oldPort == null) { if (oldPort == null) {
return createPort(device, newPort, ports); return createPort(device, newPort, ports);
} else { } else {
@ -333,7 +371,7 @@ public class SimpleDeviceStore
synchronized (descs) { synchronized (descs) {
Device device = devices.remove(deviceId); Device device = devices.remove(deviceId);
// should DEVICE_REMOVED carry removed ports? // should DEVICE_REMOVED carry removed ports?
ConcurrentMap<PortNumber, Port> ports = devicePorts.get(deviceId); Map<PortNumber, Port> ports = devicePorts.get(deviceId);
if (ports != null) { if (ports != null) {
ports.clear(); ports.clear();
} }
@ -360,14 +398,14 @@ public class SimpleDeviceStore
DeviceDescriptions desc = providerDescs.get(primary); DeviceDescriptions desc = providerDescs.get(primary);
// base final DeviceDescription base = desc.getDeviceDesc();
Type type = desc.getDeviceDesc().type(); Type type = base.type();
String manufacturer = desc.getDeviceDesc().manufacturer(); String manufacturer = base.manufacturer();
String hwVersion = desc.getDeviceDesc().hwVersion(); String hwVersion = base.hwVersion();
String swVersion = desc.getDeviceDesc().swVersion(); String swVersion = base.swVersion();
String serialNumber = desc.getDeviceDesc().serialNumber(); String serialNumber = base.serialNumber();
DefaultAnnotations annotations = DefaultAnnotations.builder().build(); DefaultAnnotations annotations = DefaultAnnotations.builder().build();
annotations = merge(annotations, desc.getDeviceDesc().annotations()); annotations = merge(annotations, base.annotations());
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (e.getKey().equals(primary)) { if (e.getKey().equals(primary)) {
@ -386,7 +424,14 @@ public class SimpleDeviceStore
hwVersion, swVersion, serialNumber, annotations); hwVersion, swVersion, serialNumber, annotations);
} }
// probably want composePort"s" also /**
* Returns a Port, merging description given from multiple Providers.
*
* @param device device the port is on
* @param number port number
* @param providerDescs Collection of Descriptions from multiple providers
* @return Port instance
*/
private Port composePort(Device device, PortNumber number, private Port composePort(Device device, PortNumber number,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) { ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
@ -441,7 +486,9 @@ public class SimpleDeviceStore
public static final class InitDeviceDescs public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> { implements ConcurrentInitializer<DeviceDescriptions> {
private final DeviceDescription deviceDesc; private final DeviceDescription deviceDesc;
public InitDeviceDescs(DeviceDescription deviceDesc) { public InitDeviceDescs(DeviceDescription deviceDesc) {
this.deviceDesc = checkNotNull(deviceDesc); this.deviceDesc = checkNotNull(deviceDesc);
} }
@ -456,8 +503,6 @@ public class SimpleDeviceStore
* Collection of Description of a Device and it's Ports given from a Provider. * Collection of Description of a Device and it's Ports given from a Provider.
*/ */
private static class DeviceDescriptions { private static class DeviceDescriptions {
// private final DeviceId id;
// private final ProviderId pid;
private final AtomicReference<DeviceDescription> deviceDesc; private final AtomicReference<DeviceDescription> deviceDesc;
private final ConcurrentMap<PortNumber, PortDescription> portDescs; private final ConcurrentMap<PortNumber, PortDescription> portDescs;
@ -475,10 +520,6 @@ public class SimpleDeviceStore
return portDescs.get(number); return portDescs.get(number);
} }
public Collection<PortDescription> getPortDescs() {
return Collections.unmodifiableCollection(portDescs.values());
}
/** /**
* Puts DeviceDescription, merging annotations as necessary. * Puts DeviceDescription, merging annotations as necessary.
* *