diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java new file mode 100644 index 0000000000..ac7f6bc2c6 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java @@ -0,0 +1,384 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.link.impl; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.onosproject.net.DefaultAnnotations.merge; +import static org.onosproject.net.DefaultAnnotations.union; +import static org.onosproject.net.Link.State.ACTIVE; +import static org.onosproject.net.Link.State.INACTIVE; +import static org.onosproject.net.Link.Type.DIRECT; +import static org.onosproject.net.Link.Type.INDIRECT; +import static org.onosproject.net.LinkKey.linkKey; +import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED; +import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED; +import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED; +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.apache.felix.scr.annotations.Service; +import org.onlab.util.KryoNamespace; +import org.onlab.util.SharedExecutors; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.AnnotationKeys; +import org.onosproject.net.AnnotationsUtil; +import org.onosproject.net.ConnectPoint; +import org.onosproject.net.DefaultAnnotations; +import org.onosproject.net.DefaultLink; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Link; +import org.onosproject.net.LinkKey; +import org.onosproject.net.Link.Type; +import org.onosproject.net.device.DeviceClockService; +import org.onosproject.net.link.DefaultLinkDescription; +import org.onosproject.net.link.LinkDescription; +import org.onosproject.net.link.LinkEvent; +import org.onosproject.net.link.LinkStore; +import org.onosproject.net.link.LinkStoreDelegate; +import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.AbstractStore; +import org.onosproject.store.cluster.messaging.ClusterCommunicationService; +import org.onosproject.store.cluster.messaging.MessageSubject; +import org.onosproject.store.impl.MastershipBasedTimestamp; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.serializers.KryoSerializer; +import org.onosproject.store.serializers.custom.DistributedStoreSerializers; +import org.onosproject.store.service.EventuallyConsistentMap; +import org.onosproject.store.service.EventuallyConsistentMapEvent; +import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.StorageService; +import org.slf4j.Logger; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +/** + * Manages the inventory of links using a {@code EventuallyConsistentMap}. + */ +@Component(immediate = true, enabled = false) +@Service +public class ECLinkStore + extends AbstractStore + implements LinkStore { + + private final Logger log = getLogger(getClass()); + + private final Map links = Maps.newConcurrentMap(); + private EventuallyConsistentMap, LinkDescription> linkDescriptions; + + private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request"); + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceClockService deviceClockService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterCommunicationService clusterCommunicator; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + private EventuallyConsistentMapListener, LinkDescription> linkTracker = + new InternalLinkTracker(); + + protected static final KryoSerializer SERIALIZER = new KryoSerializer() { + @Override + protected void setupKryoPool() { + serializerPool = KryoNamespace.newBuilder() + .register(DistributedStoreSerializers.STORE_COMMON) + .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) + .register(Provided.class) + .build(); + } + }; + + @Activate + public void activate() { + KryoNamespace.Builder serializer = KryoNamespace.newBuilder() + .register(KryoNamespaces.API) + .register(MastershipBasedTimestamp.class) + .register(Provided.class); + + linkDescriptions = storageService., LinkDescription>eventuallyConsistentMapBuilder() + .withName("onos-link-descriptions") + .withSerializer(serializer) + .withTimestampProvider((k, v) -> { + try { + return v == null ? null : deviceClockService.getTimestamp(v.dst().deviceId()); + } catch (IllegalStateException e) { + return null; + } + }).build(); + + clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE, + SERIALIZER::decode, + this::injectLink, + SharedExecutors.getPoolThreadExecutor()); + + linkDescriptions.addListener(linkTracker); + + log.info("Started"); + } + + @Deactivate + public void deactivate() { + linkDescriptions.removeListener(linkTracker); + linkDescriptions.destroy(); + links.clear(); + clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE); + + log.info("Stopped"); + } + + @Override + public int getLinkCount() { + return links.size(); + } + + @Override + public Iterable getLinks() { + return links.values(); + } + + @Override + public Set getDeviceEgressLinks(DeviceId deviceId) { + return filter(links.values(), link -> deviceId.equals(link.src().deviceId())); + } + + @Override + public Set getDeviceIngressLinks(DeviceId deviceId) { + return filter(links.values(), link -> deviceId.equals(link.dst().deviceId())); + } + + @Override + public Link getLink(ConnectPoint src, ConnectPoint dst) { + return links.get(linkKey(src, dst)); + } + + @Override + public Set getEgressLinks(ConnectPoint src) { + return filter(links.values(), link -> src.equals(link.src())); + } + + @Override + public Set getIngressLinks(ConnectPoint dst) { + return filter(links.values(), link -> dst.equals(link.dst())); + } + + @Override + public LinkEvent createOrUpdateLink(ProviderId providerId, + LinkDescription linkDescription) { + final DeviceId dstDeviceId = linkDescription.dst().deviceId(); + final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId); + + // Process link update only if we're the master of the destination node, + // otherwise signal the actual master. + if (clusterService.getLocalNode().id().equals(dstNodeId)) { + LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst()); + Provided internalLinkKey = new Provided<>(linkKey, providerId); + linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v , linkDescription)); + return refreshLinkCache(linkKey); + } else { + if (dstNodeId == null) { + return null; + } + return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId), + LINK_INJECT_MESSAGE, + SERIALIZER::encode, + SERIALIZER::decode, + dstNodeId)); + } + } + + private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) { + if (current != null) { + // we only allow transition from INDIRECT -> DIRECT + return new DefaultLinkDescription( + current.src(), + current.dst(), + current.type() == DIRECT ? DIRECT : updated.type(), + union(current.annotations(), updated.annotations())); + } + return updated; + } + + private LinkEvent refreshLinkCache(LinkKey linkKey) { + AtomicReference eventType = new AtomicReference<>(); + Link link = links.compute(linkKey, (key, existingLink) -> { + Link newLink = composeLink(linkKey); + if (existingLink == null) { + eventType.set(LINK_ADDED); + return newLink; + } else if (existingLink.state() != newLink.state() || + (existingLink.type() == INDIRECT && newLink.type() == DIRECT) || + !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) { + eventType.set(LINK_UPDATED); + return newLink; + } else { + return existingLink; + } + }); + return eventType.get() != null ? new LinkEvent(eventType.get(), link) : null; + } + + private Set getAllProviders(LinkKey linkKey) { + return linkDescriptions.keySet() + .stream() + .filter(key -> key.key().equals(linkKey)) + .map(key -> key.providerId()) + .collect(Collectors.toSet()); + } + + private ProviderId getBaseProviderId(LinkKey linkKey) { + Set allProviders = getAllProviders(linkKey); + if (allProviders.size() > 0) { + return allProviders.stream() + .filter(p -> !p.isAncillary()) + .findFirst() + .orElse(Iterables.getFirst(allProviders, null)); + } + return null; + } + + private Link composeLink(LinkKey linkKey) { + + ProviderId baseProviderId = checkNotNull(getBaseProviderId(linkKey)); + LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId)); + + ConnectPoint src = base.src(); + ConnectPoint dst = base.dst(); + Type type = base.type(); + AtomicReference annotations = new AtomicReference<>(DefaultAnnotations.builder().build()); + annotations.set(merge(annotations.get(), base.annotations())); + + getAllProviders(linkKey).stream() + .map(p -> new Provided<>(linkKey, p)) + .forEach(key -> { + annotations.set(merge(annotations.get(), + linkDescriptions.get(key).annotations())); + }); + + boolean isDurable = Objects.equals(annotations.get().value(AnnotationKeys.DURABLE), "true"); + return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations.get()); + } + + // Updates, if necessary the specified link and returns the appropriate event. + // Guarded by linkDescs value (=locking each Link) + private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) { + // Note: INDIRECT -> DIRECT transition only + // so that BDDP discovered Link will not overwrite LDDP Link + if (oldLink.state() != newLink.state() || + (oldLink.type() == INDIRECT && newLink.type() == DIRECT) || + !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) { + + links.put(key, newLink); + return new LinkEvent(LINK_UPDATED, newLink); + } + return null; + } + + @Override + public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) { + Link link = getLink(src, dst); + if (link == null) { + return null; + } + + if (link.isDurable()) { + // FIXME: this will not sync link state!!! + return link.state() == INACTIVE ? null : + updateLink(linkKey(link.src(), link.dst()), link, + new DefaultLink(link.providerId(), + link.src(), link.dst(), + link.type(), INACTIVE, + link.isDurable(), + link.annotations())); + } + return removeLink(src, dst); + } + + @Override + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { + final LinkKey linkKey = LinkKey.linkKey(src, dst); + LinkDescription removedLinkDescription = + linkDescriptions.remove(new Provided<>(linkKey, checkNotNull(getBaseProviderId(linkKey)))); + if (removedLinkDescription != null) { + return purgeLinkCache(linkKey); + } + return null; + } + + private LinkEvent purgeLinkCache(LinkKey linkKey) { + Link removedLink = links.remove(linkKey); + if (removedLink != null) { + getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p))); + return new LinkEvent(LINK_REMOVED, removedLink); + } + return null; + } + + private Set filter(Collection links, Predicate predicate) { + return links.stream().filter(predicate).collect(Collectors.toSet()); + } + + private LinkEvent injectLink(Provided linkInjectRequest) { + log.trace("Received request to inject link {}", linkInjectRequest); + + ProviderId providerId = linkInjectRequest.providerId(); + LinkDescription linkDescription = linkInjectRequest.key(); + + final DeviceId deviceId = linkDescription.dst().deviceId(); + if (!deviceClockService.isTimestampAvailable(deviceId)) { + // workaround for ONOS-1208 + log.warn("Not ready to accept update. Dropping {}", linkInjectRequest); + return null; + } + return createOrUpdateLink(providerId, linkDescription); + } + + private class InternalLinkTracker implements EventuallyConsistentMapListener, LinkDescription> { + @Override + public void event(EventuallyConsistentMapEvent, LinkDescription> event) { + if (event.type() == PUT) { + notifyDelegate(refreshLinkCache(event.key().key())); + } else if (event.type() == REMOVE) { + notifyDelegate(purgeLinkCache(event.key().key())); + } + } + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/Provided.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/Provided.java new file mode 100644 index 0000000000..b5b9e64476 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/Provided.java @@ -0,0 +1,68 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.link.impl; + +import java.util.Objects; + +import org.onosproject.net.provider.ProviderId; + +import com.google.common.base.MoreObjects; + +/** + * Encapsulation of a provider supplied key. + * + * @param key + */ +public class Provided { + private final K key; + private final ProviderId providerId; + + public Provided(K key, ProviderId providerId) { + this.key = key; + this.providerId = providerId; + } + + public ProviderId providerId() { + return providerId; + } + + public K key() { + return key; + } + + @Override + public int hashCode() { + return Objects.hash(key, providerId); + } + + @Override + public boolean equals(Object other) { + if (other instanceof Provided) { + Provided that = (Provided) other; + return Objects.equals(key, that.key) && + Objects.equals(providerId, that.providerId); + } + return false; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("key", key) + .add("providerId", providerId) + .toString(); + } +}