diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java index c6bb015fae..37b27ad920 100644 --- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java +++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java @@ -1538,19 +1538,11 @@ public class SegmentRoutingManager implements SegmentRoutingService { public void event(McastEvent event) { switch (event.type()) { case SOURCE_ADDED: - mcastHandler.processSourceAdded(event); - break; case SOURCE_UPDATED: - mcastHandler.processSourceUpdated(event); - break; case SINK_ADDED: - mcastHandler.processSinkAdded(event); - break; case SINK_REMOVED: - mcastHandler.processSinkRemoved(event); - break; case ROUTE_REMOVED: - mcastHandler.processRouteRemoved(event); + mcastHandler.processMcastEvent(event); break; case ROUTE_ADDED: default: diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java new file mode 100644 index 0000000000..b0875caa10 --- /dev/null +++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java @@ -0,0 +1,95 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.segmentrouting.mcast; + +import org.onlab.packet.IpAddress; +import org.onosproject.net.ConnectPoint; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Key of the multicast event cache. + */ +class McastCacheKey { + // The group ip + private final IpAddress mcastIp; + // The sink connect point + private final ConnectPoint sink; + + /** + * Constructs a key for multicast event cache. + * + * @param mcastIp multicast group IP address + * @param sink connect point of the sink + */ + public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) { + checkNotNull(mcastIp, "mcastIp cannot be null"); + checkNotNull(sink, "sink cannot be null"); + checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address"); + this.mcastIp = mcastIp; + this.sink = sink; + } + + /** + * Returns the multicast IP address of this key. + * + * @return multicast IP + */ + public IpAddress mcastIp() { + return mcastIp; + } + + /** + * Returns the sink of this key. + * + * @return connect point of the sink + */ + public ConnectPoint sink() { + return sink; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof McastCacheKey)) { + return false; + } + McastCacheKey that = + (McastCacheKey) o; + return (Objects.equals(this.mcastIp, that.mcastIp) && + Objects.equals(this.sink, that.sink)); + } + + @Override + public int hashCode() { + return Objects.hash(mcastIp, sink); + } + + @Override + public String toString() { + return toStringHelper(getClass()) + .add("mcastIp", mcastIp) + .add("sink", sink) + .toString(); + } +} diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java index dc8a36dea5..51a4524b37 100644 --- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java +++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java @@ -16,6 +16,10 @@ package org.onosproject.segmentrouting.mcast; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -40,6 +44,7 @@ import org.onosproject.net.flow.DefaultTrafficTreatment; import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.flow.TrafficTreatment; import org.onosproject.net.flow.criteria.Criteria; +import org.onosproject.net.flow.criteria.VlanIdCriterion; import org.onosproject.net.flow.instructions.Instructions.OutputInstruction; import org.onosproject.net.flowobjective.DefaultFilteringObjective; import org.onosproject.net.flowobjective.DefaultForwardingObjective; @@ -84,6 +89,10 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.Executors.newScheduledThreadPool; import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID; +import static org.onosproject.net.mcast.McastEvent.Type.ROUTE_REMOVED; +import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_ADDED; +import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_UPDATED; import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN; /** @@ -99,6 +108,98 @@ public class McastHandler { private final KryoNamespace.Builder mcastKryo; private final ConsistentMap mcastRoleStore; + // Wait time for the cache + private static final int WAIT_TIME_MS = 1000; + /** + * The mcastEventCache is implemented to avoid race condition by giving more time to the + * underlying subsystems to process previous calls. + */ + private Cache mcastEventCache = CacheBuilder.newBuilder() + .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS) + .removalListener((RemovalNotification notification) -> { + // Get group ip, sink and related event + IpAddress mcastIp = notification.getKey().mcastIp(); + ConnectPoint sink = notification.getKey().sink(); + McastEvent mcastEvent = notification.getValue(); + RemovalCause cause = notification.getCause(); + log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}", + mcastIp, sink, mcastEvent, cause); + // If it expires or it has been replaced, we deque the event + switch (notification.getCause()) { + case REPLACED: + case EXPIRED: + dequeueMcastEvent(mcastEvent); + break; + default: + break; + } + }).build(); + + private void enqueueMcastEvent(McastEvent mcastEvent) { + log.debug("Enqueue mcastEvent {}", mcastEvent); + final McastRouteInfo mcastRouteInfo = mcastEvent.subject(); + // Let's create the keys of the cache + ImmutableSet.Builder sinksBuilder = ImmutableSet.builder(); + // For this event we will have a set of sinks + if (mcastEvent.type() == SOURCE_ADDED || + mcastEvent.type() == SOURCE_UPDATED || + mcastEvent.type() == ROUTE_REMOVED) { + // Add all the sinks + sinksBuilder.addAll(mcastRouteInfo.sinks()); + } else { + // We have just one sink in this case + ConnectPoint sink = mcastRouteInfo.sink().orElse(null); + // It is always true, unless something of bad happened + // in the mcast route store + if (sink != null) { + sinksBuilder.add(sink); + } + } + // Push the elements in the cache + sinksBuilder.build().forEach(sink -> { + McastCacheKey cacheKey = new McastCacheKey(mcastRouteInfo.route().group(), + sink); + mcastEventCache.put(cacheKey, mcastEvent); + }); + } + + private void dequeueMcastEvent(McastEvent mcastEvent) { + log.debug("Dequeue mcastEvent {}", mcastEvent); + final McastRouteInfo mcastRouteInfo = mcastEvent.subject(); + // Get source, mcast group + ConnectPoint source = mcastRouteInfo.source().orElse(null); + IpAddress mcastIp = mcastRouteInfo.route().group(); + // According to the event type let's call the proper method + switch (mcastEvent.type()) { + case SOURCE_ADDED: + // Get all the sinks and process + Set sinks = mcastRouteInfo.sinks(); + sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp)); + break; + case SOURCE_UPDATED: + // Get old source + ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null); + // Just the first cached element will be processed + processSourceUpdatedInternal(mcastIp, source, oldSource); + break; + case ROUTE_REMOVED: + // Process the route removed, just the first cached element will be processed + processRouteRemovedInternal(source, mcastIp); + break; + case SINK_ADDED: + // Get the only sink and process + ConnectPoint sink = mcastRouteInfo.sink().orElse(null); + processSinkAddedInternal(source, sink, mcastIp); + break; + case SINK_REMOVED: + sink = mcastRouteInfo.sink().orElse(null); + processSinkRemovedInternal(source, sink, mcastIp); + break; + default: + break; + } + } + // Mcast lock to serialize local operations private final Lock mcastLock = new ReentrantLock(); @@ -140,7 +241,7 @@ public class McastHandler { // Executor for mcast bucket corrector private ScheduledExecutorService executorService - = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log)); + = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log)); /** * Constructs the McastEventHandler. @@ -170,6 +271,9 @@ public class McastHandler { executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10, MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS); + // Schedule the clean up, this will allow the processing of the expired events + executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0, + WAIT_TIME_MS, TimeUnit.MILLISECONDS); } /** @@ -193,97 +297,21 @@ public class McastHandler { } /** - * Processes the SOURCE_ADDED event. + * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED, + * SINK_REMOVED and ROUTE_REMOVED events. * * @param event McastEvent with SOURCE_ADDED type */ - public void processSourceAdded(McastEvent event) { - log.info("processSourceAdded {}", event); + public void processMcastEvent(McastEvent event) { + log.info("process {}", event); + // Verify if it is a complete event McastRouteInfo mcastRouteInfo = event.subject(); if (!mcastRouteInfo.isComplete()) { - log.info("Incompleted McastRouteInfo. Abort."); + log.info("Incompleted McastRouteInfo. Abort {}", event.type()); return; } - ConnectPoint source = mcastRouteInfo.source().orElse(null); - Set sinks = mcastRouteInfo.sinks(); - IpAddress mcastIp = mcastRouteInfo.route().group(); - - sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp)); - } - - /** - * Processes the SOURCE_UPDATED event. - * - * @param event McastEvent with SOURCE_UPDATED type - */ - public void processSourceUpdated(McastEvent event) { - log.info("processSourceUpdated {}", event); - // Get old and new data - McastRouteInfo mcastRouteInfo = event.subject(); - ConnectPoint newSource = mcastRouteInfo.source().orElse(null); - mcastRouteInfo = event.prevSubject(); - ConnectPoint oldSource = mcastRouteInfo.source().orElse(null); - // and group ip - IpAddress mcastIp = mcastRouteInfo.route().group(); - // Process the update event - processSourceUpdatedInternal(mcastIp, newSource, oldSource); - } - - /** - * Processes the SINK_ADDED event. - * - * @param event McastEvent with SINK_ADDED type - */ - public void processSinkAdded(McastEvent event) { - log.info("processSinkAdded {}", event); - McastRouteInfo mcastRouteInfo = event.subject(); - if (!mcastRouteInfo.isComplete()) { - log.info("Incompleted McastRouteInfo. Abort."); - return; - } - ConnectPoint source = mcastRouteInfo.source().orElse(null); - ConnectPoint sink = mcastRouteInfo.sink().orElse(null); - IpAddress mcastIp = mcastRouteInfo.route().group(); - - processSinkAddedInternal(source, sink, mcastIp); - } - - /** - * Processes the SINK_REMOVED event. - * - * @param event McastEvent with SINK_REMOVED type - */ - public void processSinkRemoved(McastEvent event) { - log.info("processSinkRemoved {}", event); - McastRouteInfo mcastRouteInfo = event.subject(); - if (!mcastRouteInfo.isComplete()) { - log.info("Incompleted McastRouteInfo. Abort."); - return; - } - ConnectPoint source = mcastRouteInfo.source().orElse(null); - ConnectPoint sink = mcastRouteInfo.sink().orElse(null); - IpAddress mcastIp = mcastRouteInfo.route().group(); - - processSinkRemovedInternal(source, sink, mcastIp); - } - - /** - * Processes the ROUTE_REMOVED event. - * - * @param event McastEvent with ROUTE_REMOVED type - */ - public void processRouteRemoved(McastEvent event) { - log.info("processRouteRemoved {}", event); - McastRouteInfo mcastRouteInfo = event.subject(); - if (!mcastRouteInfo.source().isPresent()) { - log.info("Incompleted McastRouteInfo. Abort."); - return; - } - // Get group ip and ingress connect point - IpAddress mcastIp = mcastRouteInfo.route().group(); - ConnectPoint source = mcastRouteInfo.source().orElse(null); - - processRouteRemovedInternal(source, mcastIp); + // Just enqueue for now + enqueueMcastEvent(event); } /** @@ -318,11 +346,14 @@ public class McastHandler { NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value(); Set outputPorts = getPorts(nextObjective.next()); - // Let's remove old flows and groups - removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource)); - // Push new flows and group - outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber, - mcastIp, assignedVlan(newSource))); + // This an optimization to avoid unnecessary removal and add + if (!assignedVlanFromNext(nextObjective).equals(assignedVlan(newSource))) { + // Let's remove old flows and groups + removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource)); + // Push new flows and group + outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber, + mcastIp, assignedVlan(newSource))); + } addFilterToDevice(newSource.deviceId(), newSource.port(), assignedVlan(newSource), mcastIp); // Setup mcast roles @@ -1211,6 +1242,16 @@ public class McastHandler { return SegmentRoutingManager.INTERNAL_VLAN; } + /** + * Gets assigned VLAN according to the value in the meta. + * + * @param nextObjective nextObjective to analyze + * @return assigned VLAN ID + */ + private VlanId assignedVlanFromNext(NextObjective nextObjective) { + return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId(); + } + /** * Gets the spine-facing port on ingress device of given multicast group. * diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java index 83dee45e1b..d371fdf49b 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java @@ -41,7 +41,6 @@ import org.onosproject.store.service.StorageService; import org.onosproject.store.service.Versioned; import org.slf4j.Logger; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -180,7 +179,7 @@ public class DistributedMcastStore // and the source connect point notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED, mcastRouteInfo(route, - Collections.emptySet(), + oldData.sinks(), oldData.source() ))); break;