[CORD-2739] Caching in McastHandler

Change-Id: I7cb2aa98f55ce96e5c1992bcada3d29a19b6526c
This commit is contained in:
Pier Luigi 2018-02-28 17:24:03 +01:00 committed by Thomas Vachuska
parent 9789311cd4
commit 05514fd960
4 changed files with 228 additions and 101 deletions

View File

@ -1538,19 +1538,11 @@ public class SegmentRoutingManager implements SegmentRoutingService {
public void event(McastEvent event) { public void event(McastEvent event) {
switch (event.type()) { switch (event.type()) {
case SOURCE_ADDED: case SOURCE_ADDED:
mcastHandler.processSourceAdded(event);
break;
case SOURCE_UPDATED: case SOURCE_UPDATED:
mcastHandler.processSourceUpdated(event);
break;
case SINK_ADDED: case SINK_ADDED:
mcastHandler.processSinkAdded(event);
break;
case SINK_REMOVED: case SINK_REMOVED:
mcastHandler.processSinkRemoved(event);
break;
case ROUTE_REMOVED: case ROUTE_REMOVED:
mcastHandler.processRouteRemoved(event); mcastHandler.processMcastEvent(event);
break; break;
case ROUTE_ADDED: case ROUTE_ADDED:
default: default:

View File

@ -0,0 +1,95 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.segmentrouting.mcast;
import org.onlab.packet.IpAddress;
import org.onosproject.net.ConnectPoint;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Key of the multicast event cache.
*/
class McastCacheKey {
// The group ip
private final IpAddress mcastIp;
// The sink connect point
private final ConnectPoint sink;
/**
* Constructs a key for multicast event cache.
*
* @param mcastIp multicast group IP address
* @param sink connect point of the sink
*/
public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) {
checkNotNull(mcastIp, "mcastIp cannot be null");
checkNotNull(sink, "sink cannot be null");
checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
this.mcastIp = mcastIp;
this.sink = sink;
}
/**
* Returns the multicast IP address of this key.
*
* @return multicast IP
*/
public IpAddress mcastIp() {
return mcastIp;
}
/**
* Returns the sink of this key.
*
* @return connect point of the sink
*/
public ConnectPoint sink() {
return sink;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof McastCacheKey)) {
return false;
}
McastCacheKey that =
(McastCacheKey) o;
return (Objects.equals(this.mcastIp, that.mcastIp) &&
Objects.equals(this.sink, that.sink));
}
@Override
public int hashCode() {
return Objects.hash(mcastIp, sink);
}
@Override
public String toString() {
return toStringHelper(getClass())
.add("mcastIp", mcastIp)
.add("sink", sink)
.toString();
}
}

View File

@ -16,6 +16,10 @@
package org.onosproject.segmentrouting.mcast; package org.onosproject.segmentrouting.mcast;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -40,6 +44,7 @@ import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment; import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria; import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flow.criteria.VlanIdCriterion;
import org.onosproject.net.flow.instructions.Instructions.OutputInstruction; import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
import org.onosproject.net.flowobjective.DefaultFilteringObjective; import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.DefaultForwardingObjective; import org.onosproject.net.flowobjective.DefaultForwardingObjective;
@ -84,6 +89,10 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
import static org.onosproject.net.mcast.McastEvent.Type.ROUTE_REMOVED;
import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_ADDED;
import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_UPDATED;
import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN; import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
/** /**
@ -99,6 +108,98 @@ public class McastHandler {
private final KryoNamespace.Builder mcastKryo; private final KryoNamespace.Builder mcastKryo;
private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore; private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
// Wait time for the cache
private static final int WAIT_TIME_MS = 1000;
/**
* The mcastEventCache is implemented to avoid race condition by giving more time to the
* underlying subsystems to process previous calls.
*/
private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
.expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
// Get group ip, sink and related event
IpAddress mcastIp = notification.getKey().mcastIp();
ConnectPoint sink = notification.getKey().sink();
McastEvent mcastEvent = notification.getValue();
RemovalCause cause = notification.getCause();
log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
mcastIp, sink, mcastEvent, cause);
// If it expires or it has been replaced, we deque the event
switch (notification.getCause()) {
case REPLACED:
case EXPIRED:
dequeueMcastEvent(mcastEvent);
break;
default:
break;
}
}).build();
private void enqueueMcastEvent(McastEvent mcastEvent) {
log.debug("Enqueue mcastEvent {}", mcastEvent);
final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
// Let's create the keys of the cache
ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
// For this event we will have a set of sinks
if (mcastEvent.type() == SOURCE_ADDED ||
mcastEvent.type() == SOURCE_UPDATED ||
mcastEvent.type() == ROUTE_REMOVED) {
// Add all the sinks
sinksBuilder.addAll(mcastRouteInfo.sinks());
} else {
// We have just one sink in this case
ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
// It is always true, unless something of bad happened
// in the mcast route store
if (sink != null) {
sinksBuilder.add(sink);
}
}
// Push the elements in the cache
sinksBuilder.build().forEach(sink -> {
McastCacheKey cacheKey = new McastCacheKey(mcastRouteInfo.route().group(),
sink);
mcastEventCache.put(cacheKey, mcastEvent);
});
}
private void dequeueMcastEvent(McastEvent mcastEvent) {
log.debug("Dequeue mcastEvent {}", mcastEvent);
final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
// Get source, mcast group
ConnectPoint source = mcastRouteInfo.source().orElse(null);
IpAddress mcastIp = mcastRouteInfo.route().group();
// According to the event type let's call the proper method
switch (mcastEvent.type()) {
case SOURCE_ADDED:
// Get all the sinks and process
Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
break;
case SOURCE_UPDATED:
// Get old source
ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
// Just the first cached element will be processed
processSourceUpdatedInternal(mcastIp, source, oldSource);
break;
case ROUTE_REMOVED:
// Process the route removed, just the first cached element will be processed
processRouteRemovedInternal(source, mcastIp);
break;
case SINK_ADDED:
// Get the only sink and process
ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
processSinkAddedInternal(source, sink, mcastIp);
break;
case SINK_REMOVED:
sink = mcastRouteInfo.sink().orElse(null);
processSinkRemovedInternal(source, sink, mcastIp);
break;
default:
break;
}
}
// Mcast lock to serialize local operations // Mcast lock to serialize local operations
private final Lock mcastLock = new ReentrantLock(); private final Lock mcastLock = new ReentrantLock();
@ -140,7 +241,7 @@ public class McastHandler {
// Executor for mcast bucket corrector // Executor for mcast bucket corrector
private ScheduledExecutorService executorService private ScheduledExecutorService executorService
= newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log)); = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
/** /**
* Constructs the McastEventHandler. * Constructs the McastEventHandler.
@ -170,6 +271,9 @@ public class McastHandler {
executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10, executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
MCAST_VERIFY_INTERVAL, MCAST_VERIFY_INTERVAL,
TimeUnit.SECONDS); TimeUnit.SECONDS);
// Schedule the clean up, this will allow the processing of the expired events
executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
WAIT_TIME_MS, TimeUnit.MILLISECONDS);
} }
/** /**
@ -193,97 +297,21 @@ public class McastHandler {
} }
/** /**
* Processes the SOURCE_ADDED event. * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
* SINK_REMOVED and ROUTE_REMOVED events.
* *
* @param event McastEvent with SOURCE_ADDED type * @param event McastEvent with SOURCE_ADDED type
*/ */
public void processSourceAdded(McastEvent event) { public void processMcastEvent(McastEvent event) {
log.info("processSourceAdded {}", event); log.info("process {}", event);
// Verify if it is a complete event
McastRouteInfo mcastRouteInfo = event.subject(); McastRouteInfo mcastRouteInfo = event.subject();
if (!mcastRouteInfo.isComplete()) { if (!mcastRouteInfo.isComplete()) {
log.info("Incompleted McastRouteInfo. Abort."); log.info("Incompleted McastRouteInfo. Abort {}", event.type());
return; return;
} }
ConnectPoint source = mcastRouteInfo.source().orElse(null); // Just enqueue for now
Set<ConnectPoint> sinks = mcastRouteInfo.sinks(); enqueueMcastEvent(event);
IpAddress mcastIp = mcastRouteInfo.route().group();
sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
}
/**
* Processes the SOURCE_UPDATED event.
*
* @param event McastEvent with SOURCE_UPDATED type
*/
public void processSourceUpdated(McastEvent event) {
log.info("processSourceUpdated {}", event);
// Get old and new data
McastRouteInfo mcastRouteInfo = event.subject();
ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
mcastRouteInfo = event.prevSubject();
ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
// and group ip
IpAddress mcastIp = mcastRouteInfo.route().group();
// Process the update event
processSourceUpdatedInternal(mcastIp, newSource, oldSource);
}
/**
* Processes the SINK_ADDED event.
*
* @param event McastEvent with SINK_ADDED type
*/
public void processSinkAdded(McastEvent event) {
log.info("processSinkAdded {}", event);
McastRouteInfo mcastRouteInfo = event.subject();
if (!mcastRouteInfo.isComplete()) {
log.info("Incompleted McastRouteInfo. Abort.");
return;
}
ConnectPoint source = mcastRouteInfo.source().orElse(null);
ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
IpAddress mcastIp = mcastRouteInfo.route().group();
processSinkAddedInternal(source, sink, mcastIp);
}
/**
* Processes the SINK_REMOVED event.
*
* @param event McastEvent with SINK_REMOVED type
*/
public void processSinkRemoved(McastEvent event) {
log.info("processSinkRemoved {}", event);
McastRouteInfo mcastRouteInfo = event.subject();
if (!mcastRouteInfo.isComplete()) {
log.info("Incompleted McastRouteInfo. Abort.");
return;
}
ConnectPoint source = mcastRouteInfo.source().orElse(null);
ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
IpAddress mcastIp = mcastRouteInfo.route().group();
processSinkRemovedInternal(source, sink, mcastIp);
}
/**
* Processes the ROUTE_REMOVED event.
*
* @param event McastEvent with ROUTE_REMOVED type
*/
public void processRouteRemoved(McastEvent event) {
log.info("processRouteRemoved {}", event);
McastRouteInfo mcastRouteInfo = event.subject();
if (!mcastRouteInfo.source().isPresent()) {
log.info("Incompleted McastRouteInfo. Abort.");
return;
}
// Get group ip and ingress connect point
IpAddress mcastIp = mcastRouteInfo.route().group();
ConnectPoint source = mcastRouteInfo.source().orElse(null);
processRouteRemovedInternal(source, mcastIp);
} }
/** /**
@ -318,11 +346,14 @@ public class McastHandler {
NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value(); NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
Set<PortNumber> outputPorts = getPorts(nextObjective.next()); Set<PortNumber> outputPorts = getPorts(nextObjective.next());
// This an optimization to avoid unnecessary removal and add
if (!assignedVlanFromNext(nextObjective).equals(assignedVlan(newSource))) {
// Let's remove old flows and groups // Let's remove old flows and groups
removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource)); removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
// Push new flows and group // Push new flows and group
outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber, outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
mcastIp, assignedVlan(newSource))); mcastIp, assignedVlan(newSource)));
}
addFilterToDevice(newSource.deviceId(), newSource.port(), addFilterToDevice(newSource.deviceId(), newSource.port(),
assignedVlan(newSource), mcastIp); assignedVlan(newSource), mcastIp);
// Setup mcast roles // Setup mcast roles
@ -1211,6 +1242,16 @@ public class McastHandler {
return SegmentRoutingManager.INTERNAL_VLAN; return SegmentRoutingManager.INTERNAL_VLAN;
} }
/**
* Gets assigned VLAN according to the value in the meta.
*
* @param nextObjective nextObjective to analyze
* @return assigned VLAN ID
*/
private VlanId assignedVlanFromNext(NextObjective nextObjective) {
return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
}
/** /**
* Gets the spine-facing port on ingress device of given multicast group. * Gets the spine-facing port on ingress device of given multicast group.
* *

View File

@ -41,7 +41,6 @@ import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned; import org.onosproject.store.service.Versioned;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -180,7 +179,7 @@ public class DistributedMcastStore
// and the source connect point // and the source connect point
notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED, notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
mcastRouteInfo(route, mcastRouteInfo(route,
Collections.emptySet(), oldData.sinks(),
oldData.source() oldData.source()
))); )));
break; break;