diff --git a/apps/events/src/main/java/org/onosproject/events/EventHistoryManager.java b/apps/events/src/main/java/org/onosproject/events/EventHistoryManager.java index 82d179ae24..7e4be177ce 100644 --- a/apps/events/src/main/java/org/onosproject/events/EventHistoryManager.java +++ b/apps/events/src/main/java/org/onosproject/events/EventHistoryManager.java @@ -115,7 +115,8 @@ public class EventHistoryManager appId = coreService.registerApplication("org.onosproject.events"); log.debug("Registered as {}", appId); - pruner = newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/events", "history-pruner"))); + pruner = newSingleThreadScheduledExecutor( + minPriority(groupedThreads("onos/events", "history-pruner", log))); pruner.scheduleWithFixedDelay(this::pruneEventHistoryTask, pruneInterval, pruneInterval, TimeUnit.SECONDS); diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java index 60fd95abd9..5ef70fcea3 100644 --- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java +++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java @@ -125,7 +125,8 @@ public class DeviceManager @Activate public void activate() { - backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background")); + backgroundService = newSingleThreadScheduledExecutor( + groupedThreads("onos/device", "manager-background", log)); localNodeId = clusterService.getLocalNode().id(); store.setDelegate(delegate); @@ -499,7 +500,7 @@ public class DeviceManager deviceId, response, mastershipService.getLocalRole(deviceId)); // roleManager got the device to comply, but doesn't agree with // the store; use the store's view, then try to reassert. - backgroundService.submit(() -> reassertRole(deviceId, mastershipService.getLocalRole(deviceId))); + backgroundService.execute(() -> reassertRole(deviceId, mastershipService.getLocalRole(deviceId))); return; } } else { @@ -684,7 +685,7 @@ public class DeviceManager @Override public void event(MastershipEvent event) { - backgroundService.submit(() -> { + backgroundService.execute(() -> { try { handleMastershipEvent(event); } catch (Exception e) { diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java index 94a782a6ae..c3f1200ba0 100644 --- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java +++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java @@ -122,10 +122,10 @@ public class FlowRuleManager private final FlowRuleDriverProvider defaultProvider = new FlowRuleDriverProvider(); protected ExecutorService deviceInstallers = - Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d")); + Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d", log)); protected ExecutorService operationsService = - Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d")); + Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d, log")); private IdGenerator idGenerator; diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java index 61fc0df41a..1fc64d6a6f 100644 --- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java +++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java @@ -200,7 +200,7 @@ public class IntentManager if (newNumThreads != numThreads) { numThreads = newNumThreads; ExecutorService oldWorkerExecutor = workerExecutor; - workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d")); + workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d", log)); if (oldWorkerExecutor != null) { oldWorkerExecutor.shutdown(); } diff --git a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceDeviceListener.java b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceDeviceListener.java index 1792bce2a2..8e36ea948a 100644 --- a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceDeviceListener.java +++ b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceDeviceListener.java @@ -138,11 +138,11 @@ final class ResourceDeviceListener implements DeviceListener { } private void registerDeviceResource(Device device) { - executor.submit(() -> adminService.register(Resources.discrete(device.id()).resource())); + executor.execute(() -> adminService.register(Resources.discrete(device.id()).resource())); } private void unregisterDeviceResource(Device device) { - executor.submit(() -> { + executor.execute(() -> { DiscreteResource devResource = Resources.discrete(device.id()).resource(); List allResources = getDescendantResources(devResource); adminService.unregister(Lists.transform(allResources, Resource::id)); @@ -151,7 +151,7 @@ final class ResourceDeviceListener implements DeviceListener { private void registerPortResource(Device device, Port port) { Resource portPath = Resources.discrete(device.id(), port.number()).resource(); - executor.submit(() -> { + executor.execute(() -> { adminService.register(portPath); queryBandwidth(device.id(), port.number()) @@ -198,7 +198,7 @@ final class ResourceDeviceListener implements DeviceListener { } private void unregisterPortResource(Device device, Port port) { - executor.submit(() -> { + executor.execute(() -> { DiscreteResource portResource = Resources.discrete(device.id(), port.number()).resource(); List allResources = getDescendantResources(portResource); adminService.unregister(Lists.transform(allResources, Resource::id)); diff --git a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceNetworkConfigListener.java b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceNetworkConfigListener.java index d6f9fdb233..07561e4364 100644 --- a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceNetworkConfigListener.java +++ b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceNetworkConfigListener.java @@ -89,7 +89,7 @@ final class ResourceNetworkConfigListener implements NetworkConfigListener { @Override public void event(NetworkConfigEvent event) { if (event.configClass() == BandwidthCapacity.class) { - executor.submit(() -> { + executor.execute(() -> { try { handleBandwidthCapacity(event); } catch (Exception e) { diff --git a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceRegistrar.java b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceRegistrar.java index 1dbf19728f..77216ff5ba 100644 --- a/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceRegistrar.java +++ b/core/net/src/main/java/org/onosproject/net/newresource/impl/ResourceRegistrar.java @@ -84,7 +84,7 @@ public final class ResourceRegistrar { private DeviceListener deviceListener; private final ExecutorService executor = - Executors.newSingleThreadExecutor(groupedThreads("onos/resource", "registrar")); + Executors.newSingleThreadExecutor(groupedThreads("onos/resource", "registrar", log)); private NetworkConfigListener cfgListener; diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java index 51f759a241..36699b0add 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java @@ -68,6 +68,7 @@ import java.util.function.Function; import static com.google.common.collect.Multimaps.newSetMultimap; import static com.google.common.collect.Multimaps.synchronizedSetMultimap; import static com.google.common.io.ByteStreams.toByteArray; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.randomDelay; @@ -138,10 +139,10 @@ public class GossipApplicationStore extends ApplicationArchive .register(MultiValuedTimestamp.class) .register(InternalState.class); - executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store")); + executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log)); messageHandlingExecutor = Executors.newSingleThreadExecutor( - groupedThreads("onos/store/app", "message-handler")); + groupedThreads("onos/store/app", "message-handler", log)); clusterCommunicator.addSubscriber(APP_BITS_REQUEST, bytes -> new String(bytes, Charsets.UTF_8), diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java index a10b79d288..e5f40b3dba 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java @@ -96,13 +96,13 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Predicates.notNull; import static com.google.common.base.Verify.verify; +import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; import static org.onlab.util.Tools.groupedThreads; @@ -200,10 +200,10 @@ public class GossipDeviceStore @Activate public void activate() { - executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d")); + executor = newCachedThreadPool(groupedThreads("onos/device", "fg-%d", log)); backgroundExecutor = - newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d"))); + newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log))); clusterCommunicator.addSubscriber( GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor); diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java index a2b9995c1e..7b94670292 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java @@ -305,7 +305,7 @@ public class NewDistributedFlowRuleStore msgHandlerPoolSize = newPoolSize; ExecutorService oldMsgHandler = messageHandlingExecutor; messageHandlingExecutor = Executors.newFixedThreadPool( - msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers")); + msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log)); // replace previously registered handlers. registerMessageHandlers(messageHandlingExecutor); diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java index bd5eea58b2..01f3cb2396 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java @@ -123,10 +123,10 @@ public class ConsistentDeviceMastershipStore public void activate() { messageHandlingExecutor = Executors.newSingleThreadExecutor( - groupedThreads("onos/store/device/mastership", "message-handler")); + groupedThreads("onos/store/device/mastership", "message-handler", log)); transferExecutor = Executors.newSingleThreadScheduledExecutor( - groupedThreads("onos/store/device/mastership", "mastership-transfer-executor")); + groupedThreads("onos/store/device/mastership", "mastership-transfer-executor", log)); clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT, SERIALIZER::decode, this::relinquishLocalRole, diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java index 8e59ef684b..c3f907a712 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java @@ -118,7 +118,7 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore { messageHandlingExecutor = Executors.newFixedThreadPool( messageHandlerThreadPoolSize, - groupedThreads("onos/store/statistic", "message-handlers")); + groupedThreads("onos/store/statistic", "message-handlers", log)); clusterCommunicator.addSubscriber( GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode, @@ -200,6 +200,7 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore { previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; }); } + @Override public synchronized void updateFlowStatistic(FlowEntry rule) { ConnectPoint cp = buildConnectPoint(rule); if (cp == null) {