From 52ef933d5e55176c3ba2b4ce87df151e3587c646 Mon Sep 17 00:00:00 2001 From: pierventre Date: Fri, 9 Jul 2021 22:42:17 +0200 Subject: [PATCH] [SDFAB-20] Prevent listeners ejection and the stop of the group polling Offload listeners processing to external executors to prevent the listener ejection due to time consuming processing In future, we may want to extend the same fix to the HostManager and NetworkConfigHostProvider Additionally, avoid the propagation of the exceptions in GroupDriverProvider which leads to the cancellation of the periodic poll task Change-Id: I8ea4ec9fda1ccc48bbd3855fd443ee8760cbbb60 --- .../impl/SimpleIntManager.java | 147 ++++++++++-------- .../impl/SimpleIntManagerTest.java | 5 +- .../routeservice/impl/RouteMonitor.java | 56 ++++--- .../net/group/impl/GroupDriverProvider.java | 16 +- .../net/meter/impl/MeterManager.java | 41 ++--- .../impl/GnmiDeviceStateSubscriber.java | 64 +++++--- 6 files changed, 188 insertions(+), 141 deletions(-) diff --git a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java index ffb6b4f995..bf7dc184bc 100644 --- a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java +++ b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java @@ -72,6 +72,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -79,6 +80,8 @@ import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; +import static 
java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; /** @@ -152,6 +155,8 @@ public class SimpleIntManager implements IntService { } }; + protected ExecutorService eventExecutor; + @Activate public void activate() { @@ -206,6 +211,10 @@ public class SimpleIntManager implements IntService { // Bootstrap config for already existing devices. triggerAllDeviceConfigure(); + // Bootstrap core event executor before adding listener + eventExecutor = newSingleThreadScheduledExecutor(groupedThreads( + "onos/int", "events-%d", log)); + hostService.addListener(hostListener); deviceService.addListener(deviceListener); @@ -256,6 +265,8 @@ public class SimpleIntManager implements IntService { deviceService.getDevices().forEach(d -> cleanupDevice(d.id())); netcfgService.removeListener(appConfigListener); netcfgRegistry.unregisterConfigFactory(intAppConfigFactory); + eventExecutor.shutdownNow(); + eventExecutor = null; log.info("Deactivated"); } @@ -503,30 +514,34 @@ public class SimpleIntManager implements IntService { private class InternalHostListener implements HostListener { @Override public void event(HostEvent event) { - final DeviceId deviceId = event.subject().location().deviceId(); - triggerDeviceConfigure(deviceId); + eventExecutor.execute(() -> { + final DeviceId deviceId = event.subject().location().deviceId(); + triggerDeviceConfigure(deviceId); + }); } } private class InternalDeviceListener implements DeviceListener { @Override public void event(DeviceEvent event) { - switch (event.type()) { - case DEVICE_ADDED: - case DEVICE_UPDATED: - case DEVICE_REMOVED: - case DEVICE_SUSPENDED: - case DEVICE_AVAILABILITY_CHANGED: - case PORT_ADDED: - case PORT_UPDATED: - case PORT_REMOVED: - triggerDeviceConfigure(event.subject().id()); - return; - case PORT_STATS_UPDATED: - return; - default: - log.warn("Unknown device event type {}", event.type()); - } + 
eventExecutor.execute(() -> { + switch (event.type()) { + case DEVICE_ADDED: + case DEVICE_UPDATED: + case DEVICE_REMOVED: + case DEVICE_SUSPENDED: + case DEVICE_AVAILABILITY_CHANGED: + case PORT_ADDED: + case PORT_UPDATED: + case PORT_REMOVED: + triggerDeviceConfigure(event.subject().id()); + return; + case PORT_STATS_UPDATED: + return; + default: + log.warn("Unknown device event type {}", event.type()); + } + }); } } @@ -580,56 +595,56 @@ public class SimpleIntManager implements IntService { @Override public void event(NetworkConfigEvent event) { - switch (event.type()) { - case CONFIG_ADDED: - case CONFIG_UPDATED: - event.config() - .map(config -> (IntReportConfig) config) - .ifPresent(config -> { - IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder() - .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs()) - .withCollectorPort(config.collectorPort()) - .withCollectorIp(config.collectorIp()) - .enabled(true) - .build(); - setConfig(intDeviceConfig); + eventExecutor.execute(() -> { + if (event.configClass() == IntReportConfig.class) { + switch (event.type()) { + case CONFIG_ADDED: + case CONFIG_UPDATED: + event.config() + .map(config -> (IntReportConfig) config) + .ifPresent(config -> { + IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder() + .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs()) + .withCollectorPort(config.collectorPort()) + .withCollectorIp(config.collectorIp()) + .enabled(true) + .build(); + setConfig(intDeviceConfig); - // For each watched subnet, we install two INT rules. - // One match on the source, another match on the destination. 
- intentMap.clear(); - config.watchSubnets().forEach(subnet -> { - IntIntent.Builder intIntentBuilder = IntIntent.builder() - .withReportType(IntIntent.IntReportType.TRACKED_FLOW) - .withReportType(IntIntent.IntReportType.DROPPED_PACKET) - .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE) - .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD); - if (subnet.prefixLength() == 0) { - // Special case, match any packet - installIntIntent(intIntentBuilder - .withSelector(DefaultTrafficSelector.emptySelector()) - .build()); - } else { - TrafficSelector selector = DefaultTrafficSelector.builder() - .matchIPSrc(subnet) - .build(); - installIntIntent(intIntentBuilder.withSelector(selector).build()); - selector = DefaultTrafficSelector.builder() - .matchIPDst(subnet) - .build(); - installIntIntent(intIntentBuilder.withSelector(selector).build()); - } - }); - }); - break; - // TODO: Support removing INT config. - default: - break; - } + // For each watched subnet, we install two INT rules. + // One match on the source, another match on the destination. + intentMap.clear(); + config.watchSubnets().forEach(subnet -> { + IntIntent.Builder intIntentBuilder = IntIntent.builder() + .withReportType(IntIntent.IntReportType.TRACKED_FLOW) + .withReportType(IntIntent.IntReportType.DROPPED_PACKET) + .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE) + .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD); + if (subnet.prefixLength() == 0) { + // Special case, match any packet + installIntIntent(intIntentBuilder + .withSelector(DefaultTrafficSelector.emptySelector()) + .build()); + } else { + TrafficSelector selector = DefaultTrafficSelector.builder() + .matchIPSrc(subnet) + .build(); + installIntIntent(intIntentBuilder.withSelector(selector).build()); + selector = DefaultTrafficSelector.builder() + .matchIPDst(subnet) + .build(); + installIntIntent(intIntentBuilder.withSelector(selector).build()); + } + }); + }); + break; + // TODO: Support removing INT config. 
+ default: + break; + } + } + }); } - @Override - public boolean isRelevant(NetworkConfigEvent event) { - return event.configClass() == IntReportConfig.class; - } } } diff --git a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java index 8519ed5e31..7fddf3cbb4 100644 --- a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java +++ b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.After; @@ -81,6 +82,7 @@ import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.onlab.junit.TestTools.assertAfter; import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.POSTCARD; import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SINK; import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SOURCE; @@ -151,6 +153,7 @@ public class SimpleIntManagerTest { manager.hostService = hostService; manager.netcfgService = networkConfigService; manager.netcfgRegistry = networkConfigRegistry; + manager.eventExecutor = MoreExecutors.newDirectExecutorService(); expect(coreService.registerApplication(APP_NAME)) .andReturn(APP_ID).anyTimes(); @@ -226,7 +229,7 @@ public class SimpleIntManagerTest { // The INT intent installation order can be random, so we need to collect // 
all expected INT intents and check if actual intent exists. - assertEquals(5, intentMap.size()); + assertAfter(50, 100, () -> assertEquals(5, intentMap.size())); intentMap.entrySet().forEach(entry -> { IntIntent actualIntIntent = entry.getValue().value(); assertTrue(expectedIntIntents.contains(actualIntIntent)); diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java index ede60db15a..5ae26289c3 100644 --- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java +++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collection; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; @@ -65,6 +66,9 @@ public class RouteMonitor { private final ScheduledExecutorService reaperExecutor = newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log)); + private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(groupedThreads( + "onos/routemonitor", "events-%d", log)); + /** * Creates a new route monitor. 
* @@ -94,6 +98,8 @@ public class RouteMonitor { public void shutdown() { stopProcessing(); clusterService.removeListener(clusterListener); + eventExecutor.shutdownNow(); + reaperExecutor.shutdownNow(); asyncLock.unlock(); } @@ -145,31 +151,33 @@ public class RouteMonitor { @Override public void event(ClusterEvent event) { - switch (event.type()) { - case INSTANCE_DEACTIVATED: - NodeId id = event.subject().id(); - log.info("Node {} deactivated", id); + eventExecutor.execute(() -> { + switch (event.type()) { + case INSTANCE_DEACTIVATED: + NodeId id = event.subject().id(); + log.info("Node {} deactivated", id); - // DistributedLock is introduced to guarantee that minority nodes won't try to remove - // routes that originated from majority nodes. - // Adding 15 seconds retry for the leadership election to be completed. - asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> { - if (result != null && result.isPresent()) { - log.debug("Lock obtained. Put {} into removal queue", id); - queue.addOne(id); - asyncLock.unlock(); - } else { - log.debug("Fail to obtain lock. Do not remove routes from {}", id); - } - }); - break; - case INSTANCE_ADDED: - case INSTANCE_REMOVED: - case INSTANCE_ACTIVATED: - case INSTANCE_READY: - default: - break; - } + // DistributedLock is introduced to guarantee that minority nodes won't try to remove + // routes that originated from majority nodes. + // Adding 15 seconds retry for the leadership election to be completed. + asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> { + if (result != null && result.isPresent()) { + log.debug("Lock obtained. Put {} into removal queue", id); + queue.addOne(id); + asyncLock.unlock(); + } else { + log.debug("Fail to obtain lock. 
Do not remove routes from {}", id); + } + }); + break; + case INSTANCE_ADDED: + case INSTANCE_REMOVED: + case INSTANCE_ACTIVATED: + case INSTANCE_READY: + default: + break; + } + }); } } } diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java index f86e4a1eda..1092d8da34 100644 --- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java +++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java @@ -117,12 +117,16 @@ public class GroupDriverProvider extends AbstractProvider implements GroupProvid } private void pollGroups() { - deviceService.getAvailableDevices().forEach(device -> { - if (mastershipService.isLocalMaster(device.id()) && - device.is(GroupProgrammable.class)) { - pollDeviceGroups(device.id()); - } - }); + try { + deviceService.getAvailableDevices().forEach(device -> { + if (mastershipService.isLocalMaster(device.id()) && + device.is(GroupProgrammable.class)) { + pollDeviceGroups(device.id()); + } + }); + } catch (Exception e) { + log.warn("Exception thrown while polling groups", e); + } } private void pollDeviceGroups(DeviceId deviceId) { diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java index 39ef3d906e..363876c3d3 100644 --- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java +++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java @@ -508,26 +508,29 @@ public class MeterManager @Override public void event(DeviceEvent event) { - switch (event.type()) { - case DEVICE_REMOVED: - case DEVICE_AVAILABILITY_CHANGED: - DeviceId deviceId = event.subject().id(); - if (!deviceService.isAvailable(deviceId)) { - BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class); - //if purgeOnDisconnection is set for the device or 
it's a global configuration - // lets remove the meters. - boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ? - cfg.purgeOnDisconnection() : purgeOnDisconnection; - if (purge) { - log.info("PurgeOnDisconnection is requested for device {}, " + - "removing meters", deviceId); - store.purgeMeter(deviceId); + DeviceId deviceId = event.subject().id(); + meterInstallers.execute(() -> { + switch (event.type()) { + case DEVICE_REMOVED: + case DEVICE_AVAILABILITY_CHANGED: + if (!deviceService.isAvailable(deviceId)) { + BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class); + //if purgeOnDisconnection is set for the device or it's a global configuration + // lets remove the meters. + boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ? + cfg.purgeOnDisconnection() : purgeOnDisconnection; + if (purge) { + log.info("PurgeOnDisconnection is requested for device {}, " + + "removing meters", deviceId); + store.purgeMeter(deviceId); + } } - } - break; - default: - break; - } + break; + default: + break; + } + }, deviceId.hashCode()); + } } diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java index d55ca7f748..f36e7a85ea 100644 --- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java +++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java @@ -52,9 +52,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; + 
/** * Entity that manages gNMI subscription for devices using OpenConfig models and * that reports relevant events to the core. @@ -78,6 +82,8 @@ class GnmiDeviceStateSubscriber { private final Striped deviceLocks = Striped.lock(30); + private ExecutorService eventExecutor; + GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService, MastershipService mastershipService, DeviceProviderService providerService) { @@ -88,6 +94,8 @@ class GnmiDeviceStateSubscriber { } public void activate() { + eventExecutor = newSingleThreadScheduledExecutor(groupedThreads( + "onos/gnmi", "events-%d", log)); deviceService.addListener(deviceEventListener); mastershipService.addListener(mastershipListener); gnmiController.addListener(gnmiEventListener); @@ -100,6 +108,8 @@ class GnmiDeviceStateSubscriber { deviceService.removeListener(deviceEventListener); mastershipService.removeListener(mastershipListener); gnmiController.removeListener(gnmiEventListener); + eventExecutor.shutdownNow(); + eventExecutor = null; } private void checkSubscription(DeviceId deviceId) { @@ -248,19 +258,21 @@ class GnmiDeviceStateSubscriber { @Override public void event(GnmiEvent event) { - if (!deviceSubscribed.containsKey(event.subject().deviceId())) { - log.warn("Received gNMI event from {}, but we did'nt expect to " + - "be subscribed to it! Discarding event...", - event.subject().deviceId()); - return; - } + eventExecutor.execute(() -> { + if (!deviceSubscribed.containsKey(event.subject().deviceId())) { + log.warn("Received gNMI event from {}, but we did'nt expect to " + + "be subscribed to it! 
Discarding event...", + event.subject().deviceId()); + return; + } - log.debug("Received gNMI event {}", event.toString()); - if (event.type() == GnmiEvent.Type.UPDATE) { - handleGnmiUpdate((GnmiUpdate) event.subject()); - } else { - log.debug("Unsupported gNMI event type: {}", event.type()); - } + log.debug("Received gNMI event {}", event.toString()); + if (event.type() == GnmiEvent.Type.UPDATE) { + handleGnmiUpdate((GnmiUpdate) event.subject()); + } else { + log.debug("Unsupported gNMI event type: {}", event.type()); + } + }); } } @@ -268,7 +280,7 @@ class GnmiDeviceStateSubscriber { @Override public void event(MastershipEvent event) { - checkSubscription(event.subject()); + eventExecutor.execute(() -> checkSubscription(event.subject())); } } @@ -276,18 +288,20 @@ class GnmiDeviceStateSubscriber { @Override public void event(DeviceEvent event) { - switch (event.type()) { - case DEVICE_ADDED: - case DEVICE_AVAILABILITY_CHANGED: - case DEVICE_UPDATED: - case DEVICE_REMOVED: - case PORT_ADDED: - case PORT_REMOVED: - checkSubscription(event.subject().id()); - break; - default: - break; - } + eventExecutor.execute(() -> { + switch (event.type()) { + case DEVICE_ADDED: + case DEVICE_AVAILABILITY_CHANGED: + case DEVICE_UPDATED: + case DEVICE_REMOVED: + case PORT_ADDED: + case PORT_REMOVED: + checkSubscription(event.subject().id()); + break; + default: + break; + } + }); } } }