From b72201bcda02bb96ba3d68da63fdbfd9909befff Mon Sep 17 00:00:00 2001 From: Pier Luigi Date: Thu, 25 Jan 2018 16:16:02 +0100 Subject: [PATCH] [CORD-2607] Mcast buckets correction Change-Id: Ib47b2d8e40babdbb2ccdba61b48365a141752016 --- .../segmentrouting/McastHandler.java | 643 ++++++++++++------ .../segmentrouting/SegmentRoutingManager.java | 2 + .../pipeline/ofdpa/Ofdpa2GroupHandler.java | 45 +- 3 files changed, 457 insertions(+), 233 deletions(-) diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java index 94b3a4cf04..e917b300d2 100644 --- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java +++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java @@ -50,6 +50,7 @@ import org.onosproject.net.flowobjective.ForwardingObjective; import org.onosproject.net.flowobjective.NextObjective; import org.onosproject.net.flowobjective.ObjectiveContext; import org.onosproject.net.mcast.McastEvent; +import org.onosproject.net.mcast.McastRoute; import org.onosproject.net.mcast.McastRouteInfo; import org.onosproject.net.topology.TopologyService; import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig; @@ -62,6 +63,7 @@ import org.onosproject.store.service.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -69,9 +71,15 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static org.onlab.util.Tools.groupedThreads; import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN; /** @@ -87,6 +95,49 @@ public class McastHandler { private final KryoNamespace.Builder mcastKryo; private final ConsistentMap mcastRoleStore; + // Mcast lock to serialize local operations + private final Lock mcastLock = new ReentrantLock(); + + /** + * Acquires the lock used when making mcast changes. + */ + private void mcastLock() { + mcastLock.lock(); + } + + /** + * Releases the lock used when making mcast changes. + */ + private void mcastUnlock() { + mcastLock.unlock(); + } + + // Stability threshold for Mcast. Seconds + private static final long MCAST_STABLITY_THRESHOLD = 5; + // Last change done + private Instant lastMcastChange = Instant.now(); + + /** + * Determines if mcast in the network has been stable in the last + * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time + * to the last mcast change timestamp. + * + * @return true if stable + */ + private boolean isMcastStable() { + long last = (long) (lastMcastChange.toEpochMilli() / 1000.0); + long now = (long) (Instant.now().toEpochMilli() / 1000.0); + log.debug("Mcast stable since {}s", now - last); + return (now - last) > MCAST_STABLITY_THRESHOLD; + } + + // Verify interval for Mcast + private static final long MCAST_VERIFY_INTERVAL = 30; + + // Executor for mcast bucket corrector + private ScheduledExecutorService executorService + = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log)); + /** * Role in the multicast tree. */ @@ -129,6 +180,10 @@ public class McastHandler { .withName("onos-mcast-role-store") .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role"))) .build(); + // Init the executor service and the buckets corrector + executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10, + MCAST_VERIFY_INTERVAL, + TimeUnit.SECONDS); } /** @@ -144,6 +199,13 @@ public class McastHandler { }); } + /** + * Clean up when deactivating the application. + */ + protected void terminate() { + executorService.shutdown(); + } + /** * Processes the SOURCE_ADDED event. * @@ -160,9 +222,7 @@ public class McastHandler { Set sinks = mcastRouteInfo.sinks(); IpAddress mcastIp = mcastRouteInfo.route().group(); - sinks.forEach(sink -> { - processSinkAddedInternal(source, sink, mcastIp); - }); + sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp)); } /** @@ -200,43 +260,65 @@ public class McastHandler { ConnectPoint sink = mcastRouteInfo.sink().orElse(null); IpAddress mcastIp = mcastRouteInfo.route().group(); - // Continue only when this instance is the master of source device - if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { - log.info("Skip {} due to lack of mastership of the source device {}", - mcastIp, source.deviceId()); - return; - } + processSinkRemovedInternal(source, sink, mcastIp); + } - // When source and sink are on the same device - if (source.deviceId().equals(sink.deviceId())) { - // Source and sink are on even the same port. There must be something wrong. - if (source.port().equals(sink.port())) { - log.warn("Sink is on the same port of source. Abort"); + /** + * Removes a path from source to sink for given multicast group. + * + * @param source connect point of the multicast source + * @param sink connection point of the multicast sink + * @param mcastIp multicast group IP address + */ + private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink, + IpAddress mcastIp) { + lastMcastChange = Instant.now(); + mcastLock(); + try { + // Continue only when this instance is the master of source device + if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { + log.debug("Skip {} due to lack of mastership of the source device {}", + mcastIp, source.deviceId()); return; } - removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source)); - return; - } - // Process the egress device - boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null)); - if (isLast) { - mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId())); - } + // When source and sink are on the same device + if (source.deviceId().equals(sink.deviceId())) { + // Source and sink are on even the same port. There must be something wrong. + if (source.port().equals(sink.port())) { + log.warn("Skip {} since sink {} is on the same port of source {}. Abort", + mcastIp, sink, source); + return; + } + removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source)); + return; + } - // If this is the last sink on the device, also update upstream - Optional mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp); - if (mcastPath.isPresent()) { - List links = Lists.newArrayList(mcastPath.get().links()); - Collections.reverse(links); - for (Link link : links) { - if (isLast) { - isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), - mcastIp, - assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)); - mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId())); + // Process the egress device + boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null)); + if (isLast) { + mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId())); + } + + // If this is the last sink on the device, also update upstream + Optional mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp); + if (mcastPath.isPresent()) { + List links = Lists.newArrayList(mcastPath.get().links()); + Collections.reverse(links); + for (Link link : links) { + if (isLast) { + isLast = removePortFromDevice( + link.src().deviceId(), + link.src().port(), + mcastIp, + assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null) + ); + mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId())); + } } } + } finally { + mcastUnlock(); } } @@ -249,54 +331,61 @@ public class McastHandler { */ private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink, IpAddress mcastIp) { - // Continue only when this instance is the master of source device - if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { - log.info("Skip {} due to lack of mastership of the source device {}", - source.deviceId()); - return; - } - - // Process the ingress device - addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp); - - // When source and sink are on the same device - if (source.deviceId().equals(sink.deviceId())) { - // Source and sink are on even the same port. There must be something wrong. - if (source.port().equals(sink.port())) { - log.warn("Sink is on the same port of source. Abort"); + lastMcastChange = Instant.now(); + mcastLock(); + try { + // Continue only when this instance is the master of source device + if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { + log.debug("Skip {} due to lack of mastership of the source device {}", + mcastIp, source.deviceId()); return; } - addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source)); - mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS); - return; - } - // Find a path. If present, create/update groups and flows for each hop - Optional mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp); - if (mcastPath.isPresent()) { - List links = mcastPath.get().links(); - checkState(links.size() == 2, - "Path in leaf-spine topology should always be two hops: ", links); + // Process the ingress device + addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp); - links.forEach(link -> { - addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp, - assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)); - addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp); - }); + // When source and sink are on the same device + if (source.deviceId().equals(sink.deviceId())) { + // Source and sink are on even the same port. There must be something wrong. + if (source.port().equals(sink.port())) { + log.warn("Skip {} since sink {} is on the same port of source {}. Abort", + mcastIp, sink, source); + return; + } + addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source)); + mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS); + return; + } - // Process the egress device - addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null)); + // Find a path. If present, create/update groups and flows for each hop + Optional mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp); + if (mcastPath.isPresent()) { + List links = mcastPath.get().links(); + checkState(links.size() == 2, + "Path in leaf-spine topology should always be two hops: ", links); - // Setup mcast roles - mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()), - McastRole.INGRESS); - mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()), - McastRole.TRANSIT); - mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), - McastRole.EGRESS); - } else { - log.warn("Unable to find a path from {} to {}. Abort sinkAdded", - source.deviceId(), sink.deviceId()); + links.forEach(link -> { + addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp, + assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)); + addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp); + }); + + // Process the egress device + addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null)); + + // Setup mcast roles + mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()), + McastRole.INGRESS); + mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()), + McastRole.TRANSIT); + mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), + McastRole.EGRESS); + } else { + log.warn("Unable to find a path from {} to {}. Abort sinkAdded", + source.deviceId(), sink.deviceId()); + } + } finally { + mcastUnlock(); } } @@ -306,56 +395,63 @@ public class McastHandler { * @param affectedLink Link that is going down */ protected void processLinkDown(Link affectedLink) { - getAffectedGroups(affectedLink).forEach(mcastIp -> { - // TODO Optimize when the group editing is in place - log.debug("Processing link down {} for group {}", - affectedLink, mcastIp); + lastMcastChange = Instant.now(); + mcastLock(); + try { + // Get groups affected by the link down event + getAffectedGroups(affectedLink).forEach(mcastIp -> { + // TODO Optimize when the group editing is in place + log.debug("Processing link down {} for group {}", + affectedLink, mcastIp); - // Find out the ingress, transit and egress device of affected group - DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS) - .stream().findAny().orElse(null); - DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT) - .stream().findAny().orElse(null); - Set egressDevices = getDevice(mcastIp, McastRole.EGRESS); - ConnectPoint source = getSource(mcastIp); + // Find out the ingress, transit and egress device of affected group + DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS) + .stream().findAny().orElse(null); + DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT) + .stream().findAny().orElse(null); + Set egressDevices = getDevice(mcastIp, McastRole.EGRESS); + ConnectPoint source = getSource(mcastIp); - // Do not proceed if any of these info is missing - if (ingressDevice == null || transitDevice == null - || egressDevices == null || source == null) { - log.warn("Missing ingress {}, transit {}, egress {} devices or source {}", - ingressDevice, transitDevice, egressDevices, source); - return; - } - - // Continue only when this instance is the master of source device - if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { - log.info("Skip {} due to lack of mastership of the source device {}", - source.deviceId()); - return; - } - - // Remove entire transit - removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)); - - // Remove transit-facing port on ingress device - PortNumber ingressTransitPort = ingressTransitPort(mcastIp); - if (ingressTransitPort != null) { - removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source)); - mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice)); - } - - // Construct a new path for each egress device - egressDevices.forEach(egressDevice -> { - Optional mcastPath = getPath(ingressDevice, egressDevice, mcastIp); - if (mcastPath.isPresent()) { - installPath(mcastIp, source, mcastPath.get()); - } else { - log.warn("Fail to recover egress device {} from link failure {}", - egressDevice, affectedLink); - removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null)); + // Do not proceed if any of these info is missing + if (ingressDevice == null || transitDevice == null + || egressDevices == null || source == null) { + log.warn("Missing ingress {}, transit {}, egress {} devices or source {}", + ingressDevice, transitDevice, egressDevices, source); + return; } + + // Continue only when this instance is the master of source device + if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { + log.debug("Skip {} due to lack of mastership of the source device {}", + source.deviceId()); + return; + } + + // Remove entire transit + removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)); + + // Remove transit-facing port on ingress device + PortNumber ingressTransitPort = ingressTransitPort(mcastIp); + if (ingressTransitPort != null) { + removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source)); + mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice)); + } + + // Construct a new path for each egress device + egressDevices.forEach(egressDevice -> { + Optional mcastPath = getPath(ingressDevice, egressDevice, mcastIp); + if (mcastPath.isPresent()) { + installPath(mcastIp, source, mcastPath.get()); + } else { + log.warn("Fail to recover egress device {} from link failure {}", + egressDevice, affectedLink); + removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null)); + } + }); }); - }); + } finally { + mcastUnlock(); + } } /** @@ -364,103 +460,109 @@ public class McastHandler { * @param deviceDown device going down */ protected void processDeviceDown(DeviceId deviceDown) { - // Get the mcast groups affected by the device going down - getAffectedGroups(deviceDown).forEach(mcastIp -> { - // TODO Optimize when the group editing is in place - log.debug("Processing device down {} for group {}", - deviceDown, mcastIp); + lastMcastChange = Instant.now(); + mcastLock(); + try { + // Get the mcast groups affected by the device going down + getAffectedGroups(deviceDown).forEach(mcastIp -> { + // TODO Optimize when the group editing is in place + log.debug("Processing device down {} for group {}", + deviceDown, mcastIp); - // Find out the ingress, transit and egress device of affected group - DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS) - .stream().findAny().orElse(null); - DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT) - .stream().findAny().orElse(null); - Set egressDevices = getDevice(mcastIp, McastRole.EGRESS); - ConnectPoint source = getSource(mcastIp); + // Find out the ingress, transit and egress device of affected group + DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS) + .stream().findAny().orElse(null); + DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT) + .stream().findAny().orElse(null); + Set egressDevices = getDevice(mcastIp, McastRole.EGRESS); + ConnectPoint source = getSource(mcastIp); - // Do not proceed if ingress device or source of this group are missing - // If sinks are in other leafs, we have ingress, transit, egress, and source - // If sinks are in the same leaf, we have just ingress and source - if (ingressDevice == null || source == null) { - log.warn("Missing ingress {} or source {} for group {}", - ingressDevice, source, mcastIp); - return; - } - - // Continue only when we have the mastership on the operation - if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { - // When the source is available we just check the mastership - if (srManager.deviceService.isAvailable(source.deviceId())) { - log.info("Skip {} due to lack of mastership of the source device {}", - mcastIp, source.deviceId()); + // Do not proceed if ingress device or source of this group are missing + // If sinks are in other leafs, we have ingress, transit, egress, and source + // If sinks are in the same leaf, we have just ingress and source + if (ingressDevice == null || source == null) { + log.warn("Missing ingress {} or source {} for group {}", + ingressDevice, source, mcastIp); return; } - // Fallback with Leadership service - // source id is used a topic - NodeId leader = srManager.leadershipService.runForLeadership( - source.deviceId().toString()).leaderNodeId(); - // Verify if this node is the leader - if (!srManager.clusterService.getLocalNode().id().equals(leader)) { - log.info("Skip {} due to lack of leadership on the topic {}", - mcastIp, source.deviceId()); - return; - } - } - // If it exists, we have to remove it in any case - if (transitDevice != null) { - // Remove entire transit - removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)); - } - // If the ingress is down - if (ingressDevice.equals(deviceDown)) { - // Remove entire ingress - removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source)); - // If other sinks different from the ingress exist - if (!egressDevices.isEmpty()) { - // Remove all the remaining egress - egressDevices.forEach( - egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null)) - ); - } - } else { - // Egress or transit could be down at this point - // Get the ingress-transit port if it exists - PortNumber ingressTransitPort = ingressTransitPort(mcastIp); - if (ingressTransitPort != null) { - // Remove transit-facing port on ingress device - removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source)); - } - // One of the egress device is down - if (egressDevices.contains(deviceDown)) { - // Remove entire device down - removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null)); - // Remove the device down from egress - egressDevices.remove(deviceDown); - // If there are no more egress and ingress does not have sinks - if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) { - // Remove entire ingress - mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice)); - // We have done + // Continue only when we have the mastership on the operation + if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { + // When the source is available we just check the mastership + if (srManager.deviceService.isAvailable(source.deviceId())) { + log.debug("Skip {} due to lack of mastership of the source device {}", + mcastIp, source.deviceId()); + return; + } + // Fallback with Leadership service + // source id is used a topic + NodeId leader = srManager.leadershipService.runForLeadership( + source.deviceId().toString()).leaderNodeId(); + // Verify if this node is the leader + if (!srManager.clusterService.getLocalNode().id().equals(leader)) { + log.debug("Skip {} due to lack of leadership on the topic {}", + mcastIp, source.deviceId()); return; } } - // Construct a new path for each egress device - egressDevices.forEach(egressDevice -> { - Optional mcastPath = getPath(ingressDevice, egressDevice, mcastIp); - // If there is a new path - if (mcastPath.isPresent()) { - // Let's install the new mcast path for this egress - installPath(mcastIp, source, mcastPath.get()); - } else { - // We were not able to find an alternative path for this egress - log.warn("Fail to recover egress device {} from device down {}", - egressDevice, deviceDown); - removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null)); + + // If it exists, we have to remove it in any case + if (transitDevice != null) { + // Remove entire transit + removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)); + } + // If the ingress is down + if (ingressDevice.equals(deviceDown)) { + // Remove entire ingress + removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source)); + // If other sinks different from the ingress exist + if (!egressDevices.isEmpty()) { + // Remove all the remaining egress + egressDevices.forEach( + egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null)) + ); } - }); - } - }); + } else { + // Egress or transit could be down at this point + // Get the ingress-transit port if it exists + PortNumber ingressTransitPort = ingressTransitPort(mcastIp); + if (ingressTransitPort != null) { + // Remove transit-facing port on ingress device + removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source)); + } + // One of the egress device is down + if (egressDevices.contains(deviceDown)) { + // Remove entire device down + removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null)); + // Remove the device down from egress + egressDevices.remove(deviceDown); + // If there are no more egress and ingress does not have sinks + if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) { + // Remove entire ingress + mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice)); + // We have done + return; + } + } + // Construct a new path for each egress device + egressDevices.forEach(egressDevice -> { + Optional mcastPath = getPath(ingressDevice, egressDevice, mcastIp); + // If there is a new path + if (mcastPath.isPresent()) { + // Let's install the new mcast path for this egress + installPath(mcastIp, source, mcastPath.get()); + } else { + // We were not able to find an alternative path for this egress + log.warn("Fail to recover egress device {} from device down {}", + egressDevice, deviceDown); + removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null)); + } + }); + } + }); + } finally { + mcastUnlock(); + } } /** @@ -1056,7 +1158,9 @@ public class McastHandler { } /** - * Adds or removes filtering objective for given device and port. + * Updates filtering objective for given device and port. + * It is called in general when the mcast config has been + * changed. * * @param deviceId device ID * @param portNum ingress port number @@ -1065,15 +1169,118 @@ public class McastHandler { */ protected void updateFilterToDevice(DeviceId deviceId, PortNumber portNum, VlanId vlanId, boolean install) { - srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> { - ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute); - if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) { - if (install) { - addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group()); - } else { - removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group()); + lastMcastChange = Instant.now(); + mcastLock(); + try { + // Iterates over the route and updates properly the filtering objective + // on the source device. + srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> { + ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute); + if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) { + if (install) { + addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group()); + } else { + removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group()); + } } + }); + } finally { + mcastUnlock(); + } + } + + /** + * Performs bucket verification operation for all mcast groups in the devices. + * Firstly, it verifies that mcast is stable before trying verification operation. + * Verification consists in creating new nexts with VERIFY operation. Actually, + * the operation is totally delegated to the driver. + */ + private final class McastBucketCorrector implements Runnable { + + @Override + public void run() { + // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD + if (!isMcastStable()) { + return; } - }); + // Acquires lock + mcastLock(); + try { + // Iterates over the routes and verify the related next objectives + srManager.multicastRouteService.getRoutes() + .stream() + .map(McastRoute::group) + .forEach(mcastIp -> { + log.trace("Running mcast buckets corrector for mcast group: {}", + mcastIp); + + // For each group we get current information in the store + // and issue a check of the next objectives in place + DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS) + .stream().findAny().orElse(null); + DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT) + .stream().findAny().orElse(null); + Set egressDevices = getDevice(mcastIp, McastRole.EGRESS); + ConnectPoint source = getSource(mcastIp); + + // Do not proceed if ingress device or source of this group are missing + if (ingressDevice == null || source == null) { + log.warn("Unable to run buckets corrector. " + + "Missing ingress {} or source {} for group {}", + ingressDevice, source, mcastIp); + return; + } + + // Continue only when this instance is the master of source device + if (!srManager.mastershipService.isLocalMaster(source.deviceId())) { + log.trace("Unable to run buckets corrector. " + + "Skip {} due to lack of mastership " + + "of the source device {}", + mcastIp, source.deviceId()); + return; + } + + // Create the set of the devices to be processed + ImmutableSet.Builder devicesBuilder = ImmutableSet.builder(); + devicesBuilder.add(ingressDevice); + if (transitDevice != null) { + devicesBuilder.add(transitDevice); + } + if (!egressDevices.isEmpty()) { + devicesBuilder.addAll(egressDevices); + } + Set devicesToProcess = devicesBuilder.build(); + + // Iterate over the devices + devicesToProcess.forEach(deviceId -> { + McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId); + // If next exists in our store verify related next objective + if (mcastNextObjStore.containsKey(currentKey)) { + NextObjective currentNext = mcastNextObjStore.get(currentKey).value(); + // Get current ports + Set currentPorts = getPorts(currentNext.next()); + // Rebuild the next objective + currentNext = nextObjBuilder( + mcastIp, + assignedVlan(deviceId.equals(source.deviceId()) ? source : null), + currentPorts, + currentNext.id() + ).verify(); + // Send to the flowobjective service + srManager.flowObjectiveService.next(deviceId, currentNext); + } else { + log.warn("Unable to run buckets corrector." + + "Missing next for {} and group {}", + deviceId, mcastIp); + } + }); + + }); + } finally { + // Finally, it releases the lock + mcastUnlock(); + } + + } } } diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java index c3980b21e2..2b607affce 100644 --- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java +++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java @@ -490,6 +490,8 @@ public class SegmentRoutingManager implements SegmentRoutingService { portNextObjStore.destroy(); tunnelStore.destroy(); policyStore.destroy(); + + mcastHandler.terminate(); log.info("Stopped"); } diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java index a3670e6cab..e41ca4c327 100644 --- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java +++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java @@ -1588,8 +1588,8 @@ public class Ofdpa2GroupHandler { * modified to match the given next objective */ protected void verifyGroup(NextObjective nextObjective, NextGroup next) { - if (nextObjective.type() != NextObjective.Type.HASHED) { - log.warn("verification not supported for {} group", nextObjective.type()); + if (nextObjective.type() == NextObjective.Type.SIMPLE) { + log.warn("verification not supported for indirect group"); fail(nextObjective, ObjectiveError.UNSUPPORTED); return; } @@ -1640,17 +1640,25 @@ public class Ofdpa2GroupHandler { indicesToRemove.addAll(otherIndices); } + log.debug("Buckets to create {}", bucketsToCreate); + log.debug("Indices to remove {}", indicesToRemove); + if (!bucketsToCreate.isEmpty()) { log.info("creating {} buckets as part of nextId: {} verification", bucketsToCreate.size(), nextObjective.id()); //create a nextObjective only with these buckets NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder() .withId(nextObjective.id()) - .withType(NextObjective.Type.HASHED) + .withType(nextObjective.type()) .withMeta(nextObjective.meta()) .fromApp(nextObjective.appId()); - bucketsToCreate.forEach(bucket -> nextObjBuilder.addTreatment(bucket)); - addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys); + bucketsToCreate.forEach(nextObjBuilder::addTreatment); + // According to the next type we call the proper add function + if (nextObjective.type() == NextObjective.Type.HASHED) { + addBucketToHashGroup(nextObjBuilder.addToExisting(), allActiveKeys); + } else { + addBucketToBroadcastGroup(nextObjBuilder.addToExisting(), allActiveKeys); + } } if (!indicesToRemove.isEmpty()) { @@ -1667,9 +1675,9 @@ public class Ofdpa2GroupHandler { // Nevertheless groupStore may not be in sync due to bug in the store // - see CORD-1844. XXX When this bug is fixed, the rest of this verify // method will not be required. - GroupKey hashGroupKey = allActiveKeys.get(0).peekFirst(); - Group hashGroup = groupService.getGroup(deviceId, hashGroupKey); - int actualGroupSize = hashGroup.buckets().buckets().size(); + GroupKey topGroupKey = allActiveKeys.get(0).peekFirst(); + Group topGroup = groupService.getGroup(deviceId, topGroupKey); + int actualGroupSize = topGroup.buckets().buckets().size(); int objGroupSize = nextObjective.next().size(); if (actualGroupSize != objGroupSize) { log.warn("Mismatch detected in device:{}, nextId:{}, nextObjective-size" @@ -1677,9 +1685,10 @@ public class Ofdpa2GroupHandler { objGroupSize, actualGroupSize); } if (actualGroupSize > objGroupSize) { + // Group in the device has more chains List bucketsToRemove = Lists.newArrayList(); //check every bucket in the actual group - for (GroupBucket bucket : hashGroup.buckets().buckets()) { + for (GroupBucket bucket : topGroup.buckets().buckets()) { GroupInstruction g = (GroupInstruction) bucket.treatment() .allInstructions().iterator().next(); GroupId gidToCheck = g.groupId(); // the group pointed to @@ -1707,11 +1716,12 @@ public class Ofdpa2GroupHandler { + "buckets to remove"); } else { GroupBuckets removeBuckets = new GroupBuckets(bucketsToRemove); - groupService.removeBucketsFromGroup(deviceId, hashGroupKey, - removeBuckets, hashGroupKey, + groupService.removeBucketsFromGroup(deviceId, topGroupKey, + removeBuckets, topGroupKey, nextObjective.appId()); } } else if (actualGroupSize < objGroupSize) { + // Group in the device has less chains // should also add buckets not in group-store but in obj-store List bucketsToAdd = Lists.newArrayList(); //check every bucket in the obj @@ -1727,7 +1737,7 @@ public class Ofdpa2GroupHandler { continue; } boolean matches = false; - for (GroupBucket bucket : hashGroup.buckets().buckets()) { + for (GroupBucket bucket : topGroup.buckets().buckets()) { GroupInstruction g = (GroupInstruction) bucket.treatment() .allInstructions().iterator().next(); GroupId gidToCheck = g.groupId(); // the group pointed to @@ -1741,7 +1751,12 @@ public class Ofdpa2GroupHandler { TrafficTreatment t = DefaultTrafficTreatment.builder() .group(pointedGroup.id()) .build(); - bucketsToAdd.add(DefaultGroupBucket.createSelectGroupBucket(t)); + // Create the proper bucket according to the next type + if (nextObjective.type() == NextObjective.Type.HASHED) { + bucketsToAdd.add(DefaultGroupBucket.createSelectGroupBucket(t)); + } else { + bucketsToAdd.add(DefaultGroupBucket.createAllGroupBucket(t)); + } } } if (bucketsToAdd.isEmpty()) { @@ -1749,8 +1764,8 @@ public class Ofdpa2GroupHandler { + "buckets to add"); } else { GroupBuckets addBuckets = new GroupBuckets(bucketsToAdd); - groupService.addBucketsToGroup(deviceId, hashGroupKey, - addBuckets, hashGroupKey, + groupService.addBucketsToGroup(deviceId, topGroupKey, + addBuckets, topGroupKey, nextObjective.appId()); } }