diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java index f8c1be87bc..070ae8c649 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java @@ -3,13 +3,13 @@ package org.onlab.onos.store.service.impl; import static com.google.common.base.Verify.verifyNotNull; import static org.slf4j.LoggerFactory.getLogger; import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER; +import static org.onlab.util.Tools.namedThreads; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -33,8 +33,6 @@ import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.MessageSubject; import org.slf4j.Logger; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * ONOS Cluster messaging based Copycat protocol client. */ @@ -42,9 +40,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { private final Logger log = getLogger(getClass()); - private static final ThreadFactory THREAD_FACTORY = - new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build(); - public static final long RETRY_INTERVAL_MILLIS = 2000; private final ClusterService clusterService; @@ -53,9 +48,9 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { private final TcpMember remoteMember; private ControllerNode remoteNode; - // FIXME: Thread pool sizing. - private static final ScheduledExecutorService THREAD_POOL = - new ScheduledThreadPoolExecutor(10, THREAD_FACTORY); + // TODO: make this non-static and stop on close + private static final ExecutorService THREAD_POOL + = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d")); private volatile CompletableFuture appeared; @@ -173,7 +168,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { private CompletableFuture requestReply(I request) { CompletableFuture future = new CompletableFuture<>(); - THREAD_POOL.schedule(new RPCTask(request, future), 0, TimeUnit.MILLISECONDS); + THREAD_POOL.submit(new RPCTask(request, future)); return future; } @@ -198,7 +193,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { public void event(ClusterEvent event) { checkIfMemberAppeared(); } - } private class RPCTask implements Runnable { @@ -225,9 +219,13 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); future.complete(verifyNotNull(SERIALIZER.decode(response))); - } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { + } catch (IOException | ExecutionException | TimeoutException e) { log.warn("RPCTask for {} failed.", request, e); future.completeExceptionally(e); + } catch (InterruptedException e) { + log.warn("RPCTask for {} was interrupted.", request, e); + future.completeExceptionally(e); + Thread.currentThread().interrupt(); } catch (Exception e) { log.warn("RPCTask for {} terribly failed.", request, e); future.completeExceptionally(e);