diff --git a/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/PollingAlarmProvider.java b/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/PollingAlarmProvider.java index eae9a6354e..70f60d1c26 100644 --- a/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/PollingAlarmProvider.java +++ b/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/PollingAlarmProvider.java @@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.concurrent.Executors.newScheduledThreadPool; import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; @@ -100,7 +101,9 @@ public class PollingAlarmProvider extends AbstractProvider implements AlarmProvi @Activate public void activate(ComponentContext context) { - alarmsExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE); + alarmsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, + groupedThreads("onos/pollingalarmprovider", + "alarm-executor-%d", log)); eventHandlingExecutor = Executors.newFixedThreadPool(CORE_POOL_SIZE, groupedThreads("onos/pollingalarmprovider", diff --git a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java index 4eeca07b75..6c194f774e 100644 --- a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java +++ b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java @@ -36,12 +36,13 @@ import org.onosproject.mastership.MastershipListener; import org.onosproject.mastership.MastershipService; import org.slf4j.Logger; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; /** @@ -87,7 +88,7 @@ public class MastershipLoadBalancer { //Ensures that all executions do not interfere with one another (single thread) private ListeningScheduledExecutorService executorService = MoreExecutors. - listeningDecorator(Executors.newSingleThreadScheduledExecutor()); + listeningDecorator(newSingleThreadScheduledExecutor(groupedThreads("MastershipLoadBalancer", "%d", log))); @Activate public void activate() { diff --git a/apps/openstackinterface/app/src/main/java/org/onosproject/openstackinterface/impl/OpenstackInterfaceManager.java b/apps/openstackinterface/app/src/main/java/org/onosproject/openstackinterface/impl/OpenstackInterfaceManager.java index 152d3e2ad7..1eef7c88a6 100644 --- a/apps/openstackinterface/app/src/main/java/org/onosproject/openstackinterface/impl/OpenstackInterfaceManager.java +++ b/apps/openstackinterface/app/src/main/java/org/onosproject/openstackinterface/impl/OpenstackInterfaceManager.java @@ -126,7 +126,7 @@ public class OpenstackInterfaceManager implements OpenstackInterfaceService { private InternalConfigListener internalConfigListener = new InternalConfigListener(); private ExecutorService networkEventExcutorService = - Executors.newSingleThreadExecutor(groupedThreads("onos/openstackinterface", "config-event")); + Executors.newSingleThreadExecutor(groupedThreads("onos/openstackinterface", "config-event", log)); private final Set factories = ImmutableSet.of( new ConfigFactory(APP_SUBJECT_FACTORY, @@ -293,6 +293,7 @@ public class OpenstackInterfaceManager implements OpenstackInterfaceService { * @param id Security Group ID * @return OpenstackSecurityGroup object or null if fails */ + @Override public OpenstackSecurityGroup securityGroup(String id) { Invocation.Builder builder = getClientBuilder(neutronUrl, URI_SECURITY_GROUPS + "/" + id); if (builder == null) { diff --git a/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSession.java b/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSession.java index 0352520477..f9d2a20acf 100644 --- a/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSession.java +++ b/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSession.java @@ -31,6 +31,8 @@ import org.onlab.packet.IpPrefix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.onlab.util.Tools.groupedThreads; + import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Collection; @@ -58,7 +60,7 @@ public class BgpSession extends SimpleChannelHandler { private final BgpSessionInfo remoteInfo; // BGP session remote info // Timers state - private Timer timer = new HashedWheelTimer(); + private Timer timer = new HashedWheelTimer(groupedThreads("BgpSession", "timer-%d", log)); private volatile Timeout keepaliveTimeout; // Periodic KEEPALIVE private volatile Timeout sessionTimeout; // Session timeout diff --git a/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSessionManager.java b/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSessionManager.java index 92448b4de4..fbecd68bc3 100644 --- a/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSessionManager.java +++ b/apps/routing/src/main/java/org/onosproject/routing/bgp/BgpSessionManager.java @@ -136,6 +136,7 @@ public class BgpSessionManager implements BgpInfoService { * * @return the BGP sessions */ + @Override public Collection getBgpSessions() { return bgpSessions.values(); } @@ -145,6 +146,7 @@ public class BgpSessionManager implements BgpInfoService { * * @return the selected IPv4 BGP routes among all BGP sessions */ + @Override public Collection getBgpRoutes4() { return bgpRoutes4.values(); } @@ -154,6 +156,7 @@ public class BgpSessionManager implements BgpInfoService { * * @return the selected IPv6 BGP routes among all BGP sessions */ + @Override public Collection getBgpRoutes6() { return bgpRoutes6.values(); } @@ -309,8 +312,8 @@ public class BgpSessionManager implements BgpInfoService { isShutdown = false; ChannelFactory channelFactory = new NioServerSocketChannelFactory( - newCachedThreadPool(groupedThreads("onos/bgp", "sm-boss-%d")), - newCachedThreadPool(groupedThreads("onos/bgp", "sm-worker-%d"))); + newCachedThreadPool(groupedThreads("onos/bgp", "sm-boss-%d", log)), + newCachedThreadPool(groupedThreads("onos/bgp", "sm-worker-%d", log))); ChannelPipelineFactory pipelineFactory = () -> { // Allocate a new session per connection BgpSession bgpSessionHandler = diff --git a/apps/routing/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/src/main/java/org/onosproject/routing/fpm/FpmManager.java index c30e71d831..90650443e4 100644 --- a/apps/routing/src/main/java/org/onosproject/routing/fpm/FpmManager.java +++ b/apps/routing/src/main/java/org/onosproject/routing/fpm/FpmManager.java @@ -121,8 +121,8 @@ public class FpmManager implements FpmInfoService { private void startServer() { ChannelFactory channelFactory = new NioServerSocketChannelFactory( - newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d")), - newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d"))); + newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)), + newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log))); ChannelPipelineFactory pipelineFactory = () -> { // Allocate a new session per connection FpmSessionHandler fpmSessionHandler = diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java index 358fa75c3a..870ea06892 100644 --- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java +++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java @@ -30,11 +30,13 @@ import org.onosproject.segmentrouting.config.DeviceConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static org.onlab.util.Tools.groupedThreads; + import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -58,7 +60,8 @@ public class DefaultRoutingHandler { private DeviceConfiguration config; private final Lock statusLock = new ReentrantLock(); private volatile Status populationStatus; - private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService executorService + = newScheduledThreadPool(1, groupedThreads("RoutingHandler", "retry-%d", log)); /** * Represents the default routing population status. diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java index c115704884..fce63fe925 100644 --- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java +++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java @@ -102,6 +102,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; +import static org.onlab.util.Tools.groupedThreads; /** @@ -178,12 +179,12 @@ public class SegmentRoutingManager implements SegmentRoutingService { private final InternalCordConfigListener cordConfigListener = new InternalCordConfigListener(); private ScheduledExecutorService executorService = Executors - .newScheduledThreadPool(1); + .newScheduledThreadPool(1, groupedThreads("SegmentRoutingManager", "event-%d", log)); @SuppressWarnings("unused") private static ScheduledFuture eventHandlerFuture = null; @SuppressWarnings("rawtypes") - private ConcurrentLinkedQueue eventQueue = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue eventQueue = new ConcurrentLinkedQueue<>(); private Map groupHandlerMap = new ConcurrentHashMap<>(); /** @@ -712,7 +713,7 @@ public class SegmentRoutingManager implements SegmentRoutingService { } else if (event.type() == DeviceEvent.Type.PORT_ADDED || event.type() == DeviceEvent.Type.PORT_UPDATED) { log.info("** PORT ADDED OR UPDATED {}/{} -> {}", - (Device) event.subject(), + event.subject(), ((DeviceEvent) event).port(), event.type()); /* XXX create method for single port filtering rules diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java index 2571204a41..6db036595a 100644 --- a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java +++ b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java @@ -25,7 +25,6 @@ import java.io.InputStream; import java.net.URL; import java.net.URLConnection; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -65,6 +64,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Files; import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; /** * Provider of {@link ClusterMetadata cluster metadata} sourced from a local config file. @@ -89,7 +89,7 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr private static final ProviderId PROVIDER_ID = new ProviderId("file", "none"); private final AtomicReference> cachedMetadata = new AtomicReference<>(); private final ScheduledExecutorService configFileChangeDetector = - Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", "")); + newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", "", log)); private String metadataUrl; private ObjectMapper mapper; diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java index e617ae92a0..a0ad43989f 100644 --- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java +++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java @@ -42,12 +42,13 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static com.google.common.collect.ImmutableSet.copyOf; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.onosproject.net.device.DeviceEvent.Type.*; import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*; @@ -67,7 +68,8 @@ class FlowRuleDriverProvider extends AbstractProvider implements FlowRuleProvide private MastershipService mastershipService; private InternalDeviceListener deviceListener = new InternalDeviceListener(); - private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService executor + = newSingleThreadScheduledExecutor(groupedThreads("FlowRuleDriverProvider", "%d", log)); private ScheduledFuture poller = null; /** diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java index 032fcd3022..c33a2f4f5b 100644 --- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java +++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java @@ -145,7 +145,7 @@ public class FlowObjectiveCompositionManager implements FlowObjectiveService { @Activate protected void activate() { - executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d")); + executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log)); flowObjectiveStore.setDelegate(delegate); mastershipService.addListener(mastershipListener); deviceService.addListener(deviceListener); diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java index 45a2af492a..24abda4455 100644 --- a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java +++ b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java @@ -61,7 +61,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,6 +68,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Multimaps.synchronizedSetMultimap; +import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.isNullOrEmpty; @@ -117,8 +117,8 @@ public class ObjectiveTracker implements ObjectiveTrackerService { private ExecutorService executorService = newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log)); - private ScheduledExecutorService executor = Executors - .newScheduledThreadPool(1); + private ScheduledExecutorService executor = + newScheduledThreadPool(1, groupedThreads("onos/intent", "scheduledIntentUpdate", log)); private TopologyListener listener = new InternalTopologyListener(); private ResourceListener resourceListener = new InternalResourceListener(); diff --git a/core/security/src/main/java/org/onosproject/security/store/DistributedSecurityModeStore.java b/core/security/src/main/java/org/onosproject/security/store/DistributedSecurityModeStore.java index 144900fc14..a0ce69b8c9 100644 --- a/core/security/src/main/java/org/onosproject/security/store/DistributedSecurityModeStore.java +++ b/core/security/src/main/java/org/onosproject/security/store/DistributedSecurityModeStore.java @@ -29,7 +29,6 @@ import org.apache.karaf.features.BundleInfo; import org.apache.karaf.features.Feature; import org.apache.karaf.features.FeaturesService; import org.onlab.util.KryoNamespace; -import org.onlab.util.Tools; import org.onosproject.app.ApplicationAdminService; import org.onosproject.core.Application; import org.onosproject.core.ApplicationId; @@ -50,9 +49,10 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.onosproject.security.store.SecurityModeState.*; import static org.slf4j.LoggerFactory.getLogger; @@ -102,7 +102,7 @@ public class DistributedSecurityModeStore @Activate public void activate() { - eventHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/security/store", "event-handler")); + eventHandler = newSingleThreadExecutor(groupedThreads("onos/security/store", "event-handler", log)); states = storageService.consistentMapBuilder() .withName("smonos-sdata") .withSerializer(STATE_SERIALIZER) diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java index 2d09bcfdf2..ab266b6e76 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java @@ -36,6 +36,8 @@ import org.onosproject.net.intent.Key; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.onlab.util.Tools.groupedThreads; + import java.util.List; import java.util.Objects; import java.util.concurrent.Executors; @@ -77,7 +79,7 @@ public class IntentPartitionManager implements IntentPartitionService { private LeadershipEventListener leaderListener = new InternalLeadershipListener(); private ScheduledExecutorService executor = Executors - .newScheduledThreadPool(1); + .newScheduledThreadPool(1, groupedThreads("IntentPartition", "balancer-%d", log)); @Activate public void activate() { diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java index 27b0973657..25429ade76 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java @@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; @@ -284,7 +285,8 @@ public class DistributedPacketStore */ private void restartMessageHandlerThreadPool() { ExecutorService prevExecutor = messageHandlingExecutor; - messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize()); + messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(), + groupedThreads("DistPktStore", "messageHandling-%d", log)); prevExecutor.shutdown(); } diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java index 74ab40b112..21371c9c47 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java @@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; @@ -338,7 +339,8 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore { */ private void restartMessageHandlerThreadPool() { ExecutorService prevExecutor = messageHandlingExecutor; - messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize()); + messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(), + groupedThreads("DistFlowStats", "messageHandling-%d", log)); prevExecutor.shutdown(); } diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java index 51552f0bbb..78c6d789e7 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; @@ -357,7 +358,8 @@ public class DistributedStatisticStore implements StatisticStore { */ private void restartMessageHandlerThreadPool() { ExecutorService prevExecutor = messageHandlingExecutor; - messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize()); + messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(), + groupedThreads("DistStatsStore", "messageHandling-%d", log)); prevExecutor.shutdown(); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java index 46b85b966b..27d513a666 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java @@ -15,12 +15,13 @@ */ package org.onosproject.store.primitives.impl; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; import java.net.ConnectException; import java.nio.channels.ClosedChannelException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -62,7 +63,7 @@ public class OnosCopycatClient extends DelegatingCopycatClient { super(client); this.maxRetries = maxRetries; this.delayBetweenRetriesMillis = delayBetweenRetriesMillis; - this.executor = Executors.newSingleThreadScheduledExecutor(); + this.executor = newSingleThreadScheduledExecutor(groupedThreads("OnosCopycat", "client", log)); } @Override diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java index 7b4ad473d6..879cbb35d7 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueue.java @@ -15,6 +15,8 @@ */ package org.onosproject.store.primitives.resources.impl; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; import java.util.Collection; @@ -24,7 +26,6 @@ import java.util.Timer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -58,7 +59,7 @@ public class AtomixWorkQueue extends AbstractResource private final Logger log = getLogger(getClass()); public static final String TASK_AVAILABLE = "task-available"; - private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log)); private final AtomicReference taskProcessor = new AtomicReference<>(); private final Timer timer = new Timer("atomix-work-queue-completer"); private final AtomicBoolean isRegistered = new AtomicBoolean(false); diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java index e76d9fe480..ebc1da4dd8 100644 --- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java +++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java @@ -111,7 +111,7 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp private ScheduledExecutorService groupChecker = Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", - "ovs-corsa-%d")); + "ovs-corsa-%d", log)); protected static final int CONTROLLER_PRIORITY = 255; protected static final int DROP_PRIORITY = 0; diff --git a/incubator/core/src/main/java/org/onosproject/incubator/component/impl/ComponentManager.java b/incubator/core/src/main/java/org/onosproject/incubator/component/impl/ComponentManager.java index 9fd440f6e5..f8ae422642 100644 --- a/incubator/core/src/main/java/org/onosproject/incubator/component/impl/ComponentManager.java +++ b/incubator/core/src/main/java/org/onosproject/incubator/component/impl/ComponentManager.java @@ -62,7 +62,7 @@ public class ComponentManager implements ComponentService { components = Sets.newSetFromMap(new ConcurrentHashMap<>()); executor = Executors.newScheduledThreadPool(NUM_THREADS, - groupedThreads("onos/component", "%d")); + groupedThreads("onos/component", "%d", log)); executor.scheduleAtFixedRate(() -> components.forEach(this::enableComponent), 0, POLLING_PERIOD_MS, TimeUnit.MILLISECONDS); diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java index 004a550123..684cc34d47 100644 --- a/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java +++ b/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java @@ -85,7 +85,7 @@ public class RouteManager implements ListenerService, @Activate protected void activate() { - threadFactory = groupedThreads("onos/route", "listener-%d"); + threadFactory = groupedThreads("onos/route", "listener-%d", log); routeStore.setDelegate(delegate); hostService.addListener(hostListener); diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java index fda0d83dcf..3da65ee34f 100644 --- a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java +++ b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java @@ -120,7 +120,8 @@ public class DistributedLabelResourceStore messageHandlingExecutor = Executors .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, groupedThreads("onos/store/flow", - "message-handlers")); + "message-handlers", + log)); clusterCommunicator .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED, SERIALIZER::decode, diff --git a/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java b/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java index 1f0e452960..924ad871a0 100644 --- a/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java +++ b/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java @@ -39,6 +39,8 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.CharsetUtil; +import static org.onlab.util.Tools.groupedThreads; + import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -47,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.onlab.packet.IpAddress; import org.onlab.packet.TpPort; +import org.onlab.util.Tools; import org.onosproject.ovsdb.controller.OvsdbConstant; import org.onosproject.ovsdb.controller.OvsdbNodeId; import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient; @@ -70,7 +73,7 @@ public class Controller { private Callback monitorCallback; private final ExecutorService executorService = Executors - .newFixedThreadPool(10); + .newFixedThreadPool(10, groupedThreads("OVSDB-C", "executor-%d", log)); private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; @@ -83,8 +86,8 @@ public class Controller { * Initialization. */ private void initEventLoopGroup() { - bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); + bossGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "boss-%d", log)); + workerGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "worker-%d", log)); serverChannelClass = NioServerSocketChannel.class; } @@ -118,6 +121,7 @@ public class Controller { */ private class OnosCommunicationChannelInitializer extends ChannelInitializer { + @Override protected void initChannel(SocketChannel channel) throws Exception { log.info("New channel created"); channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java index eec8b29976..9bd8e7847f 100644 --- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java +++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java @@ -71,6 +71,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.Executors.newScheduledThreadPool; import static org.onlab.util.Tools.groupedThreads; import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY; import static org.slf4j.LoggerFactory.getLogger; @@ -125,7 +126,10 @@ public class NetconfDeviceProvider extends AbstractProvider private final ExecutorService executor = Executors.newFixedThreadPool(5, groupedThreads("onos/netconfdeviceprovider", "device-installer-%d", log)); - protected ScheduledExecutorService connectionExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE); + protected ScheduledExecutorService connectionExecutor + = newScheduledThreadPool(CORE_POOL_SIZE, + groupedThreads("onos/netconfdeviceprovider", + "connection-executor-%d", log)); private DeviceProviderService providerService; private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener(); diff --git a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java index 59bf6d327a..23254ad213 100644 --- a/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java +++ b/utils/misc/src/main/java/org/onlab/util/SlidingWindowCounter.java @@ -18,13 +18,14 @@ package org.onlab.util; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; /** * Maintains a sliding window of value counts. The sliding window counter is @@ -62,7 +63,7 @@ public final class SlidingWindowCounter { .map(AtomicLong::new) .collect(Collectors.toCollection(ArrayList::new)); - background = Executors.newSingleThreadScheduledExecutor(); + background = newSingleThreadScheduledExecutor(groupedThreads("SlidingWindowCounter", "bg-%d")); background.scheduleWithFixedDelay(this::advanceHead, 0, SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS); } diff --git a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandler.java b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandler.java index 873184e2a9..71c017dc5f 100644 --- a/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandler.java +++ b/web/gui/src/main/java/org/onosproject/ui/impl/TopologyViewMessageHandler.java @@ -177,7 +177,7 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase { private final Accumulator eventAccummulator = new InternalEventAccummulator(); private final ExecutorService msgSender = - newSingleThreadExecutor(groupedThreads("onos/gui", "msg-sender")); + newSingleThreadExecutor(groupedThreads("onos/gui", "msg-sender", log)); private TopoOverlayCache overlayCache; private TrafficMonitor traffic; diff --git a/web/gui/src/main/java/org/onosproject/ui/impl/topo/model/UiSharedTopologyModel.java b/web/gui/src/main/java/org/onosproject/ui/impl/topo/model/UiSharedTopologyModel.java index debdde91fd..50ebb06050 100644 --- a/web/gui/src/main/java/org/onosproject/ui/impl/topo/model/UiSharedTopologyModel.java +++ b/web/gui/src/main/java/org/onosproject/ui/impl/topo/model/UiSharedTopologyModel.java @@ -134,7 +134,7 @@ public final class UiSharedTopologyModel @Activate protected void activate() { cache = new ModelCache(new DefaultServiceBundle(), eventDispatcher); - eventHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/ui/topo", "event-handler")); + eventHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/ui/topo", "event-handler", log)); eventDispatcher.addSink(UiModelEvent.class, listenerRegistry);