Assigning names to various threads

- specify nearby logger when possible

Change-Id: Ia8925ab1459c4d1da922becd0e201388d44d4294
This commit is contained in:
Yuta HIGUCHI 2016-07-21 16:54:33 -07:00 committed by Yuta HIGUCHI
parent c7258f9675
commit 1624df1f30
28 changed files with 81 additions and 45 deletions

View File

@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -100,7 +101,9 @@ public class PollingAlarmProvider extends AbstractProvider implements AlarmProvi
@Activate @Activate
public void activate(ComponentContext context) { public void activate(ComponentContext context) {
alarmsExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE); alarmsExecutor = newScheduledThreadPool(CORE_POOL_SIZE,
groupedThreads("onos/pollingalarmprovider",
"alarm-executor-%d", log));
eventHandlingExecutor = eventHandlingExecutor =
Executors.newFixedThreadPool(CORE_POOL_SIZE, Executors.newFixedThreadPool(CORE_POOL_SIZE,
groupedThreads("onos/pollingalarmprovider", groupedThreads("onos/pollingalarmprovider",

View File

@ -36,12 +36,13 @@ import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService; import org.onosproject.mastership.MastershipService;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
/** /**
@ -87,7 +88,7 @@ public class MastershipLoadBalancer {
//Ensures that all executions do not interfere with one another (single thread) //Ensures that all executions do not interfere with one another (single thread)
private ListeningScheduledExecutorService executorService = MoreExecutors. private ListeningScheduledExecutorService executorService = MoreExecutors.
listeningDecorator(Executors.newSingleThreadScheduledExecutor()); listeningDecorator(newSingleThreadScheduledExecutor(groupedThreads("MastershipLoadBalancer", "%d", log)));
@Activate @Activate
public void activate() { public void activate() {

View File

@ -126,7 +126,7 @@ public class OpenstackInterfaceManager implements OpenstackInterfaceService {
private InternalConfigListener internalConfigListener = new InternalConfigListener(); private InternalConfigListener internalConfigListener = new InternalConfigListener();
private ExecutorService networkEventExcutorService = private ExecutorService networkEventExcutorService =
Executors.newSingleThreadExecutor(groupedThreads("onos/openstackinterface", "config-event")); Executors.newSingleThreadExecutor(groupedThreads("onos/openstackinterface", "config-event", log));
private final Set<ConfigFactory> factories = ImmutableSet.of( private final Set<ConfigFactory> factories = ImmutableSet.of(
new ConfigFactory<ApplicationId, OpenstackInterfaceConfig>(APP_SUBJECT_FACTORY, new ConfigFactory<ApplicationId, OpenstackInterfaceConfig>(APP_SUBJECT_FACTORY,
@ -293,6 +293,7 @@ public class OpenstackInterfaceManager implements OpenstackInterfaceService {
* @param id Security Group ID * @param id Security Group ID
* @return OpenstackSecurityGroup object or null if fails * @return OpenstackSecurityGroup object or null if fails
*/ */
@Override
public OpenstackSecurityGroup securityGroup(String id) { public OpenstackSecurityGroup securityGroup(String id) {
Invocation.Builder builder = getClientBuilder(neutronUrl, URI_SECURITY_GROUPS + "/" + id); Invocation.Builder builder = getClientBuilder(neutronUrl, URI_SECURITY_GROUPS + "/" + id);
if (builder == null) { if (builder == null) {

View File

@ -31,6 +31,8 @@ import org.onlab.packet.IpPrefix;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.onlab.util.Tools.groupedThreads;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
@ -58,7 +60,7 @@ public class BgpSession extends SimpleChannelHandler {
private final BgpSessionInfo remoteInfo; // BGP session remote info private final BgpSessionInfo remoteInfo; // BGP session remote info
// Timers state // Timers state
private Timer timer = new HashedWheelTimer(); private Timer timer = new HashedWheelTimer(groupedThreads("BgpSession", "timer-%d", log));
private volatile Timeout keepaliveTimeout; // Periodic KEEPALIVE private volatile Timeout keepaliveTimeout; // Periodic KEEPALIVE
private volatile Timeout sessionTimeout; // Session timeout private volatile Timeout sessionTimeout; // Session timeout

View File

@ -136,6 +136,7 @@ public class BgpSessionManager implements BgpInfoService {
* *
* @return the BGP sessions * @return the BGP sessions
*/ */
@Override
public Collection<BgpSession> getBgpSessions() { public Collection<BgpSession> getBgpSessions() {
return bgpSessions.values(); return bgpSessions.values();
} }
@ -145,6 +146,7 @@ public class BgpSessionManager implements BgpInfoService {
* *
* @return the selected IPv4 BGP routes among all BGP sessions * @return the selected IPv4 BGP routes among all BGP sessions
*/ */
@Override
public Collection<BgpRouteEntry> getBgpRoutes4() { public Collection<BgpRouteEntry> getBgpRoutes4() {
return bgpRoutes4.values(); return bgpRoutes4.values();
} }
@ -154,6 +156,7 @@ public class BgpSessionManager implements BgpInfoService {
* *
* @return the selected IPv6 BGP routes among all BGP sessions * @return the selected IPv6 BGP routes among all BGP sessions
*/ */
@Override
public Collection<BgpRouteEntry> getBgpRoutes6() { public Collection<BgpRouteEntry> getBgpRoutes6() {
return bgpRoutes6.values(); return bgpRoutes6.values();
} }
@ -309,8 +312,8 @@ public class BgpSessionManager implements BgpInfoService {
isShutdown = false; isShutdown = false;
ChannelFactory channelFactory = new NioServerSocketChannelFactory( ChannelFactory channelFactory = new NioServerSocketChannelFactory(
newCachedThreadPool(groupedThreads("onos/bgp", "sm-boss-%d")), newCachedThreadPool(groupedThreads("onos/bgp", "sm-boss-%d", log)),
newCachedThreadPool(groupedThreads("onos/bgp", "sm-worker-%d"))); newCachedThreadPool(groupedThreads("onos/bgp", "sm-worker-%d", log)));
ChannelPipelineFactory pipelineFactory = () -> { ChannelPipelineFactory pipelineFactory = () -> {
// Allocate a new session per connection // Allocate a new session per connection
BgpSession bgpSessionHandler = BgpSession bgpSessionHandler =

View File

@ -121,8 +121,8 @@ public class FpmManager implements FpmInfoService {
private void startServer() { private void startServer() {
ChannelFactory channelFactory = new NioServerSocketChannelFactory( ChannelFactory channelFactory = new NioServerSocketChannelFactory(
newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d")), newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d"))); newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
ChannelPipelineFactory pipelineFactory = () -> { ChannelPipelineFactory pipelineFactory = () -> {
// Allocate a new session per connection // Allocate a new session per connection
FpmSessionHandler fpmSessionHandler = FpmSessionHandler fpmSessionHandler =

View File

@ -30,11 +30,13 @@ import org.onosproject.segmentrouting.config.DeviceConfiguration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -58,7 +60,8 @@ public class DefaultRoutingHandler {
private DeviceConfiguration config; private DeviceConfiguration config;
private final Lock statusLock = new ReentrantLock(); private final Lock statusLock = new ReentrantLock();
private volatile Status populationStatus; private volatile Status populationStatus;
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); private ScheduledExecutorService executorService
= newScheduledThreadPool(1, groupedThreads("RoutingHandler", "retry-%d", log));
/** /**
* Represents the default routing population status. * Represents the default routing population status.

View File

@ -102,6 +102,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static org.onlab.util.Tools.groupedThreads;
/** /**
@ -178,12 +179,12 @@ public class SegmentRoutingManager implements SegmentRoutingService {
private final InternalCordConfigListener cordConfigListener = new InternalCordConfigListener(); private final InternalCordConfigListener cordConfigListener = new InternalCordConfigListener();
private ScheduledExecutorService executorService = Executors private ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(1); .newScheduledThreadPool(1, groupedThreads("SegmentRoutingManager", "event-%d", log));
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static ScheduledFuture<?> eventHandlerFuture = null; private static ScheduledFuture<?> eventHandlerFuture = null;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<Event>(); private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<>();
private Map<DeviceId, DefaultGroupHandler> groupHandlerMap = private Map<DeviceId, DefaultGroupHandler> groupHandlerMap =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
/** /**
@ -712,7 +713,7 @@ public class SegmentRoutingManager implements SegmentRoutingService {
} else if (event.type() == DeviceEvent.Type.PORT_ADDED || } else if (event.type() == DeviceEvent.Type.PORT_ADDED ||
event.type() == DeviceEvent.Type.PORT_UPDATED) { event.type() == DeviceEvent.Type.PORT_UPDATED) {
log.info("** PORT ADDED OR UPDATED {}/{} -> {}", log.info("** PORT ADDED OR UPDATED {}/{} -> {}",
(Device) event.subject(), event.subject(),
((DeviceEvent) event).port(), ((DeviceEvent) event).port(),
event.type()); event.type());
/* XXX create method for single port filtering rules /* XXX create method for single port filtering rules

View File

@ -25,7 +25,6 @@ import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -65,6 +64,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.Files; import com.google.common.io.Files;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
/** /**
* Provider of {@link ClusterMetadata cluster metadata} sourced from a local config file. * Provider of {@link ClusterMetadata cluster metadata} sourced from a local config file.
@ -89,7 +89,7 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr
private static final ProviderId PROVIDER_ID = new ProviderId("file", "none"); private static final ProviderId PROVIDER_ID = new ProviderId("file", "none");
private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>(); private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
private final ScheduledExecutorService configFileChangeDetector = private final ScheduledExecutorService configFileChangeDetector =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", "")); newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", "", log));
private String metadataUrl; private String metadataUrl;
private ObjectMapper mapper; private ObjectMapper mapper;

View File

@ -42,12 +42,13 @@ import org.slf4j.LoggerFactory;
import java.util.Collection; import java.util.Collection;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.google.common.collect.ImmutableSet.copyOf; import static com.google.common.collect.ImmutableSet.copyOf;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.device.DeviceEvent.Type.*; import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*; import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*;
@ -67,7 +68,8 @@ class FlowRuleDriverProvider extends AbstractProvider implements FlowRuleProvide
private MastershipService mastershipService; private MastershipService mastershipService;
private InternalDeviceListener deviceListener = new InternalDeviceListener(); private InternalDeviceListener deviceListener = new InternalDeviceListener();
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private ScheduledExecutorService executor
= newSingleThreadScheduledExecutor(groupedThreads("FlowRuleDriverProvider", "%d", log));
private ScheduledFuture<?> poller = null; private ScheduledFuture<?> poller = null;
/** /**

View File

@ -145,7 +145,7 @@ public class FlowObjectiveCompositionManager implements FlowObjectiveService {
@Activate @Activate
protected void activate() { protected void activate() {
executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d")); executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
flowObjectiveStore.setDelegate(delegate); flowObjectiveStore.setDelegate(delegate);
mastershipService.addListener(mastershipListener); mastershipService.addListener(mastershipListener);
deviceService.addListener(deviceListener); deviceService.addListener(deviceListener);

View File

@ -61,7 +61,6 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -69,6 +68,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap; import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.isNullOrEmpty; import static org.onlab.util.Tools.isNullOrEmpty;
@ -117,8 +117,8 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
private ExecutorService executorService = private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log)); newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log));
private ScheduledExecutorService executor = Executors private ScheduledExecutorService executor =
.newScheduledThreadPool(1); newScheduledThreadPool(1, groupedThreads("onos/intent", "scheduledIntentUpdate", log));
private TopologyListener listener = new InternalTopologyListener(); private TopologyListener listener = new InternalTopologyListener();
private ResourceListener resourceListener = new InternalResourceListener(); private ResourceListener resourceListener = new InternalResourceListener();

View File

@ -29,7 +29,6 @@ import org.apache.karaf.features.BundleInfo;
import org.apache.karaf.features.Feature; import org.apache.karaf.features.Feature;
import org.apache.karaf.features.FeaturesService; import org.apache.karaf.features.FeaturesService;
import org.onlab.util.KryoNamespace; import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.app.ApplicationAdminService; import org.onosproject.app.ApplicationAdminService;
import org.onosproject.core.Application; import org.onosproject.core.Application;
import org.onosproject.core.ApplicationId; import org.onosproject.core.ApplicationId;
@ -50,9 +49,10 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.security.store.SecurityModeState.*; import static org.onosproject.security.store.SecurityModeState.*;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -102,7 +102,7 @@ public class DistributedSecurityModeStore
@Activate @Activate
public void activate() { public void activate() {
eventHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/security/store", "event-handler")); eventHandler = newSingleThreadExecutor(groupedThreads("onos/security/store", "event-handler", log));
states = storageService.<ApplicationId, SecurityInfo>consistentMapBuilder() states = storageService.<ApplicationId, SecurityInfo>consistentMapBuilder()
.withName("smonos-sdata") .withName("smonos-sdata")
.withSerializer(STATE_SERIALIZER) .withSerializer(STATE_SERIALIZER)

View File

@ -36,6 +36,8 @@ import org.onosproject.net.intent.Key;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.onlab.util.Tools.groupedThreads;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -77,7 +79,7 @@ public class IntentPartitionManager implements IntentPartitionService {
private LeadershipEventListener leaderListener = new InternalLeadershipListener(); private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ScheduledExecutorService executor = Executors private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1); .newScheduledThreadPool(1, groupedThreads("IntentPartition", "balancer-%d", log));
@Activate @Activate
public void activate() { public void activate() {

View File

@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -284,7 +285,8 @@ public class DistributedPacketStore
*/ */
private void restartMessageHandlerThreadPool() { private void restartMessageHandlerThreadPool() {
ExecutorService prevExecutor = messageHandlingExecutor; ExecutorService prevExecutor = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize()); messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
groupedThreads("DistPktStore", "messageHandling-%d", log));
prevExecutor.shutdown(); prevExecutor.shutdown();
} }

View File

@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -338,7 +339,8 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
*/ */
private void restartMessageHandlerThreadPool() { private void restartMessageHandlerThreadPool() {
ExecutorService prevExecutor = messageHandlingExecutor; ExecutorService prevExecutor = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize()); messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
groupedThreads("DistFlowStats", "messageHandling-%d", log));
prevExecutor.shutdown(); prevExecutor.shutdown();
} }

View File

@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.get; import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -357,7 +358,8 @@ public class DistributedStatisticStore implements StatisticStore {
*/ */
private void restartMessageHandlerThreadPool() { private void restartMessageHandlerThreadPool() {
ExecutorService prevExecutor = messageHandlingExecutor; ExecutorService prevExecutor = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize()); messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
groupedThreads("DistStatsStore", "messageHandling-%d", log));
prevExecutor.shutdown(); prevExecutor.shutdown();
} }

View File

@ -15,12 +15,13 @@
*/ */
package org.onosproject.store.primitives.impl; package org.onosproject.store.primitives.impl;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
import java.net.ConnectException; import java.net.ConnectException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -62,7 +63,7 @@ public class OnosCopycatClient extends DelegatingCopycatClient {
super(client); super(client);
this.maxRetries = maxRetries; this.maxRetries = maxRetries;
this.delayBetweenRetriesMillis = delayBetweenRetriesMillis; this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
this.executor = Executors.newSingleThreadScheduledExecutor(); this.executor = newSingleThreadScheduledExecutor(groupedThreads("OnosCopycat", "client", log));
} }
@Override @Override

View File

@ -15,6 +15,8 @@
*/ */
package org.onosproject.store.primitives.resources.impl; package org.onosproject.store.primitives.resources.impl;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection; import java.util.Collection;
@ -24,7 +26,6 @@ import java.util.Timer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -58,7 +59,7 @@ public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
private final Logger log = getLogger(getClass()); private final Logger log = getLogger(getClass());
public static final String TASK_AVAILABLE = "task-available"; public static final String TASK_AVAILABLE = "task-available";
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ExecutorService executor = newSingleThreadExecutor(groupedThreads("AtomixWorkQueue", "%d", log));
private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>(); private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
private final Timer timer = new Timer("atomix-work-queue-completer"); private final Timer timer = new Timer("atomix-work-queue-completer");
private final AtomicBoolean isRegistered = new AtomicBoolean(false); private final AtomicBoolean isRegistered = new AtomicBoolean(false);

View File

@ -111,7 +111,7 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
private ScheduledExecutorService groupChecker = private ScheduledExecutorService groupChecker =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
"ovs-corsa-%d")); "ovs-corsa-%d", log));
protected static final int CONTROLLER_PRIORITY = 255; protected static final int CONTROLLER_PRIORITY = 255;
protected static final int DROP_PRIORITY = 0; protected static final int DROP_PRIORITY = 0;

View File

@ -62,7 +62,7 @@ public class ComponentManager implements ComponentService {
components = Sets.newSetFromMap(new ConcurrentHashMap<>()); components = Sets.newSetFromMap(new ConcurrentHashMap<>());
executor = Executors.newScheduledThreadPool(NUM_THREADS, executor = Executors.newScheduledThreadPool(NUM_THREADS,
groupedThreads("onos/component", "%d")); groupedThreads("onos/component", "%d", log));
executor.scheduleAtFixedRate(() -> components.forEach(this::enableComponent), executor.scheduleAtFixedRate(() -> components.forEach(this::enableComponent),
0, POLLING_PERIOD_MS, TimeUnit.MILLISECONDS); 0, POLLING_PERIOD_MS, TimeUnit.MILLISECONDS);

View File

@ -85,7 +85,7 @@ public class RouteManager implements ListenerService<RouteEvent, RouteListener>,
@Activate @Activate
protected void activate() { protected void activate() {
threadFactory = groupedThreads("onos/route", "listener-%d"); threadFactory = groupedThreads("onos/route", "listener-%d", log);
routeStore.setDelegate(delegate); routeStore.setDelegate(delegate);
hostService.addListener(hostListener); hostService.addListener(hostListener);

View File

@ -120,7 +120,8 @@ public class DistributedLabelResourceStore
messageHandlingExecutor = Executors messageHandlingExecutor = Executors
.newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/flow", groupedThreads("onos/store/flow",
"message-handlers")); "message-handlers",
log));
clusterCommunicator clusterCommunicator
.addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED, .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
SERIALIZER::<LabelResourcePool>decode, SERIALIZER::<LabelResourcePool>decode,

View File

@ -39,6 +39,8 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import static org.onlab.util.Tools.groupedThreads;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -47,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort; import org.onlab.packet.TpPort;
import org.onlab.util.Tools;
import org.onosproject.ovsdb.controller.OvsdbConstant; import org.onosproject.ovsdb.controller.OvsdbConstant;
import org.onosproject.ovsdb.controller.OvsdbNodeId; import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient; import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
@ -70,7 +73,7 @@ public class Controller {
private Callback monitorCallback; private Callback monitorCallback;
private final ExecutorService executorService = Executors private final ExecutorService executorService = Executors
.newFixedThreadPool(10); .newFixedThreadPool(10, groupedThreads("OVSDB-C", "executor-%d", log));
private EventLoopGroup bossGroup; private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
@ -83,8 +86,8 @@ public class Controller {
* Initialization. * Initialization.
*/ */
private void initEventLoopGroup() { private void initEventLoopGroup() {
bossGroup = new NioEventLoopGroup(); bossGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "boss-%d", log));
workerGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(0, Tools.groupedThreads("OVSDB-C", "worker-%d", log));
serverChannelClass = NioServerSocketChannel.class; serverChannelClass = NioServerSocketChannel.class;
} }
@ -118,6 +121,7 @@ public class Controller {
*/ */
private class OnosCommunicationChannelInitializer private class OnosCommunicationChannelInitializer
extends ChannelInitializer<SocketChannel> { extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception { protected void initChannel(SocketChannel channel) throws Exception {
log.info("New channel created"); log.info("New channel created");
channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));

View File

@ -71,6 +71,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY; import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
import static org.slf4j.LoggerFactory.getLogger; import static org.slf4j.LoggerFactory.getLogger;
@ -125,7 +126,10 @@ public class NetconfDeviceProvider extends AbstractProvider
private final ExecutorService executor = private final ExecutorService executor =
Executors.newFixedThreadPool(5, groupedThreads("onos/netconfdeviceprovider", Executors.newFixedThreadPool(5, groupedThreads("onos/netconfdeviceprovider",
"device-installer-%d", log)); "device-installer-%d", log));
protected ScheduledExecutorService connectionExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE); protected ScheduledExecutorService connectionExecutor
= newScheduledThreadPool(CORE_POOL_SIZE,
groupedThreads("onos/netconfdeviceprovider",
"connection-executor-%d", log));
private DeviceProviderService providerService; private DeviceProviderService providerService;
private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener(); private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();

View File

@ -18,13 +18,14 @@ package org.onlab.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
/** /**
* Maintains a sliding window of value counts. The sliding window counter is * Maintains a sliding window of value counts. The sliding window counter is
@ -62,7 +63,7 @@ public final class SlidingWindowCounter {
.map(AtomicLong::new) .map(AtomicLong::new)
.collect(Collectors.toCollection(ArrayList::new)); .collect(Collectors.toCollection(ArrayList::new));
background = Executors.newSingleThreadScheduledExecutor(); background = newSingleThreadScheduledExecutor(groupedThreads("SlidingWindowCounter", "bg-%d"));
background.scheduleWithFixedDelay(this::advanceHead, 0, background.scheduleWithFixedDelay(this::advanceHead, 0,
SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS); SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS);
} }

View File

@ -177,7 +177,7 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
private final Accumulator<Event> eventAccummulator = new InternalEventAccummulator(); private final Accumulator<Event> eventAccummulator = new InternalEventAccummulator();
private final ExecutorService msgSender = private final ExecutorService msgSender =
newSingleThreadExecutor(groupedThreads("onos/gui", "msg-sender")); newSingleThreadExecutor(groupedThreads("onos/gui", "msg-sender", log));
private TopoOverlayCache overlayCache; private TopoOverlayCache overlayCache;
private TrafficMonitor traffic; private TrafficMonitor traffic;

View File

@ -134,7 +134,7 @@ public final class UiSharedTopologyModel
@Activate @Activate
protected void activate() { protected void activate() {
cache = new ModelCache(new DefaultServiceBundle(), eventDispatcher); cache = new ModelCache(new DefaultServiceBundle(), eventDispatcher);
eventHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/ui/topo", "event-handler")); eventHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/ui/topo", "event-handler", log));
eventDispatcher.addSink(UiModelEvent.class, listenerRegistry); eventDispatcher.addSink(UiModelEvent.class, listenerRegistry);