From ef92f19c6cdda44dd1fd012893b1907c83879c78 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Thu, 21 Dec 2017 11:59:38 -0800 Subject: [PATCH] Use failure detection for Netty messages to improve ability to adapt to changing network conditions Change-Id: I2928072ffd9c662e1817d67453f19626681c0aff --- .../messaging/impl/NettyMessagingManager.java | 74 +++++++++++-------- .../impl/NettyMessagingManagerTest.java | 27 ++++++- 2 files changed, 70 insertions(+), 31 deletions(-) diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java index 82ba95f862..284251bd64 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java @@ -103,13 +103,13 @@ import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE; @Component(immediate = true) @Service public class NettyMessagingManager implements MessagingService { - private static final long DEFAULT_TIMEOUT_MILLIS = 500; private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis(); - private static final long MIN_TIMEOUT_MILLIS = 250; - private static final long MAX_TIMEOUT_MILLIS = 5000; private static final long TIMEOUT_INTERVAL = 50; private static final int WINDOW_SIZE = 100; - private static final double TIMEOUT_MULTIPLIER = 2.5; + private static final int MIN_SAMPLES = 25; + private static final double PHI_FACTOR = 1.0 / Math.log(10.0); + private static final int PHI_FAILURE_THRESHOLD = 5; + private static final long MAX_TIMEOUT_MILLIS = 15000; private static final int CHANNEL_POOL_SIZE = 8; private static final byte[] EMPTY_PAYLOAD = new byte[0]; @@ -741,14 +741,14 @@ public class NettyMessagingManager implements MessagingService { private abstract class AbstractClientConnection implements ClientConnection { private final Map futures = Maps.newConcurrentMap(); private final AtomicBoolean closed = new AtomicBoolean(false); - private final Cache timeoutHistories = CacheBuilder.newBuilder() + private final Cache requestMonitors = CacheBuilder.newBuilder() .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS) .build(); /** * Times out callbacks for this connection. */ - protected void timeoutCallbacks() { + void timeoutCallbacks() { // Store the current time. long currentTime = System.currentTimeMillis(); @@ -758,12 +758,11 @@ public class NettyMessagingManager implements MessagingService { while (iterator.hasNext()) { Callback callback = iterator.next().getValue(); try { - TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new); - long currentTimeout = timeoutHistory.currentTimeout; - if (currentTime - callback.time > currentTimeout) { + RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new); + long elapsedTime = currentTime - callback.time; + if (elapsedTime > MAX_TIMEOUT_MILLIS || requestMonitor.isTimedOut(elapsedTime)) { iterator.remove(); - long elapsedTime = currentTime - callback.time; - timeoutHistory.addReplyTime(elapsedTime); + requestMonitor.addReplyTime(elapsedTime); callback.completeExceptionally( new TimeoutException("Request timed out in " + elapsedTime + " milliseconds")); } @@ -771,11 +770,6 @@ public class NettyMessagingManager implements MessagingService { throw new AssertionError(); } } - - // Iterate through all timeout histories and recompute the timeout. - for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) { - timeoutHistory.recomputeTimeoutMillis(); - } } protected void registerCallback(long id, String subject, CompletableFuture future) { @@ -786,8 +780,8 @@ public class NettyMessagingManager implements MessagingService { Callback callback = futures.remove(id); if (callback != null) { try { - TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new); - timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time); + RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new); + requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time); } catch (ExecutionException e) { throw new AssertionError(); } @@ -985,10 +979,8 @@ public class NettyMessagingManager implements MessagingService { /** * Request-reply timeout history tracker. */ - private static final class TimeoutHistory { - private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE); - private final AtomicLong maxReplyTime = new AtomicLong(); - private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS; + private static final class RequestMonitor { + private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE); /** * Adds a reply time to the history. @@ -996,19 +988,41 @@ public class NettyMessagingManager implements MessagingService { * @param replyTime the reply time to add to the history */ void addReplyTime(long replyTime) { - maxReplyTime.getAndAccumulate(replyTime, Math::max); + samples.addValue(replyTime); } /** - * Computes the current timeout. + * Returns a boolean indicating whether the given request should be timed out according to the elapsed time. + * + * @param elapsedTime the elapsed request time + * @return indicates whether the request should be timed out */ - private void recomputeTimeoutMillis() { - double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER; - timeoutHistory.addValue( - Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS)); - if (timeoutHistory.getN() == WINDOW_SIZE) { - this.currentTimeout = (long) timeoutHistory.getMax(); + boolean isTimedOut(long elapsedTime) { + return phi(elapsedTime) >= PHI_FAILURE_THRESHOLD; + } + + /** + * Compute phi for the specified node id. + * + * @param elapsedTime the duration since the request was sent + * @return phi value + */ + private double phi(long elapsedTime) { + if (samples.getN() < MIN_SAMPLES) { + return 0.0; } + return computePhi(samples, elapsedTime); + } + + /** + * Computes the phi value from the given samples. + * + * @param samples the samples from which to compute phi + * @param elapsedTime the duration since the request was sent + * @return phi + */ + private double computePhi(DescriptiveStatistics samples, long elapsedTime) { + return samples.getN() > 0 ? PHI_FACTOR * elapsedTime / samples.getMean() : 100; } } } diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java index 435f6bd5dc..e48b225d9f 100644 --- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java +++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -160,7 +161,7 @@ public class NettyMessagingManagerTest { } @Test - public void testSendTimeout() { + public void testDefaultTimeout() { String subject = nextSubject(); BiFunction> handler = (ep, payload) -> new CompletableFuture<>(); netty2.registerHandler(subject, handler); @@ -173,6 +174,30 @@ public class NettyMessagingManagerTest { } } + @Test + public void testDynamicTimeout() { + String subject = nextSubject(); + AtomicInteger counter = new AtomicInteger(); + BiFunction> handler = (ep, payload) -> { + if (counter.incrementAndGet() <= 50) { + return CompletableFuture.completedFuture(new byte[0]); + } else { + return new CompletableFuture<>(); + } + }; + netty2.registerHandler(subject, handler); + + for (int i = 0; i < 50; i++) { + netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join(); + } + try { + netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join(); + fail(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + } + /* * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling * and response completion occurs on the expected thread.