[CORD-2748] Multicast flows are not properly updated when changing source

Change-Id: Ia4a9e9f132156e4b9a7eb56dd943f1fd13bc1560
This commit is contained in:
Pier Luigi 2018-02-26 12:31:38 +01:00 committed by Charles Chan
parent f7049c5cbc
commit 57d417971e
5 changed files with 167 additions and 8 deletions

View File

@ -227,6 +227,24 @@ public class McastHandler {
sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
}
/**
* Processes the SOURCE_UPDATED event.
*
* @param event McastEvent with SOURCE_UPDATED type
*/
protected void processSourceUpdated(McastEvent event) {
log.info("processSourceUpdated {}", event);
// Get old and new data
McastRouteInfo mcastRouteInfo = event.subject();
ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
mcastRouteInfo = event.prevSubject();
ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
// and group ip
IpAddress mcastIp = mcastRouteInfo.route().group();
// Process the update event
processSourceUpdatedInternal(mcastIp, newSource, oldSource);
}
/**
* Processes the SINK_ADDED event.
*
@ -279,11 +297,58 @@ public class McastHandler {
}
// Get group ip and ingress connect point
IpAddress mcastIp = mcastRouteInfo.route().group();
ConnectPoint source = mcastRouteInfo.source().get();
ConnectPoint source = mcastRouteInfo.source().orElse(null);
processRouteRemovedInternal(source, mcastIp);
}
/**
* Process the SOURCE_UPDATED event.
*
* @param newSource the updated srouce info
* @param oldSource the outdated source info
*/
private void processSourceUpdatedInternal(IpAddress mcastIp,
ConnectPoint newSource,
ConnectPoint oldSource) {
lastMcastChange = Instant.now();
mcastLock();
try {
log.debug("Processing source updated for group {}", mcastIp);
// Build key for the store and retrieve old data
McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
// Verify leadership on the operation
if (!isLeader(oldSource)) {
log.debug("Skip {} due to lack of leadership", mcastIp);
return;
}
// This device is not serving this multicast group
if (!mcastRoleStore.containsKey(mcastStoreKey) ||
!mcastNextObjStore.containsKey(mcastStoreKey)) {
log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
return;
}
NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
Set<PortNumber> outputPorts = getPorts(nextObjective.next());
// Let's remove old flows and groups
removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
// Push new flows and group
outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
mcastIp, assignedVlan(newSource)));
addFilterToDevice(newSource.deviceId(), newSource.port(),
assignedVlan(newSource), mcastIp);
// Setup mcast roles
mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
McastRole.INGRESS);
} finally {
mcastUnlock();
}
}
/**
* Removes the entire mcast tree related to this group.
*
@ -293,7 +358,7 @@ public class McastHandler {
lastMcastChange = Instant.now();
mcastLock();
try {
log.debug("Processing route down for group {}", mcastIp);
log.debug("Processing route removed for group {}", mcastIp);
// Find out the ingress, transit and egress device of the affected group
DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
@ -322,7 +387,6 @@ public class McastHandler {
if (ingressDevice != null) {
removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
}
} finally {
mcastUnlock();
}

View File

@ -1533,6 +1533,9 @@ public class SegmentRoutingManager implements SegmentRoutingService {
case SOURCE_ADDED:
mcastHandler.processSourceAdded(event);
break;
case SOURCE_UPDATED:
mcastHandler.processSourceUpdated(event);
break;
case SINK_ADDED:
mcastHandler.processSinkAdded(event);
break;

View File

@ -17,6 +17,8 @@ package org.onosproject.net.mcast;
import org.onosproject.event.AbstractEvent;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
@ -45,6 +47,11 @@ public class McastEvent extends AbstractEvent<McastEvent.Type, McastRouteInfo> {
*/
SOURCE_ADDED,
/**
* A source for a mcast route has been updated.
*/
SOURCE_UPDATED,
/**
* A sink for a mcast route (ie. the subject) has been added.
*/
@ -56,15 +63,74 @@ public class McastEvent extends AbstractEvent<McastEvent.Type, McastRouteInfo> {
SINK_REMOVED
}
// Used when an update event happens
private McastRouteInfo prevSubject;
/**
* Creates a McastEvent of a given type using the subject.
*
* @param type the event type
* @param subject the subject of the event type
*/
public McastEvent(McastEvent.Type type, McastRouteInfo subject) {
super(type, subject);
}
/**
* Creates a McastEvent of a given type using the subject and
* the previous subject.
*
* @param type the event type
* @param subject the subject of the event
* @param prevSubject the previous subject of the event
*/
public McastEvent(McastEvent.Type type, McastRouteInfo subject,
McastRouteInfo prevSubject) {
super(type, subject);
// For now we have just this kind of updates
if (type == Type.SOURCE_UPDATED) {
this.prevSubject = prevSubject;
}
}
/**
* Gets the previous subject in this mcast event.
*
* @return the previous subject, or null if previous subject is not
* specified.
*/
public McastRouteInfo prevSubject() {
return this.prevSubject;
}
@Override
public int hashCode() {
return Objects.hash(type(), subject(), prevSubject);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof McastEvent)) {
return false;
}
McastEvent that = (McastEvent) other;
return Objects.equals(this.subject(), that.subject()) &&
Objects.equals(this.type(), that.type()) &&
Objects.equals(this.prevSubject, that.prevSubject);
}
@Override
public String toString() {
return toStringHelper(this)
.add("type", type())
.add("info", subject()).toString();
.add("info", subject())
.add("prevInfo", prevSubject())
.toString();
}
}

View File

@ -135,11 +135,28 @@ public class DistributedMcastStore
checkNotNull(newData);
checkNotNull(oldData);
// They are not equal
if (!Objects.equal(oldData.source(), newData.source())) {
notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
mcastRouteInfo(route,
newData.sinks(),
newData.source())));
// Both not null, it is an update event
if (oldData.source() != null && newData.source() != null) {
// Broadcast old and new data
notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_UPDATED,
mcastRouteInfo(route,
newData.sinks(),
newData.source()),
mcastRouteInfo(route,
oldData.sinks(),
oldData.source())));
} else if (oldData.source() == null && newData.source() != null) {
// It is a source added event, broadcast new data
notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
mcastRouteInfo(route,
newData.sinks(),
newData.source())));
} else {
// Scenario not managed for now
log.warn("Unhandled scenario {} - new {} - old {}", event.type());
}
} else {
Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,

View File

@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
/**
@ -101,4 +102,12 @@ public final class MulticastData {
Objects.equals(sinks(), other.sinks()) &&
Objects.equals(isEmpty, other.isEmpty);
}
@Override
public String toString() {
return toStringHelper(this)
.add("source", source())
.add("sinks", sinks())
.toString();
}
}