diff --git a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java index ffb6b4f995..bf7dc184bc 100644 --- a/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java +++ b/apps/inbandtelemetry/impl/src/main/java/org/onosproject/inbandtelemetry/impl/SimpleIntManager.java @@ -72,6 +72,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -79,6 +80,8 @@ import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; /** @@ -152,6 +155,8 @@ public class SimpleIntManager implements IntService { } }; + protected ExecutorService eventExecutor; + @Activate public void activate() { @@ -206,6 +211,10 @@ public class SimpleIntManager implements IntService { // Bootstrap config for already existing devices. triggerAllDeviceConfigure(); + // Bootstrap core event executor before adding listener + eventExecutor = newSingleThreadScheduledExecutor(groupedThreads( + "onos/int", "events-%d", log)); + hostService.addListener(hostListener); deviceService.addListener(deviceListener); @@ -256,6 +265,8 @@ public class SimpleIntManager implements IntService { deviceService.getDevices().forEach(d -> cleanupDevice(d.id())); netcfgService.removeListener(appConfigListener); netcfgRegistry.unregisterConfigFactory(intAppConfigFactory); + eventExecutor.shutdownNow(); + eventExecutor = null; log.info("Deactivated"); } @@ -503,30 +514,34 @@ public class SimpleIntManager implements IntService { private class InternalHostListener implements HostListener { @Override public void event(HostEvent event) { - final DeviceId deviceId = event.subject().location().deviceId(); - triggerDeviceConfigure(deviceId); + eventExecutor.execute(() -> { + final DeviceId deviceId = event.subject().location().deviceId(); + triggerDeviceConfigure(deviceId); + }); } } private class InternalDeviceListener implements DeviceListener { @Override public void event(DeviceEvent event) { - switch (event.type()) { - case DEVICE_ADDED: - case DEVICE_UPDATED: - case DEVICE_REMOVED: - case DEVICE_SUSPENDED: - case DEVICE_AVAILABILITY_CHANGED: - case PORT_ADDED: - case PORT_UPDATED: - case PORT_REMOVED: - triggerDeviceConfigure(event.subject().id()); - return; - case PORT_STATS_UPDATED: - return; - default: - log.warn("Unknown device event type {}", event.type()); - } + eventExecutor.execute(() -> { + switch (event.type()) { + case DEVICE_ADDED: + case DEVICE_UPDATED: + case DEVICE_REMOVED: + case DEVICE_SUSPENDED: + case DEVICE_AVAILABILITY_CHANGED: + case PORT_ADDED: + case PORT_UPDATED: + case PORT_REMOVED: + triggerDeviceConfigure(event.subject().id()); + return; + case PORT_STATS_UPDATED: + return; + default: + log.warn("Unknown device event type {}", event.type()); + } + }); } } @@ -580,56 +595,56 @@ public class SimpleIntManager implements IntService { @Override public void event(NetworkConfigEvent event) { - switch (event.type()) { - case CONFIG_ADDED: - case CONFIG_UPDATED: - event.config() - .map(config -> (IntReportConfig) config) - .ifPresent(config -> { - IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder() - .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs()) - .withCollectorPort(config.collectorPort()) - .withCollectorIp(config.collectorIp()) - .enabled(true) - .build(); - setConfig(intDeviceConfig); + eventExecutor.execute(() -> { + if (event.configClass() == IntReportConfig.class) { + switch (event.type()) { + case CONFIG_ADDED: + case CONFIG_UPDATED: + event.config() + .map(config -> (IntReportConfig) config) + .ifPresent(config -> { + IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder() + .withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs()) + .withCollectorPort(config.collectorPort()) + .withCollectorIp(config.collectorIp()) + .enabled(true) + .build(); + setConfig(intDeviceConfig); - // For each watched subnet, we install two INT rules. - // One match on the source, another match on the destination. - intentMap.clear(); - config.watchSubnets().forEach(subnet -> { - IntIntent.Builder intIntentBuilder = IntIntent.builder() - .withReportType(IntIntent.IntReportType.TRACKED_FLOW) - .withReportType(IntIntent.IntReportType.DROPPED_PACKET) - .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE) - .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD); - if (subnet.prefixLength() == 0) { - // Special case, match any packet - installIntIntent(intIntentBuilder - .withSelector(DefaultTrafficSelector.emptySelector()) - .build()); - } else { - TrafficSelector selector = DefaultTrafficSelector.builder() - .matchIPSrc(subnet) - .build(); - installIntIntent(intIntentBuilder.withSelector(selector).build()); - selector = DefaultTrafficSelector.builder() - .matchIPDst(subnet) - .build(); - installIntIntent(intIntentBuilder.withSelector(selector).build()); - } - }); - }); - break; - // TODO: Support removing INT config. - default: - break; - } + // For each watched subnet, we install two INT rules. + // One match on the source, another match on the destination. + intentMap.clear(); + config.watchSubnets().forEach(subnet -> { + IntIntent.Builder intIntentBuilder = IntIntent.builder() + .withReportType(IntIntent.IntReportType.TRACKED_FLOW) + .withReportType(IntIntent.IntReportType.DROPPED_PACKET) + .withReportType(IntIntent.IntReportType.CONGESTED_QUEUE) + .withTelemetryMode(IntIntent.TelemetryMode.POSTCARD); + if (subnet.prefixLength() == 0) { + // Special case, match any packet + installIntIntent(intIntentBuilder + .withSelector(DefaultTrafficSelector.emptySelector()) + .build()); + } else { + TrafficSelector selector = DefaultTrafficSelector.builder() + .matchIPSrc(subnet) + .build(); + installIntIntent(intIntentBuilder.withSelector(selector).build()); + selector = DefaultTrafficSelector.builder() + .matchIPDst(subnet) + .build(); + installIntIntent(intIntentBuilder.withSelector(selector).build()); + } + }); + }); + break; + // TODO: Support removing INT config. + default: + break; + } + } + }); } - @Override - public boolean isRelevant(NetworkConfigEvent event) { - return event.configClass() == IntReportConfig.class; - } } } diff --git a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java index 8519ed5e31..7fddf3cbb4 100644 --- a/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java +++ b/apps/inbandtelemetry/impl/src/test/java/org/onosproject/inbandtelemetry/impl/SimpleIntManagerTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.After; @@ -81,6 +82,7 @@ import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.onlab.junit.TestTools.assertAfter; import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.POSTCARD; import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SINK; import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SOURCE; @@ -151,6 +153,7 @@ public class SimpleIntManagerTest { manager.hostService = hostService; manager.netcfgService = networkConfigService; manager.netcfgRegistry = networkConfigRegistry; + manager.eventExecutor = MoreExecutors.newDirectExecutorService(); expect(coreService.registerApplication(APP_NAME)) .andReturn(APP_ID).anyTimes(); @@ -226,7 +229,7 @@ public class SimpleIntManagerTest { // The INT intent installation order can be random, so we need to collect // all expected INT intents and check if actual intent exists. - assertEquals(5, intentMap.size()); + assertAfter(50, 100, () -> assertEquals(5, intentMap.size())); intentMap.entrySet().forEach(entry -> { IntIntent actualIntIntent = entry.getValue().value(); assertTrue(expectedIntIntents.contains(actualIntIntent)); diff --git a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java index ede60db15a..5ae26289c3 100644 --- a/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java +++ b/apps/route-service/app/src/main/java/org/onosproject/routeservice/impl/RouteMonitor.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collection; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; @@ -65,6 +66,9 @@ public class RouteMonitor { private final ScheduledExecutorService reaperExecutor = newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log)); + private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(groupedThreads( + "onos/routemonitor", "events-%d", log)); + /** * Creates a new route monitor. * @@ -94,6 +98,8 @@ public class RouteMonitor { public void shutdown() { stopProcessing(); clusterService.removeListener(clusterListener); + eventExecutor.shutdownNow(); + reaperExecutor.shutdownNow(); asyncLock.unlock(); } @@ -145,31 +151,33 @@ public class RouteMonitor { @Override public void event(ClusterEvent event) { - switch (event.type()) { - case INSTANCE_DEACTIVATED: - NodeId id = event.subject().id(); - log.info("Node {} deactivated", id); + eventExecutor.execute(() -> { + switch (event.type()) { + case INSTANCE_DEACTIVATED: + NodeId id = event.subject().id(); + log.info("Node {} deactivated", id); - // DistributedLock is introduced to guarantee that minority nodes won't try to remove - // routes that originated from majority nodes. - // Adding 15 seconds retry for the leadership election to be completed. - asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> { - if (result != null && result.isPresent()) { - log.debug("Lock obtained. Put {} into removal queue", id); - queue.addOne(id); - asyncLock.unlock(); - } else { - log.debug("Fail to obtain lock. Do not remove routes from {}", id); - } - }); - break; - case INSTANCE_ADDED: - case INSTANCE_REMOVED: - case INSTANCE_ACTIVATED: - case INSTANCE_READY: - default: - break; - } + // DistributedLock is introduced to guarantee that minority nodes won't try to remove + // routes that originated from majority nodes. + // Adding 15 seconds retry for the leadership election to be completed. + asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> { + if (result != null && result.isPresent()) { + log.debug("Lock obtained. Put {} into removal queue", id); + queue.addOne(id); + asyncLock.unlock(); + } else { + log.debug("Fail to obtain lock. Do not remove routes from {}", id); + } + }); + break; + case INSTANCE_ADDED: + case INSTANCE_REMOVED: + case INSTANCE_ACTIVATED: + case INSTANCE_READY: + default: + break; + } + }); } } } diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java index f86e4a1eda..1092d8da34 100644 --- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java +++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupDriverProvider.java @@ -117,12 +117,16 @@ public class GroupDriverProvider extends AbstractProvider implements GroupProvid } private void pollGroups() { - deviceService.getAvailableDevices().forEach(device -> { - if (mastershipService.isLocalMaster(device.id()) && - device.is(GroupProgrammable.class)) { - pollDeviceGroups(device.id()); - } - }); + try { + deviceService.getAvailableDevices().forEach(device -> { + if (mastershipService.isLocalMaster(device.id()) && + device.is(GroupProgrammable.class)) { + pollDeviceGroups(device.id()); + } + }); + } catch (Exception e) { + log.warn("Exception thrown while polling groups", e); + } } private void pollDeviceGroups(DeviceId deviceId) { diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java index 39ef3d906e..363876c3d3 100644 --- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java +++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java @@ -508,26 +508,29 @@ public class MeterManager @Override public void event(DeviceEvent event) { - switch (event.type()) { - case DEVICE_REMOVED: - case DEVICE_AVAILABILITY_CHANGED: - DeviceId deviceId = event.subject().id(); - if (!deviceService.isAvailable(deviceId)) { - BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class); - //if purgeOnDisconnection is set for the device or it's a global configuration - // lets remove the meters. - boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ? - cfg.purgeOnDisconnection() : purgeOnDisconnection; - if (purge) { - log.info("PurgeOnDisconnection is requested for device {}, " + - "removing meters", deviceId); - store.purgeMeter(deviceId); + DeviceId deviceId = event.subject().id(); + meterInstallers.execute(() -> { + switch (event.type()) { + case DEVICE_REMOVED: + case DEVICE_AVAILABILITY_CHANGED: + if (!deviceService.isAvailable(deviceId)) { + BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class); + //if purgeOnDisconnection is set for the device or it's a global configuration + // lets remove the meters. + boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ? + cfg.purgeOnDisconnection() : purgeOnDisconnection; + if (purge) { + log.info("PurgeOnDisconnection is requested for device {}, " + + "removing meters", deviceId); + store.purgeMeter(deviceId); + } } - } - break; - default: - break; - } + break; + default: + break; + } + }, deviceId.hashCode()); + } } diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java index d55ca7f748..f36e7a85ea 100644 --- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java +++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java @@ -52,9 +52,13 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; + /** * Entity that manages gNMI subscription for devices using OpenConfig models and * that reports relevant events to the core. @@ -78,6 +82,8 @@ class GnmiDeviceStateSubscriber { private final Striped deviceLocks = Striped.lock(30); + private ExecutorService eventExecutor; + GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService, MastershipService mastershipService, DeviceProviderService providerService) { @@ -88,6 +94,8 @@ class GnmiDeviceStateSubscriber { } public void activate() { + eventExecutor = newSingleThreadScheduledExecutor(groupedThreads( + "onos/gnmi", "events-%d", log)); deviceService.addListener(deviceEventListener); mastershipService.addListener(mastershipListener); gnmiController.addListener(gnmiEventListener); @@ -100,6 +108,8 @@ class GnmiDeviceStateSubscriber { deviceService.removeListener(deviceEventListener); mastershipService.removeListener(mastershipListener); gnmiController.removeListener(gnmiEventListener); + eventExecutor.shutdownNow(); + eventExecutor = null; } private void checkSubscription(DeviceId deviceId) { @@ -248,19 +258,21 @@ class GnmiDeviceStateSubscriber { @Override public void event(GnmiEvent event) { - if (!deviceSubscribed.containsKey(event.subject().deviceId())) { - log.warn("Received gNMI event from {}, but we did'nt expect to " + - "be subscribed to it! Discarding event...", - event.subject().deviceId()); - return; - } + eventExecutor.execute(() -> { + if (!deviceSubscribed.containsKey(event.subject().deviceId())) { + log.warn("Received gNMI event from {}, but we did'nt expect to " + + "be subscribed to it! Discarding event...", + event.subject().deviceId()); + return; + } - log.debug("Received gNMI event {}", event.toString()); - if (event.type() == GnmiEvent.Type.UPDATE) { - handleGnmiUpdate((GnmiUpdate) event.subject()); - } else { - log.debug("Unsupported gNMI event type: {}", event.type()); - } + log.debug("Received gNMI event {}", event.toString()); + if (event.type() == GnmiEvent.Type.UPDATE) { + handleGnmiUpdate((GnmiUpdate) event.subject()); + } else { + log.debug("Unsupported gNMI event type: {}", event.type()); + } + }); } } @@ -268,7 +280,7 @@ class GnmiDeviceStateSubscriber { @Override public void event(MastershipEvent event) { - checkSubscription(event.subject()); + eventExecutor.execute(() -> checkSubscription(event.subject())); } } @@ -276,18 +288,20 @@ class GnmiDeviceStateSubscriber { @Override public void event(DeviceEvent event) { - switch (event.type()) { - case DEVICE_ADDED: - case DEVICE_AVAILABILITY_CHANGED: - case DEVICE_UPDATED: - case DEVICE_REMOVED: - case PORT_ADDED: - case PORT_REMOVED: - checkSubscription(event.subject().id()); - break; - default: - break; - } + eventExecutor.execute(() -> { + switch (event.type()) { + case DEVICE_ADDED: + case DEVICE_AVAILABILITY_CHANGED: + case DEVICE_UPDATED: + case DEVICE_REMOVED: + case PORT_ADDED: + case PORT_REMOVED: + checkSubscription(event.subject().id()); + break; + default: + break; + } + }); } } }