diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java new file mode 100644 index 0000000000..7bbad93d5a --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java @@ -0,0 +1,486 @@ +package org.onlab.onos.store.link.impl; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; + +import org.apache.commons.lang3.concurrent.ConcurrentUtils; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.cluster.ClusterService; +import org.onlab.onos.net.AnnotationsUtil; +import org.onlab.onos.net.ConnectPoint; +import org.onlab.onos.net.DefaultAnnotations; +import org.onlab.onos.net.DefaultLink; +import org.onlab.onos.net.DeviceId; +import org.onlab.onos.net.Link; +import org.onlab.onos.net.SparseAnnotations; +import org.onlab.onos.net.Link.Type; +import org.onlab.onos.net.LinkKey; +import org.onlab.onos.net.Provided; +import org.onlab.onos.net.link.DefaultLinkDescription; +import org.onlab.onos.net.link.LinkDescription; +import org.onlab.onos.net.link.LinkEvent; +import org.onlab.onos.net.link.LinkStore; +import org.onlab.onos.net.link.LinkStoreDelegate; +import org.onlab.onos.net.provider.ProviderId; +import org.onlab.onos.store.AbstractStore; +import org.onlab.onos.store.ClockService; +import org.onlab.onos.store.Timestamp; +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; +import org.onlab.onos.store.cluster.messaging.ClusterMessage; +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; +import org.onlab.onos.store.cluster.messaging.MessageSubject; +import org.onlab.onos.store.common.impl.Timestamped; +import org.onlab.onos.store.serializers.DistributedStoreSerializers; +import org.onlab.onos.store.serializers.KryoSerializer; +import org.onlab.util.KryoPool; +import org.onlab.util.NewConcurrentHashMap; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.onlab.onos.net.DefaultAnnotations.union; +import static org.onlab.onos.net.DefaultAnnotations.merge; +import static org.onlab.onos.net.Link.Type.DIRECT; +import static org.onlab.onos.net.Link.Type.INDIRECT; +import static org.onlab.onos.net.link.LinkEvent.Type.*; +import static org.slf4j.LoggerFactory.getLogger; +import static com.google.common.collect.Multimaps.synchronizedSetMultimap; +import static com.google.common.base.Predicates.notNull; + +/** + * Manages inventory of infrastructure links in distributed data store + * that uses optimistic replication and gossip based techniques. + */ +@Component(immediate = true) +@Service +public class GossipLinkStore + extends AbstractStore + implements LinkStore { + + private final Logger log = getLogger(getClass()); + + // Link inventory + private final ConcurrentMap>> linkDescs = + new ConcurrentHashMap<>(); + + // Link instance cache + private final ConcurrentMap links = new ConcurrentHashMap<>(); + + // Egress and ingress link sets + private final SetMultimap srcLinks = createSynchronizedHashMultiMap(); + private final SetMultimap dstLinks = createSynchronizedHashMultiMap(); + + // Remove links + private final Map removedLinks = Maps.newHashMap(); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClockService clockService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + private static final KryoSerializer SERIALIZER = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoPool.newBuilder() + .register(DistributedStoreSerializers.COMMON) + .register(InternalLinkEvent.class) + .register(InternalLinkRemovedEvent.class) + .build() + .populate(1); + } + }; + + @Activate + public void activate() { + + clusterCommunicator.addSubscriber( + GossipLinkStoreMessageSubjects.LINK_UPDATE, new InternalLinkEventListener()); + clusterCommunicator.addSubscriber( + GossipLinkStoreMessageSubjects.LINK_REMOVED, new InternalLinkRemovedEventListener()); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + linkDescs.clear(); + links.clear(); + srcLinks.clear(); + dstLinks.clear(); + log.info("Stopped"); + } + + @Override + public int getLinkCount() { + return links.size(); + } + + @Override + public Iterable getLinks() { + return Collections.unmodifiableCollection(links.values()); + } + + @Override + public Set getDeviceEgressLinks(DeviceId deviceId) { + // lock for iteration + synchronized (srcLinks) { + return FluentIterable.from(srcLinks.get(deviceId)) + .transform(lookupLink()) + .filter(notNull()) + .toSet(); + } + } + + @Override + public Set getDeviceIngressLinks(DeviceId deviceId) { + // lock for iteration + synchronized (dstLinks) { + return FluentIterable.from(dstLinks.get(deviceId)) + .transform(lookupLink()) + .filter(notNull()) + .toSet(); + } + } + + @Override + public Link getLink(ConnectPoint src, ConnectPoint dst) { + return links.get(new LinkKey(src, dst)); + } + + @Override + public Set getEgressLinks(ConnectPoint src) { + Set egress = new HashSet<>(); + for (LinkKey linkKey : srcLinks.get(src.deviceId())) { + if (linkKey.src().equals(src)) { + egress.add(links.get(linkKey)); + } + } + return egress; + } + + @Override + public Set getIngressLinks(ConnectPoint dst) { + Set ingress = new HashSet<>(); + for (LinkKey linkKey : dstLinks.get(dst.deviceId())) { + if (linkKey.dst().equals(dst)) { + ingress.add(links.get(linkKey)); + } + } + return ingress; + } + + @Override + public LinkEvent createOrUpdateLink(ProviderId providerId, + LinkDescription linkDescription) { + + DeviceId dstDeviceId = linkDescription.dst().deviceId(); + Timestamp newTimestamp = clockService.getTimestamp(dstDeviceId); + + final Timestamped deltaDesc = new Timestamped<>(linkDescription, newTimestamp); + + LinkEvent event = createOrUpdateLinkInternal(providerId, deltaDesc); + + if (event != null) { + log.info("Notifying peers of a link update topology event from providerId: " + + "{} between src: {} and dst: {}", + providerId, linkDescription.src(), linkDescription.dst()); + try { + notifyPeers(new InternalLinkEvent(providerId, deltaDesc)); + } catch (IOException e) { + log.info("Failed to notify peers of a link update topology event from providerId: " + + "{} between src: {} and dst: {}", + providerId, linkDescription.src(), linkDescription.dst()); + } + } + return event; + } + + private LinkEvent createOrUpdateLinkInternal( + ProviderId providerId, + Timestamped linkDescription) { + + LinkKey key = new LinkKey(linkDescription.value().src(), linkDescription.value().dst()); + ConcurrentMap> descs = getLinkDescriptions(key); + + synchronized (descs) { + // if the link was previously removed, we should proceed if and + // only if this request is more recent. + Timestamp linkRemovedTimestamp = removedLinks.get(key); + if (linkRemovedTimestamp != null) { + if (linkDescription.isNewer(linkRemovedTimestamp)) { + removedLinks.remove(key); + } else { + return null; + } + } + + final Link oldLink = links.get(key); + // update description + createOrUpdateLinkDescription(descs, providerId, linkDescription); + final Link newLink = composeLink(descs); + if (oldLink == null) { + return createLink(key, newLink); + } + return updateLink(key, oldLink, newLink); + } + } + + // Guarded by linkDescs value (=locking each Link) + private Timestamped createOrUpdateLinkDescription( + ConcurrentMap> existingLinkDescriptions, + ProviderId providerId, + Timestamped linkDescription) { + + // merge existing attributes and merge + Timestamped existingLinkDescription = existingLinkDescriptions.get(providerId); + if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) { + return null; + } + Timestamped newLinkDescription = linkDescription; + if (existingLinkDescription != null) { + SparseAnnotations merged = union(existingLinkDescription.value().annotations(), + linkDescription.value().annotations()); + newLinkDescription = new Timestamped( + new DefaultLinkDescription( + linkDescription.value().src(), + linkDescription.value().dst(), + linkDescription.value().type(), merged), + linkDescription.timestamp()); + } + return existingLinkDescriptions.put(providerId, newLinkDescription); + } + + // Creates and stores the link and returns the appropriate event. + // Guarded by linkDescs value (=locking each Link) + private LinkEvent createLink(LinkKey key, Link newLink) { + + if (newLink.providerId().isAncillary()) { + // TODO: revisit ancillary only Link handling + + // currently treating ancillary only as down (not visible outside) + return null; + } + + links.put(key, newLink); + srcLinks.put(newLink.src().deviceId(), key); + dstLinks.put(newLink.dst().deviceId(), key); + return new LinkEvent(LINK_ADDED, newLink); + } + + // Updates, if necessary the specified link and returns the appropriate event. + // Guarded by linkDescs value (=locking each Link) + private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) { + + if (newLink.providerId().isAncillary()) { + // TODO: revisit ancillary only Link handling + + // currently treating ancillary only as down (not visible outside) + return null; + } + + if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) || + !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) { + + links.put(key, newLink); + // strictly speaking following can be ommitted + srcLinks.put(oldLink.src().deviceId(), key); + dstLinks.put(oldLink.dst().deviceId(), key); + return new LinkEvent(LINK_UPDATED, newLink); + } + return null; + } + + @Override + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { + final LinkKey key = new LinkKey(src, dst); + + DeviceId dstDeviceId = dst.deviceId(); + Timestamp timestamp = clockService.getTimestamp(dstDeviceId); + + LinkEvent event = removeLinkInternal(key, timestamp); + + if (event != null) { + log.info("Notifying peers of a link removed topology event for a link " + + "between src: {} and dst: {}", src, dst); + try { + notifyPeers(new InternalLinkRemovedEvent(key, timestamp)); + } catch (IOException e) { + log.error("Failed to notify peers of a link removed topology event for a link " + + "between src: {} and dst: {}", src, dst); + } + } + return event; + } + + private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) { + ConcurrentMap> linkDescriptions = + getLinkDescriptions(key); + synchronized (linkDescriptions) { + // accept removal request if given timestamp is newer than + // the latest Timestamp from Primary provider + ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions); + if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) { + return null; + } + removedLinks.put(key, timestamp); + Link link = links.remove(key); + linkDescriptions.clear(); + if (link != null) { + srcLinks.remove(link.src().deviceId(), key); + dstLinks.remove(link.dst().deviceId(), key); + return new LinkEvent(LINK_REMOVED, link); + } + return null; + } + } + + private static SetMultimap createSynchronizedHashMultiMap() { + return synchronizedSetMultimap(HashMultimap.create()); + } + + /** + * @return primary ProviderID, or randomly chosen one if none exists + */ + private ProviderId pickPrimaryProviderId( + ConcurrentMap> providerDescs) { + + ProviderId fallBackPrimary = null; + for (Entry> e : providerDescs.entrySet()) { + if (!e.getKey().isAncillary()) { + return e.getKey(); + } else if (fallBackPrimary == null) { + // pick randomly as a fallback in case there is no primary + fallBackPrimary = e.getKey(); + } + } + return fallBackPrimary; + } + + private Link composeLink(ConcurrentMap> linkDescriptions) { + ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions); + Timestamped base = linkDescriptions.get(primaryProviderId); + + ConnectPoint src = base.value().src(); + ConnectPoint dst = base.value().dst(); + Type type = base.value().type(); + DefaultAnnotations annotations = DefaultAnnotations.builder().build(); + annotations = merge(annotations, base.value().annotations()); + + for (Entry> e : linkDescriptions.entrySet()) { + if (primaryProviderId.equals(e.getKey())) { + continue; + } + + // TODO: should keep track of Description timestamp + // and only merge conflicting keys when timestamp is newer + // Currently assuming there will never be a key conflict between + // providers + + // annotation merging. not so efficient, should revisit later + annotations = merge(annotations, e.getValue().value().annotations()); + } + + return new DefaultLink(primaryProviderId , src, dst, type, annotations); + } + + private ConcurrentMap> getLinkDescriptions(LinkKey key) { + return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key, + NewConcurrentHashMap.>ifNeeded()); + } + + private final Function lookupLink = new LookupLink(); + private Function lookupLink() { + return lookupLink; + } + + private final class LookupLink implements Function { + @Override + public Link apply(LinkKey input) { + return links.get(input); + } + } + + private static final Predicate IS_PRIMARY = new IsPrimary(); + private static final Predicate isPrimary() { + return IS_PRIMARY; + } + + private static final class IsPrimary implements Predicate { + + @Override + public boolean apply(Provided input) { + return !input.providerId().isAncillary(); + } + } + + private void notifyDelegateIfNotNull(LinkEvent event) { + if (event != null) { + notifyDelegate(event); + } + } + + // TODO: should we be throwing exception? + private void broadcastMessage(MessageSubject subject, Object event) throws IOException { + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + subject, + SERIALIZER.encode(event)); + clusterCommunicator.broadcast(message); + } + + private void notifyPeers(InternalLinkEvent event) throws IOException { + broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event); + } + + private void notifyPeers(InternalLinkRemovedEvent event) throws IOException { + broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event); + } + + private class InternalLinkEventListener implements ClusterMessageHandler { + @Override + public void handle(ClusterMessage message) { + + log.info("Received link event from peer: {}", message.sender()); + InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload()); + + ProviderId providerId = event.providerId(); + Timestamped linkDescription = event.linkDescription(); + + notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription)); + } + } + + private class InternalLinkRemovedEventListener implements ClusterMessageHandler { + @Override + public void handle(ClusterMessage message) { + + log.info("Received link removed event from peer: {}", message.sender()); + InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload()); + + LinkKey linkKey = event.linkKey(); + Timestamp timestamp = event.timestamp(); + + notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp)); + } + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java new file mode 100644 index 0000000000..46e7186196 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java @@ -0,0 +1,14 @@ +package org.onlab.onos.store.link.impl; + +import org.onlab.onos.store.cluster.messaging.MessageSubject; + +/** + * MessageSubjects used by GossipLinkStore peer-peer communication. + */ +public final class GossipLinkStoreMessageSubjects { + + private GossipLinkStoreMessageSubjects() {} + + public static final MessageSubject LINK_UPDATE = new MessageSubject("peer-link-update"); + public static final MessageSubject LINK_REMOVED = new MessageSubject("peer-link-removed"); +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkEvent.java new file mode 100644 index 0000000000..9bb3445ab7 --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkEvent.java @@ -0,0 +1,46 @@ +package org.onlab.onos.store.link.impl; + +import com.google.common.base.MoreObjects; + +import org.onlab.onos.net.link.LinkDescription; +import org.onlab.onos.net.provider.ProviderId; +import org.onlab.onos.store.common.impl.Timestamped; + +/** + * Information published by GossipDeviceStore to notify peers of a device + * change event. + */ +public class InternalLinkEvent { + + private final ProviderId providerId; + private final Timestamped linkDescription; + + protected InternalLinkEvent( + ProviderId providerId, + Timestamped linkDescription) { + this.providerId = providerId; + this.linkDescription = linkDescription; + } + + public ProviderId providerId() { + return providerId; + } + + public Timestamped linkDescription() { + return linkDescription; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("providerId", providerId) + .add("linkDescription", linkDescription) + .toString(); + } + + // for serializer + protected InternalLinkEvent() { + this.providerId = null; + this.linkDescription = null; + } +} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkRemovedEvent.java new file mode 100644 index 0000000000..22e65ed48a --- /dev/null +++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkRemovedEvent.java @@ -0,0 +1,49 @@ +package org.onlab.onos.store.link.impl; + +import org.onlab.onos.net.LinkKey; +import org.onlab.onos.store.Timestamp; + +import com.google.common.base.MoreObjects; + +/** + * Information published by GossipLinkStore to notify peers of a link + * being removed. + */ +public class InternalLinkRemovedEvent { + + private final LinkKey linkKey; + private final Timestamp timestamp; + + /** + * Creates a InternalLinkRemovedEvent. + * @param linkKey identifier of the removed link. + * @param timestamp timestamp of when the link was removed. + */ + public InternalLinkRemovedEvent(LinkKey linkKey, Timestamp timestamp) { + this.linkKey = linkKey; + this.timestamp = timestamp; + } + + public LinkKey linkKey() { + return linkKey; + } + + public Timestamp timestamp() { + return timestamp; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("linkKey", linkKey) + .add("timestamp", timestamp) + .toString(); + } + + // for serializer + @SuppressWarnings("unused") + private InternalLinkRemovedEvent() { + linkKey = null; + timestamp = null; + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java deleted file mode 100644 index 63245a33c3..0000000000 --- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java +++ /dev/null @@ -1,246 +0,0 @@ -package org.onlab.onos.store.link.impl; - -import static org.onlab.onos.net.Link.Type.DIRECT; -import static org.onlab.onos.net.Link.Type.INDIRECT; -import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED; -import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED; -import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.onos.net.ConnectPoint; -import org.onlab.onos.net.DefaultLink; -import org.onlab.onos.net.DeviceId; -import org.onlab.onos.net.Link; -import org.onlab.onos.net.LinkKey; -import org.onlab.onos.net.link.LinkDescription; -import org.onlab.onos.net.link.LinkEvent; -import org.onlab.onos.net.link.LinkStore; -import org.onlab.onos.net.link.LinkStoreDelegate; -import org.onlab.onos.net.provider.ProviderId; -import org.onlab.onos.store.AbstractStore; -import org.onlab.onos.store.ClockService; -import org.onlab.onos.store.Timestamp; -import org.slf4j.Logger; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; -import com.google.common.collect.ImmutableSet.Builder; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -//TODO: Add support for multiple provider and annotations -/** - * Manages inventory of infrastructure links using a protocol that takes into consideration - * the order in which events occur. - */ -// FIXME: This does not yet implement the full protocol. -// The full protocol requires the sender of LLDP message to include the -// version information of src device/port and the receiver to -// take that into account when figuring out if a more recent src -// device/port down event renders the link discovery obsolete. -@Component(immediate = true) -@Service -public class OnosDistributedLinkStore - extends AbstractStore - implements LinkStore { - - private final Logger log = getLogger(getClass()); - - // Link inventory - private ConcurrentMap> links; - - public static final String LINK_NOT_FOUND = "Link between %s and %s not found"; - - // TODO synchronize? - // Egress and ingress link sets - private final Multimap> srcLinks = HashMultimap.create(); - private final Multimap> dstLinks = HashMultimap.create(); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClockService clockService; - - @Activate - public void activate() { - - links = new ConcurrentHashMap<>(); - - log.info("Started"); - } - - @Deactivate - public void deactivate() { - log.info("Stopped"); - } - - @Override - public int getLinkCount() { - return links.size(); - } - - @Override - public Iterable getLinks() { - Builder builder = ImmutableSet.builder(); - synchronized (this) { - for (VersionedValue link : links.values()) { - builder.add(link.entity()); - } - return builder.build(); - } - } - - @Override - public Set getDeviceEgressLinks(DeviceId deviceId) { - Set> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId)); - Set rawEgressLinks = new HashSet<>(); - for (VersionedValue link : egressLinks) { - rawEgressLinks.add(link.entity()); - } - return rawEgressLinks; - } - - @Override - public Set getDeviceIngressLinks(DeviceId deviceId) { - Set> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId)); - Set rawIngressLinks = new HashSet<>(); - for (VersionedValue link : ingressLinks) { - rawIngressLinks.add(link.entity()); - } - return rawIngressLinks; - } - - @Override - public Link getLink(ConnectPoint src, ConnectPoint dst) { - VersionedValue link = links.get(new LinkKey(src, dst)); - checkArgument(link != null, "LINK_NOT_FOUND", src, dst); - return link.entity(); - } - - @Override - public Set getEgressLinks(ConnectPoint src) { - Set egressLinks = new HashSet<>(); - for (VersionedValue link : srcLinks.get(src.deviceId())) { - if (link.entity().src().equals(src)) { - egressLinks.add(link.entity()); - } - } - return egressLinks; - } - - @Override - public Set getIngressLinks(ConnectPoint dst) { - Set ingressLinks = new HashSet<>(); - for (VersionedValue link : dstLinks.get(dst.deviceId())) { - if (link.entity().dst().equals(dst)) { - ingressLinks.add(link.entity()); - } - } - return ingressLinks; - } - - @Override - public LinkEvent createOrUpdateLink(ProviderId providerId, - LinkDescription linkDescription) { - - final DeviceId destinationDeviceId = linkDescription.dst().deviceId(); - final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId); - - LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst()); - VersionedValue link = links.get(key); - if (link == null) { - return createLink(providerId, key, linkDescription, newTimestamp); - } - - checkState(newTimestamp.compareTo(link.timestamp()) > 0, - "Existing Link has a timestamp in the future!"); - - return updateLink(providerId, link, key, linkDescription, newTimestamp); - } - - // Creates and stores the link and returns the appropriate event. - private LinkEvent createLink(ProviderId providerId, LinkKey key, - LinkDescription linkDescription, Timestamp timestamp) { - VersionedValue link = new VersionedValue(new DefaultLink(providerId, key.src(), key.dst(), - linkDescription.type()), true, timestamp); - synchronized (this) { - links.put(key, link); - addNewLink(link, timestamp); - } - // FIXME: notify peers. - return new LinkEvent(LINK_ADDED, link.entity()); - } - - // update Egress and ingress link sets - private void addNewLink(VersionedValue link, Timestamp timestamp) { - Link rawLink = link.entity(); - synchronized (this) { - srcLinks.put(rawLink.src().deviceId(), link); - dstLinks.put(rawLink.dst().deviceId(), link); - } - } - - // Updates, if necessary the specified link and returns the appropriate event. - private LinkEvent updateLink(ProviderId providerId, VersionedValue existingLink, - LinkKey key, LinkDescription linkDescription, Timestamp timestamp) { - // FIXME confirm Link update condition is OK - if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) { - synchronized (this) { - - VersionedValue updatedLink = new VersionedValue( - new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(), - linkDescription.type()), true, timestamp); - links.replace(key, existingLink, updatedLink); - - replaceLink(existingLink, updatedLink); - // FIXME: notify peers. - return new LinkEvent(LINK_UPDATED, updatedLink.entity()); - } - } - return null; - } - - // update Egress and ingress link sets - private void replaceLink(VersionedValue current, VersionedValue updated) { - synchronized (this) { - srcLinks.remove(current.entity().src().deviceId(), current); - dstLinks.remove(current.entity().dst().deviceId(), current); - - srcLinks.put(current.entity().src().deviceId(), updated); - dstLinks.put(current.entity().dst().deviceId(), updated); - } - } - - @Override - public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { - synchronized (this) { - LinkKey key = new LinkKey(src, dst); - VersionedValue link = links.remove(key); - if (link != null) { - removeLink(link); - // notify peers - return new LinkEvent(LINK_REMOVED, link.entity()); - } - return null; - } - } - - // update Egress and ingress link sets - private void removeLink(VersionedValue link) { - synchronized (this) { - srcLinks.remove(link.entity().src().deviceId(), link); - dstLinks.remove(link.entity().dst().deviceId(), link); - } - } -} diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java index 103e6701fb..efecb6c7a1 100644 --- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java +++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java @@ -24,6 +24,7 @@ import org.onlab.onos.net.Port; import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.device.DefaultDeviceDescription; import org.onlab.onos.net.device.DefaultPortDescription; +import org.onlab.onos.net.link.DefaultLinkDescription; import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.store.Timestamp; import org.onlab.packet.IpAddress; @@ -58,6 +59,7 @@ public final class KryoPoolUtil { DefaultControllerNode.class, DefaultDevice.class, DefaultDeviceDescription.class, + DefaultLinkDescription.class, MastershipRole.class, Port.class, DefaultPortDescription.class,