mirror of https://github.com/opennetworkinglab/onos.git
synced 2025-10-22 12:51:09 +02:00

Added GossipLinkStore with basic replication for link state.

parent 9fc87594fd
commit 2ff05591f5
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java (new file, 486 lines)
@@ -0,0 +1,486 @@
package org.onlab.onos.store.link.impl;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;

import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.Provided;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.onlab.onos.net.DefaultAnnotations.union;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.base.Predicates.notNull;

/**
 * Manages inventory of infrastructure links in a distributed data store
 * that uses optimistic replication and gossip based techniques.
 */
@Component(immediate = true)
@Service
public class GossipLinkStore
        extends AbstractStore<LinkEvent, LinkStoreDelegate>
        implements LinkStore {

    private final Logger log = getLogger(getClass());

    // Link inventory
    private final ConcurrentMap<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>> linkDescs =
            new ConcurrentHashMap<>();

    // Link instance cache
    private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();

    // Egress and ingress link sets
    private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
    private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();

    // Removed links and the timestamps at which they were removed
    private final Map<LinkKey, Timestamp> removedLinks = Maps.newHashMap();

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClockService clockService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
        @Override
        protected void setupKryoPool() {
            serializerPool = KryoPool.newBuilder()
                    .register(DistributedStoreSerializers.COMMON)
                    .register(InternalLinkEvent.class)
                    .register(InternalLinkRemovedEvent.class)
                    .build()
                    .populate(1);
        }
    };

    @Activate
    public void activate() {

        clusterCommunicator.addSubscriber(
                GossipLinkStoreMessageSubjects.LINK_UPDATE, new InternalLinkEventListener());
        clusterCommunicator.addSubscriber(
                GossipLinkStoreMessageSubjects.LINK_REMOVED, new InternalLinkRemovedEventListener());

        log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        linkDescs.clear();
        links.clear();
        srcLinks.clear();
        dstLinks.clear();
        log.info("Stopped");
    }

    @Override
    public int getLinkCount() {
        return links.size();
    }

    @Override
    public Iterable<Link> getLinks() {
        return Collections.unmodifiableCollection(links.values());
    }

    @Override
    public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
        // lock for iteration
        synchronized (srcLinks) {
            return FluentIterable.from(srcLinks.get(deviceId))
                    .transform(lookupLink())
                    .filter(notNull())
                    .toSet();
        }
    }

    @Override
    public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
        // lock for iteration
        synchronized (dstLinks) {
            return FluentIterable.from(dstLinks.get(deviceId))
                    .transform(lookupLink())
                    .filter(notNull())
                    .toSet();
        }
    }

    @Override
    public Link getLink(ConnectPoint src, ConnectPoint dst) {
        return links.get(new LinkKey(src, dst));
    }

    @Override
    public Set<Link> getEgressLinks(ConnectPoint src) {
        Set<Link> egress = new HashSet<>();
        for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
            if (linkKey.src().equals(src)) {
                egress.add(links.get(linkKey));
            }
        }
        return egress;
    }

    @Override
    public Set<Link> getIngressLinks(ConnectPoint dst) {
        Set<Link> ingress = new HashSet<>();
        for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
            if (linkKey.dst().equals(dst)) {
                ingress.add(links.get(linkKey));
            }
        }
        return ingress;
    }

    @Override
    public LinkEvent createOrUpdateLink(ProviderId providerId,
                                        LinkDescription linkDescription) {

        DeviceId dstDeviceId = linkDescription.dst().deviceId();
        Timestamp newTimestamp = clockService.getTimestamp(dstDeviceId);

        final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);

        LinkEvent event = createOrUpdateLinkInternal(providerId, deltaDesc);

        if (event != null) {
            log.info("Notifying peers of a link update topology event from providerId: "
                    + "{} between src: {} and dst: {}",
                    providerId, linkDescription.src(), linkDescription.dst());
            try {
                notifyPeers(new InternalLinkEvent(providerId, deltaDesc));
            } catch (IOException e) {
                log.info("Failed to notify peers of a link update topology event from providerId: "
                        + "{} between src: {} and dst: {}",
                        providerId, linkDescription.src(), linkDescription.dst());
            }
        }
        return event;
    }

    private LinkEvent createOrUpdateLinkInternal(
            ProviderId providerId,
            Timestamped<LinkDescription> linkDescription) {

        LinkKey key = new LinkKey(linkDescription.value().src(), linkDescription.value().dst());
        ConcurrentMap<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);

        synchronized (descs) {
            // if the link was previously removed, we should proceed if and
            // only if this request is more recent.
            Timestamp linkRemovedTimestamp = removedLinks.get(key);
            if (linkRemovedTimestamp != null) {
                if (linkDescription.isNewer(linkRemovedTimestamp)) {
                    removedLinks.remove(key);
                } else {
                    return null;
                }
            }

            final Link oldLink = links.get(key);
            // update description
            createOrUpdateLinkDescription(descs, providerId, linkDescription);
            final Link newLink = composeLink(descs);
            if (oldLink == null) {
                return createLink(key, newLink);
            }
            return updateLink(key, oldLink, newLink);
        }
    }

    // Guarded by linkDescs value (=locking each Link)
    private Timestamped<LinkDescription> createOrUpdateLinkDescription(
            ConcurrentMap<ProviderId, Timestamped<LinkDescription>> existingLinkDescriptions,
            ProviderId providerId,
            Timestamped<LinkDescription> linkDescription) {

        // merge the existing annotations with those of the incoming description
        Timestamped<LinkDescription> existingLinkDescription = existingLinkDescriptions.get(providerId);
        if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
            return null;
        }
        Timestamped<LinkDescription> newLinkDescription = linkDescription;
        if (existingLinkDescription != null) {
            SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
                    linkDescription.value().annotations());
            newLinkDescription = new Timestamped<LinkDescription>(
                    new DefaultLinkDescription(
                            linkDescription.value().src(),
                            linkDescription.value().dst(),
                            linkDescription.value().type(), merged),
                    linkDescription.timestamp());
        }
        return existingLinkDescriptions.put(providerId, newLinkDescription);
    }

    // Creates and stores the link and returns the appropriate event.
    // Guarded by linkDescs value (=locking each Link)
    private LinkEvent createLink(LinkKey key, Link newLink) {

        if (newLink.providerId().isAncillary()) {
            // TODO: revisit ancillary only Link handling

            // currently treating ancillary only as down (not visible outside)
            return null;
        }

        links.put(key, newLink);
        srcLinks.put(newLink.src().deviceId(), key);
        dstLinks.put(newLink.dst().deviceId(), key);
        return new LinkEvent(LINK_ADDED, newLink);
    }

    // Updates, if necessary, the specified link and returns the appropriate event.
    // Guarded by linkDescs value (=locking each Link)
    private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {

        if (newLink.providerId().isAncillary()) {
            // TODO: revisit ancillary only Link handling

            // currently treating ancillary only as down (not visible outside)
            return null;
        }

        if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
                !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {

            links.put(key, newLink);
            // strictly speaking the following can be omitted
            srcLinks.put(oldLink.src().deviceId(), key);
            dstLinks.put(oldLink.dst().deviceId(), key);
            return new LinkEvent(LINK_UPDATED, newLink);
        }
        return null;
    }

    @Override
    public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
        final LinkKey key = new LinkKey(src, dst);

        DeviceId dstDeviceId = dst.deviceId();
        Timestamp timestamp = clockService.getTimestamp(dstDeviceId);

        LinkEvent event = removeLinkInternal(key, timestamp);

        if (event != null) {
            log.info("Notifying peers of a link removed topology event for a link "
                    + "between src: {} and dst: {}", src, dst);
            try {
                notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
            } catch (IOException e) {
                log.error("Failed to notify peers of a link removed topology event for a link "
                        + "between src: {} and dst: {}", src, dst);
            }
        }
        return event;
    }

    private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
        ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions =
                getLinkDescriptions(key);
        synchronized (linkDescriptions) {
            // accept removal request if given timestamp is newer than
            // the latest Timestamp from Primary provider
            ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
            if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) {
                return null;
            }
            removedLinks.put(key, timestamp);
            Link link = links.remove(key);
            linkDescriptions.clear();
            if (link != null) {
                srcLinks.remove(link.src().deviceId(), key);
                dstLinks.remove(link.dst().deviceId(), key);
                return new LinkEvent(LINK_REMOVED, link);
            }
            return null;
        }
    }

    private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
        return synchronizedSetMultimap(HashMultimap.<K, V>create());
    }

    /**
     * @return primary ProviderID, or randomly chosen one if none exists
     */
    private ProviderId pickPrimaryProviderId(
            ConcurrentMap<ProviderId, Timestamped<LinkDescription>> providerDescs) {

        ProviderId fallBackPrimary = null;
        for (Entry<ProviderId, Timestamped<LinkDescription>> e : providerDescs.entrySet()) {
            if (!e.getKey().isAncillary()) {
                return e.getKey();
            } else if (fallBackPrimary == null) {
                // pick randomly as a fallback in case there is no primary
                fallBackPrimary = e.getKey();
            }
        }
        return fallBackPrimary;
    }

    private Link composeLink(ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
        ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
        Timestamped<LinkDescription> base = linkDescriptions.get(primaryProviderId);

        ConnectPoint src = base.value().src();
        ConnectPoint dst = base.value().dst();
        Type type = base.value().type();
        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
        annotations = merge(annotations, base.value().annotations());

        for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
            if (primaryProviderId.equals(e.getKey())) {
                continue;
            }

            // TODO: should keep track of Description timestamp
            // and only merge conflicting keys when timestamp is newer
            // Currently assuming there will never be a key conflict between
            // providers

            // annotation merging. not so efficient, should revisit later
            annotations = merge(annotations, e.getValue().value().annotations());
        }

        return new DefaultLink(primaryProviderId, src, dst, type, annotations);
    }

    private ConcurrentMap<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) {
        return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
                NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded());
    }

    private final Function<LinkKey, Link> lookupLink = new LookupLink();
    private Function<LinkKey, Link> lookupLink() {
        return lookupLink;
    }

    private final class LookupLink implements Function<LinkKey, Link> {
        @Override
        public Link apply(LinkKey input) {
            return links.get(input);
        }
    }

    private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
    private static final Predicate<Provided> isPrimary() {
        return IS_PRIMARY;
    }

    private static final class IsPrimary implements Predicate<Provided> {

        @Override
        public boolean apply(Provided input) {
            return !input.providerId().isAncillary();
        }
    }

    private void notifyDelegateIfNotNull(LinkEvent event) {
        if (event != null) {
            notifyDelegate(event);
        }
    }

    // TODO: should we be throwing exception?
    private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
        ClusterMessage message = new ClusterMessage(
                clusterService.getLocalNode().id(),
                subject,
                SERIALIZER.encode(event));
        clusterCommunicator.broadcast(message);
    }

    private void notifyPeers(InternalLinkEvent event) throws IOException {
        broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
    }

    private void notifyPeers(InternalLinkRemovedEvent event) throws IOException {
        broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
    }

    private class InternalLinkEventListener implements ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {

            log.info("Received link event from peer: {}", message.sender());
            InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload());

            ProviderId providerId = event.providerId();
            Timestamped<LinkDescription> linkDescription = event.linkDescription();

            notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
        }
    }

    private class InternalLinkRemovedEventListener implements ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {

            log.info("Received link removed event from peer: {}", message.sender());
            InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload());

            LinkKey linkKey = event.linkKey();
            Timestamp timestamp = event.timestamp();

            notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
        }
    }
}
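For orientation, a minimal caller-side sketch of the store API defined above. This is not part of the commit: it assumes an activated GossipLinkStore instance bound as a LinkStore (called store here), the provider and device/port identifiers are made up, and the ProviderId, ConnectPoint and DefaultLinkDescription constructor signatures are quoted from memory of the ONOS API of this period.

// Hypothetical usage sketch, not part of this commit.
ProviderId provider = new ProviderId("of", "org.onlab.onos.provider.openflow");   // assumed 2-arg constructor
ConnectPoint src = new ConnectPoint(DeviceId.deviceId("of:0000000000000001"), PortNumber.portNumber(1));
ConnectPoint dst = new ConnectPoint(DeviceId.deviceId("of:0000000000000002"), PortNumber.portNumber(2));

// Local write: the description is stamped with the destination device's logical clock,
// merged into the per-provider map, and an InternalLinkEvent is gossiped to all peers.
LinkEvent added = store.createOrUpdateLink(provider,
        new DefaultLinkDescription(src, dst, Link.Type.DIRECT));

// Reads are served from the locally composed Link cache.
Set<Link> egress = store.getDeviceEgressLinks(src.deviceId());

// Removal records a tombstone timestamp so an older, late-arriving update cannot
// resurrect the link, and an InternalLinkRemovedEvent is gossiped to peers.
LinkEvent removed = store.removeLink(src, dst);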
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java (new file, 14 lines)
@@ -0,0 +1,14 @@
package org.onlab.onos.store.link.impl;

import org.onlab.onos.store.cluster.messaging.MessageSubject;

/**
 * MessageSubjects used by GossipLinkStore peer-peer communication.
 */
public final class GossipLinkStoreMessageSubjects {

    private GossipLinkStoreMessageSubjects() {}

    public static final MessageSubject LINK_UPDATE = new MessageSubject("peer-link-update");
    public static final MessageSubject LINK_REMOVED = new MessageSubject("peer-link-removed");
}
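These two subjects are the only message types the store exchanges with its peers. The pattern, as wired up in GossipLinkStore.activate() and broadcastMessage() above, is: every node registers one handler per subject, and a writer serializes the internal event and broadcasts it under the matching subject. A condensed sketch, with the cluster services and event objects assumed to be in scope as they are inside the store:

// Receive side: register one ClusterMessageHandler per subject at activation.
clusterCommunicator.addSubscriber(
        GossipLinkStoreMessageSubjects.LINK_UPDATE, new InternalLinkEventListener());
clusterCommunicator.addSubscriber(
        GossipLinkStoreMessageSubjects.LINK_REMOVED, new InternalLinkRemovedEventListener());

// Send side: wrap the serialized event in a ClusterMessage tagged with the subject.
ClusterMessage message = new ClusterMessage(
        clusterService.getLocalNode().id(),
        GossipLinkStoreMessageSubjects.LINK_UPDATE,
        SERIALIZER.encode(new InternalLinkEvent(providerId, deltaDesc)));
clusterCommunicator.broadcast(message);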
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkEvent.java (new file, 46 lines)
@@ -0,0 +1,46 @@
package org.onlab.onos.store.link.impl;

import com.google.common.base.MoreObjects;

import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;

/**
 * Information published by GossipLinkStore to notify peers of a link
 * change event.
 */
public class InternalLinkEvent {

    private final ProviderId providerId;
    private final Timestamped<LinkDescription> linkDescription;

    protected InternalLinkEvent(
            ProviderId providerId,
            Timestamped<LinkDescription> linkDescription) {
        this.providerId = providerId;
        this.linkDescription = linkDescription;
    }

    public ProviderId providerId() {
        return providerId;
    }

    public Timestamped<LinkDescription> linkDescription() {
        return linkDescription;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(getClass())
                .add("providerId", providerId)
                .add("linkDescription", linkDescription)
                .toString();
    }

    // for serializer
    protected InternalLinkEvent() {
        this.providerId = null;
        this.linkDescription = null;
    }
}
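The bare protected constructor marked "for serializer" exists because Kryo instantiates the class reflectively during deserialization and then fills in the fields; the class is registered in the store's KryoSerializer (see setupKryoPool() in GossipLinkStore above). A small round-trip sketch of how the event travels, with the providerId and deltaDesc variables assumed to be in scope as they are in createOrUpdateLink():

// Sketch only: SERIALIZER is the KryoSerializer defined in GossipLinkStore.
InternalLinkEvent outgoing = new InternalLinkEvent(providerId, deltaDesc);
byte[] payload = SERIALIZER.encode(outgoing);                                // placed into a ClusterMessage by broadcastMessage()
InternalLinkEvent incoming = (InternalLinkEvent) SERIALIZER.decode(payload); // what the peer's handler sees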
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkRemovedEvent.java (new file, 49 lines)
@@ -0,0 +1,49 @@
package org.onlab.onos.store.link.impl;

import org.onlab.onos.net.LinkKey;
import org.onlab.onos.store.Timestamp;

import com.google.common.base.MoreObjects;

/**
 * Information published by GossipLinkStore to notify peers of a link
 * being removed.
 */
public class InternalLinkRemovedEvent {

    private final LinkKey linkKey;
    private final Timestamp timestamp;

    /**
     * Creates an InternalLinkRemovedEvent.
     *
     * @param linkKey identifier of the removed link
     * @param timestamp timestamp of when the link was removed
     */
    public InternalLinkRemovedEvent(LinkKey linkKey, Timestamp timestamp) {
        this.linkKey = linkKey;
        this.timestamp = timestamp;
    }

    public LinkKey linkKey() {
        return linkKey;
    }

    public Timestamp timestamp() {
        return timestamp;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(getClass())
                .add("linkKey", linkKey)
                .add("timestamp", timestamp)
                .toString();
    }

    // for serializer
    @SuppressWarnings("unused")
    private InternalLinkRemovedEvent() {
        linkKey = null;
        timestamp = null;
    }
}
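The timestamp carried by this event is what drives the tombstone logic in GossipLinkStore: removedLinks remembers when a link was last removed, and a later gossip update is applied only if it is strictly newer. The guard below is the one from createOrUpdateLinkInternal(), annotated with a worked example that uses hypothetical timestamp values (larger means newer):

//   t=5  this node removes link L             -> removedLinks.put(key, t5), peers notified
//   t=3  a delayed peer update for L arrives   -> 3 is not newer than 5, the update is ignored
//   t=7  the link is rediscovered              -> 7 is newer than 5, tombstone cleared, link re-added
Timestamp linkRemovedTimestamp = removedLinks.get(key);
if (linkRemovedTimestamp != null) {
    if (linkDescription.isNewer(linkRemovedTimestamp)) {
        removedLinks.remove(key);
    } else {
        return null;
    }
}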
OnosDistributedLinkStore.java (removed)
@@ -1,246 +0,0 @@
package org.onlab.onos.store.link.impl;

import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.slf4j.Logger;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.ImmutableSet.Builder;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

//TODO: Add support for multiple provider and annotations
/**
 * Manages inventory of infrastructure links using a protocol that takes into consideration
 * the order in which events occur.
 */
// FIXME: This does not yet implement the full protocol.
// The full protocol requires the sender of LLDP message to include the
// version information of src device/port and the receiver to
// take that into account when figuring out if a more recent src
// device/port down event renders the link discovery obsolete.
@Component(immediate = true)
@Service
public class OnosDistributedLinkStore
        extends AbstractStore<LinkEvent, LinkStoreDelegate>
        implements LinkStore {

    private final Logger log = getLogger(getClass());

    // Link inventory
    private ConcurrentMap<LinkKey, VersionedValue<Link>> links;

    public static final String LINK_NOT_FOUND = "Link between %s and %s not found";

    // TODO synchronize?
    // Egress and ingress link sets
    private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create();
    private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create();

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClockService clockService;

    @Activate
    public void activate() {

        links = new ConcurrentHashMap<>();

        log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        log.info("Stopped");
    }

    @Override
    public int getLinkCount() {
        return links.size();
    }

    @Override
    public Iterable<Link> getLinks() {
        Builder<Link> builder = ImmutableSet.builder();
        synchronized (this) {
            for (VersionedValue<Link> link : links.values()) {
                builder.add(link.entity());
            }
            return builder.build();
        }
    }

    @Override
    public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
        Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId));
        Set<Link> rawEgressLinks = new HashSet<>();
        for (VersionedValue<Link> link : egressLinks) {
            rawEgressLinks.add(link.entity());
        }
        return rawEgressLinks;
    }

    @Override
    public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
        Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId));
        Set<Link> rawIngressLinks = new HashSet<>();
        for (VersionedValue<Link> link : ingressLinks) {
            rawIngressLinks.add(link.entity());
        }
        return rawIngressLinks;
    }

    @Override
    public Link getLink(ConnectPoint src, ConnectPoint dst) {
        VersionedValue<Link> link = links.get(new LinkKey(src, dst));
        checkArgument(link != null, "LINK_NOT_FOUND", src, dst);
        return link.entity();
    }

    @Override
    public Set<Link> getEgressLinks(ConnectPoint src) {
        Set<Link> egressLinks = new HashSet<>();
        for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) {
            if (link.entity().src().equals(src)) {
                egressLinks.add(link.entity());
            }
        }
        return egressLinks;
    }

    @Override
    public Set<Link> getIngressLinks(ConnectPoint dst) {
        Set<Link> ingressLinks = new HashSet<>();
        for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) {
            if (link.entity().dst().equals(dst)) {
                ingressLinks.add(link.entity());
            }
        }
        return ingressLinks;
    }

    @Override
    public LinkEvent createOrUpdateLink(ProviderId providerId,
                                        LinkDescription linkDescription) {

        final DeviceId destinationDeviceId = linkDescription.dst().deviceId();
        final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId);

        LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
        VersionedValue<Link> link = links.get(key);
        if (link == null) {
            return createLink(providerId, key, linkDescription, newTimestamp);
        }

        checkState(newTimestamp.compareTo(link.timestamp()) > 0,
                "Existing Link has a timestamp in the future!");

        return updateLink(providerId, link, key, linkDescription, newTimestamp);
    }

    // Creates and stores the link and returns the appropriate event.
    private LinkEvent createLink(ProviderId providerId, LinkKey key,
            LinkDescription linkDescription, Timestamp timestamp) {
        VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(),
                linkDescription.type()), true, timestamp);
        synchronized (this) {
            links.put(key, link);
            addNewLink(link, timestamp);
        }
        // FIXME: notify peers.
        return new LinkEvent(LINK_ADDED, link.entity());
    }

    // update Egress and ingress link sets
    private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) {
        Link rawLink = link.entity();
        synchronized (this) {
            srcLinks.put(rawLink.src().deviceId(), link);
            dstLinks.put(rawLink.dst().deviceId(), link);
        }
    }

    // Updates, if necessary the specified link and returns the appropriate event.
    private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink,
            LinkKey key, LinkDescription linkDescription, Timestamp timestamp) {
        // FIXME confirm Link update condition is OK
        if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) {
            synchronized (this) {

                VersionedValue<Link> updatedLink = new VersionedValue<Link>(
                        new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(),
                                linkDescription.type()), true, timestamp);
                links.replace(key, existingLink, updatedLink);

                replaceLink(existingLink, updatedLink);
                // FIXME: notify peers.
                return new LinkEvent(LINK_UPDATED, updatedLink.entity());
            }
        }
        return null;
    }

    // update Egress and ingress link sets
    private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) {
        synchronized (this) {
            srcLinks.remove(current.entity().src().deviceId(), current);
            dstLinks.remove(current.entity().dst().deviceId(), current);

            srcLinks.put(current.entity().src().deviceId(), updated);
            dstLinks.put(current.entity().dst().deviceId(), updated);
        }
    }

    @Override
    public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
        synchronized (this) {
            LinkKey key = new LinkKey(src, dst);
            VersionedValue<Link> link = links.remove(key);
            if (link != null) {
                removeLink(link);
                // notify peers
                return new LinkEvent(LINK_REMOVED, link.entity());
            }
            return null;
        }
    }

    // update Egress and ingress link sets
    private void removeLink(VersionedValue<Link> link) {
        synchronized (this) {
            srcLinks.remove(link.entity().src().deviceId(), link);
            dstLinks.remove(link.entity().dst().deviceId(), link);
        }
    }
}
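The removed store wrapped every Link in a VersionedValue, a helper class that is not part of this diff. From its usage above (a three-argument constructor plus entity() and timestamp() accessors) it presumably looked roughly like the sketch below; the field names and the meaning of the boolean constructor argument are guesses, not the actual source:

// Hypothetical reconstruction of the removed helper, not part of this commit.
public class VersionedValue<T> {
    private final T entity;           // the wrapped object, returned by entity()
    private final boolean active;     // guessed meaning of the boolean constructor argument
    private final Timestamp timestamp;

    public VersionedValue(T entity, boolean active, Timestamp timestamp) {
        this.entity = entity;
        this.active = active;
        this.timestamp = timestamp;
    }

    public T entity() {
        return entity;
    }

    public Timestamp timestamp() {
        return timestamp;
    }
}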
KryoPoolUtil.java
@@ -24,6 +24,7 @@ import org.onlab.onos.net.Port;
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.link.DefaultLinkDescription;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.Timestamp;
 import org.onlab.packet.IpAddress;
@@ -58,6 +59,7 @@ public final class KryoPoolUtil {
                     DefaultControllerNode.class,
                     DefaultDevice.class,
                     DefaultDeviceDescription.class,
+                    DefaultLinkDescription.class,
                     MastershipRole.class,
                     Port.class,
                     DefaultPortDescription.class,
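The one-line additions above register DefaultLinkDescription with the shared Kryo pool. This is needed because the gossip path now ships LinkDescription instances across the cluster inside InternalLinkEvent, and Kryo either rejects or encodes inefficiently any class it has not been told about, depending on configuration. The registration pattern is the same as the store-local one in GossipLinkStore.setupKryoPool(), roughly:

// Sketch of the registration pattern; the real KryoPoolUtil builder lists many more classes.
KryoPool pool = KryoPool.newBuilder()
        .register(DefaultLinkDescription.class)
        .build();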