Improve Executors related logging behavior

- Specify Logger for the Executor
- Use Executor#execute instead of ExecutorService#submit for
  fire and forget type of usage.
   Note: submit() will swallow thrown Exception

Change-Id: I507b841dc3feedf4ad20a746c304518d68fb846a
This commit is contained in:
HIGUCHI Yuta 2016-03-11 19:16:35 -08:00 committed by Gerrit Code Review
parent 4a24a3e06f
commit 060da9a13a
12 changed files with 26 additions and 22 deletions

View File

@ -115,7 +115,8 @@ public class EventHistoryManager
appId = coreService.registerApplication("org.onosproject.events"); appId = coreService.registerApplication("org.onosproject.events");
log.debug("Registered as {}", appId); log.debug("Registered as {}", appId);
pruner = newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/events", "history-pruner"))); pruner = newSingleThreadScheduledExecutor(
minPriority(groupedThreads("onos/events", "history-pruner", log)));
pruner.scheduleWithFixedDelay(this::pruneEventHistoryTask, pruner.scheduleWithFixedDelay(this::pruneEventHistoryTask,
pruneInterval, pruneInterval, TimeUnit.SECONDS); pruneInterval, pruneInterval, TimeUnit.SECONDS);

View File

@ -125,7 +125,8 @@ public class DeviceManager
@Activate @Activate
public void activate() { public void activate() {
backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background")); backgroundService = newSingleThreadScheduledExecutor(
groupedThreads("onos/device", "manager-background", log));
localNodeId = clusterService.getLocalNode().id(); localNodeId = clusterService.getLocalNode().id();
store.setDelegate(delegate); store.setDelegate(delegate);
@ -499,7 +500,7 @@ public class DeviceManager
deviceId, response, mastershipService.getLocalRole(deviceId)); deviceId, response, mastershipService.getLocalRole(deviceId));
// roleManager got the device to comply, but doesn't agree with // roleManager got the device to comply, but doesn't agree with
// the store; use the store's view, then try to reassert. // the store; use the store's view, then try to reassert.
backgroundService.submit(() -> reassertRole(deviceId, mastershipService.getLocalRole(deviceId))); backgroundService.execute(() -> reassertRole(deviceId, mastershipService.getLocalRole(deviceId)));
return; return;
} }
} else { } else {
@ -684,7 +685,7 @@ public class DeviceManager
@Override @Override
public void event(MastershipEvent event) { public void event(MastershipEvent event) {
backgroundService.submit(() -> { backgroundService.execute(() -> {
try { try {
handleMastershipEvent(event); handleMastershipEvent(event);
} catch (Exception e) { } catch (Exception e) {

View File

@ -122,10 +122,10 @@ public class FlowRuleManager
private final FlowRuleDriverProvider defaultProvider = new FlowRuleDriverProvider(); private final FlowRuleDriverProvider defaultProvider = new FlowRuleDriverProvider();
protected ExecutorService deviceInstallers = protected ExecutorService deviceInstallers =
Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d")); Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d", log));
protected ExecutorService operationsService = protected ExecutorService operationsService =
Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d")); Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d, log"));
private IdGenerator idGenerator; private IdGenerator idGenerator;

View File

@ -200,7 +200,7 @@ public class IntentManager
if (newNumThreads != numThreads) { if (newNumThreads != numThreads) {
numThreads = newNumThreads; numThreads = newNumThreads;
ExecutorService oldWorkerExecutor = workerExecutor; ExecutorService oldWorkerExecutor = workerExecutor;
workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d")); workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d", log));
if (oldWorkerExecutor != null) { if (oldWorkerExecutor != null) {
oldWorkerExecutor.shutdown(); oldWorkerExecutor.shutdown();
} }

View File

@ -138,11 +138,11 @@ final class ResourceDeviceListener implements DeviceListener {
} }
private void registerDeviceResource(Device device) { private void registerDeviceResource(Device device) {
executor.submit(() -> adminService.register(Resources.discrete(device.id()).resource())); executor.execute(() -> adminService.register(Resources.discrete(device.id()).resource()));
} }
private void unregisterDeviceResource(Device device) { private void unregisterDeviceResource(Device device) {
executor.submit(() -> { executor.execute(() -> {
DiscreteResource devResource = Resources.discrete(device.id()).resource(); DiscreteResource devResource = Resources.discrete(device.id()).resource();
List<Resource> allResources = getDescendantResources(devResource); List<Resource> allResources = getDescendantResources(devResource);
adminService.unregister(Lists.transform(allResources, Resource::id)); adminService.unregister(Lists.transform(allResources, Resource::id));
@ -151,7 +151,7 @@ final class ResourceDeviceListener implements DeviceListener {
private void registerPortResource(Device device, Port port) { private void registerPortResource(Device device, Port port) {
Resource portPath = Resources.discrete(device.id(), port.number()).resource(); Resource portPath = Resources.discrete(device.id(), port.number()).resource();
executor.submit(() -> { executor.execute(() -> {
adminService.register(portPath); adminService.register(portPath);
queryBandwidth(device.id(), port.number()) queryBandwidth(device.id(), port.number())
@ -198,7 +198,7 @@ final class ResourceDeviceListener implements DeviceListener {
} }
private void unregisterPortResource(Device device, Port port) { private void unregisterPortResource(Device device, Port port) {
executor.submit(() -> { executor.execute(() -> {
DiscreteResource portResource = Resources.discrete(device.id(), port.number()).resource(); DiscreteResource portResource = Resources.discrete(device.id(), port.number()).resource();
List<Resource> allResources = getDescendantResources(portResource); List<Resource> allResources = getDescendantResources(portResource);
adminService.unregister(Lists.transform(allResources, Resource::id)); adminService.unregister(Lists.transform(allResources, Resource::id));

View File

@ -89,7 +89,7 @@ final class ResourceNetworkConfigListener implements NetworkConfigListener {
@Override @Override
public void event(NetworkConfigEvent event) { public void event(NetworkConfigEvent event) {
if (event.configClass() == BandwidthCapacity.class) { if (event.configClass() == BandwidthCapacity.class) {
executor.submit(() -> { executor.execute(() -> {
try { try {
handleBandwidthCapacity(event); handleBandwidthCapacity(event);
} catch (Exception e) { } catch (Exception e) {

View File

@ -84,7 +84,7 @@ public final class ResourceRegistrar {
private DeviceListener deviceListener; private DeviceListener deviceListener;
private final ExecutorService executor = private final ExecutorService executor =
Executors.newSingleThreadExecutor(groupedThreads("onos/resource", "registrar")); Executors.newSingleThreadExecutor(groupedThreads("onos/resource", "registrar", log));
private NetworkConfigListener cfgListener; private NetworkConfigListener cfgListener;

View File

@ -68,6 +68,7 @@ import java.util.function.Function;
import static com.google.common.collect.Multimaps.newSetMultimap; import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap; import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.io.ByteStreams.toByteArray; import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.randomDelay; import static org.onlab.util.Tools.randomDelay;
@ -138,10 +139,10 @@ public class GossipApplicationStore extends ApplicationArchive
.register(MultiValuedTimestamp.class) .register(MultiValuedTimestamp.class)
.register(InternalState.class); .register(InternalState.class);
executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store")); executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
messageHandlingExecutor = Executors.newSingleThreadExecutor( messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/app", "message-handler")); groupedThreads("onos/store/app", "message-handler", log));
clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST, clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8), bytes -> new String(bytes, Charsets.UTF_8),

View File

@ -96,13 +96,13 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull; import static com.google.common.base.Predicates.notNull;
import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verify;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.groupedThreads; import static org.onlab.util.Tools.groupedThreads;
@ -200,10 +200,10 @@ public class GossipDeviceStore
@Activate @Activate
public void activate() { public void activate() {
executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d")); executor = newCachedThreadPool(groupedThreads("onos/device", "fg-%d", log));
backgroundExecutor = backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d"))); newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));
clusterCommunicator.addSubscriber( clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor); GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);

View File

@ -305,7 +305,7 @@ public class NewDistributedFlowRuleStore
msgHandlerPoolSize = newPoolSize; msgHandlerPoolSize = newPoolSize;
ExecutorService oldMsgHandler = messageHandlingExecutor; ExecutorService oldMsgHandler = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool( messageHandlingExecutor = Executors.newFixedThreadPool(
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers")); msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
// replace previously registered handlers. // replace previously registered handlers.
registerMessageHandlers(messageHandlingExecutor); registerMessageHandlers(messageHandlingExecutor);

View File

@ -123,10 +123,10 @@ public class ConsistentDeviceMastershipStore
public void activate() { public void activate() {
messageHandlingExecutor = messageHandlingExecutor =
Executors.newSingleThreadExecutor( Executors.newSingleThreadExecutor(
groupedThreads("onos/store/device/mastership", "message-handler")); groupedThreads("onos/store/device/mastership", "message-handler", log));
transferExecutor = transferExecutor =
Executors.newSingleThreadScheduledExecutor( Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/store/device/mastership", "mastership-transfer-executor")); groupedThreads("onos/store/device/mastership", "mastership-transfer-executor", log));
clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT, clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
SERIALIZER::decode, SERIALIZER::decode,
this::relinquishLocalRole, this::relinquishLocalRole,

View File

@ -118,7 +118,7 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
messageHandlingExecutor = Executors.newFixedThreadPool( messageHandlingExecutor = Executors.newFixedThreadPool(
messageHandlerThreadPoolSize, messageHandlerThreadPoolSize,
groupedThreads("onos/store/statistic", "message-handlers")); groupedThreads("onos/store/statistic", "message-handlers", log));
clusterCommunicator.addSubscriber( clusterCommunicator.addSubscriber(
GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode, GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
@ -200,6 +200,7 @@ public class DistributedFlowStatisticStore implements FlowStatisticStore {
previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; }); previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
} }
@Override
public synchronized void updateFlowStatistic(FlowEntry rule) { public synchronized void updateFlowStatistic(FlowEntry rule) {
ConnectPoint cp = buildConnectPoint(rule); ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) { if (cp == null) {