Use failure detection for Netty messages to improve ability to adapt to changing network conditions

Change-Id: I2928072ffd9c662e1817d67453f19626681c0aff
This commit is contained in:
Jordan Halterman 2017-12-21 11:59:38 -08:00 committed by Thomas Vachuska
parent 914b0b1b2e
commit ef92f19c6c
2 changed files with 70 additions and 31 deletions

View File

@ -103,13 +103,13 @@ import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@Component(immediate = true) @Component(immediate = true)
@Service @Service
public class NettyMessagingManager implements MessagingService { public class NettyMessagingManager implements MessagingService {
private static final long DEFAULT_TIMEOUT_MILLIS = 500;
private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis(); private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
private static final long MIN_TIMEOUT_MILLIS = 250;
private static final long MAX_TIMEOUT_MILLIS = 5000;
private static final long TIMEOUT_INTERVAL = 50; private static final long TIMEOUT_INTERVAL = 50;
private static final int WINDOW_SIZE = 100; private static final int WINDOW_SIZE = 100;
private static final double TIMEOUT_MULTIPLIER = 2.5; private static final int MIN_SAMPLES = 25;
private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
private static final int PHI_FAILURE_THRESHOLD = 5;
private static final long MAX_TIMEOUT_MILLIS = 15000;
private static final int CHANNEL_POOL_SIZE = 8; private static final int CHANNEL_POOL_SIZE = 8;
private static final byte[] EMPTY_PAYLOAD = new byte[0]; private static final byte[] EMPTY_PAYLOAD = new byte[0];
@ -741,14 +741,14 @@ public class NettyMessagingManager implements MessagingService {
private abstract class AbstractClientConnection implements ClientConnection { private abstract class AbstractClientConnection implements ClientConnection {
private final Map<Long, Callback> futures = Maps.newConcurrentMap(); private final Map<Long, Callback> futures = Maps.newConcurrentMap();
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder() private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
.expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS) .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
.build(); .build();
/** /**
* Times out callbacks for this connection. * Times out callbacks for this connection.
*/ */
protected void timeoutCallbacks() { void timeoutCallbacks() {
// Store the current time. // Store the current time.
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
@ -758,12 +758,11 @@ public class NettyMessagingManager implements MessagingService {
while (iterator.hasNext()) { while (iterator.hasNext()) {
Callback callback = iterator.next().getValue(); Callback callback = iterator.next().getValue();
try { try {
TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new); RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
long currentTimeout = timeoutHistory.currentTimeout;
if (currentTime - callback.time > currentTimeout) {
iterator.remove();
long elapsedTime = currentTime - callback.time; long elapsedTime = currentTime - callback.time;
timeoutHistory.addReplyTime(elapsedTime); if (elapsedTime > MAX_TIMEOUT_MILLIS || requestMonitor.isTimedOut(elapsedTime)) {
iterator.remove();
requestMonitor.addReplyTime(elapsedTime);
callback.completeExceptionally( callback.completeExceptionally(
new TimeoutException("Request timed out in " + elapsedTime + " milliseconds")); new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
} }
@ -771,11 +770,6 @@ public class NettyMessagingManager implements MessagingService {
throw new AssertionError(); throw new AssertionError();
} }
} }
// Iterate through all timeout histories and recompute the timeout.
for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
timeoutHistory.recomputeTimeoutMillis();
}
} }
protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) { protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
@ -786,8 +780,8 @@ public class NettyMessagingManager implements MessagingService {
Callback callback = futures.remove(id); Callback callback = futures.remove(id);
if (callback != null) { if (callback != null) {
try { try {
TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new); RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time); requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw new AssertionError(); throw new AssertionError();
} }
@ -985,10 +979,8 @@ public class NettyMessagingManager implements MessagingService {
/** /**
* Request-reply timeout history tracker. * Request-reply timeout history tracker.
*/ */
private static final class TimeoutHistory { private static final class RequestMonitor {
private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE); private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
private final AtomicLong maxReplyTime = new AtomicLong();
private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
/** /**
* Adds a reply time to the history. * Adds a reply time to the history.
@ -996,19 +988,41 @@ public class NettyMessagingManager implements MessagingService {
* @param replyTime the reply time to add to the history * @param replyTime the reply time to add to the history
*/ */
void addReplyTime(long replyTime) { void addReplyTime(long replyTime) {
maxReplyTime.getAndAccumulate(replyTime, Math::max); samples.addValue(replyTime);
} }
/** /**
* Computes the current timeout. * Returns a boolean indicating whether the given request should be timed out according to the elapsed time.
*
* @param elapsedTime the elapsed request time
* @return indicates whether the request should be timed out
*/ */
private void recomputeTimeoutMillis() { boolean isTimedOut(long elapsedTime) {
double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER; return phi(elapsedTime) >= PHI_FAILURE_THRESHOLD;
timeoutHistory.addValue( }
Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
if (timeoutHistory.getN() == WINDOW_SIZE) { /**
this.currentTimeout = (long) timeoutHistory.getMax(); * Computes phi for the given elapsed request time.
} *
* @param elapsedTime the duration since the request was sent
* @return phi value
*/
private double phi(long elapsedTime) {
// With fewer than MIN_SAMPLES recorded reply times, report 0 so that the
// phi(elapsedTime) >= PHI_FAILURE_THRESHOLD check in isTimedOut cannot trigger yet.
if (samples.getN() < MIN_SAMPLES) {
return 0.0;
}
return computePhi(samples, elapsedTime);
}
/**
* Computes the phi value from the given samples.
* <p>
* Phi is {@code PHI_FACTOR * elapsedTime / samples.getMean()}, so it grows linearly with the
* elapsed time relative to the mean of the samples.
*
* @param samples the samples from which to compute phi
* @param elapsedTime the duration since the request was sent
* @return phi, or 100 when {@code samples} contains no values
*/
private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
// NOTE(review): if the mean of the samples is 0 (every recorded reply time was 0), this
// floating-point division yields Infinity, or NaN when elapsedTime is also 0 — confirm intended.
return samples.getN() > 0 ? PHI_FACTOR * elapsedTime / samples.getMean() : 100;
}
} }
} }

View File

@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -160,7 +161,7 @@ public class NettyMessagingManagerTest {
} }
@Test @Test
public void testSendTimeout() { public void testDefaultTimeout() {
String subject = nextSubject(); String subject = nextSubject();
BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>(); BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
netty2.registerHandler(subject, handler); netty2.registerHandler(subject, handler);
@ -173,6 +174,30 @@ public class NettyMessagingManagerTest {
} }
} }
/**
* Sends 50 requests that are answered immediately, then one request that is never answered,
* and expects that last request to fail with a {@link TimeoutException}.
*/
@Test
public void testDynamicTimeout() {
String subject = nextSubject();
AtomicInteger counter = new AtomicInteger();
BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> {
// Answer the first 50 invocations with an empty payload; every later invocation
// returns a future that is never completed.
if (counter.incrementAndGet() <= 50) {
return CompletableFuture.completedFuture(new byte[0]);
} else {
return new CompletableFuture<>();
}
};
netty2.registerHandler(subject, handler);
// Warm-up: 50 round trips that each complete successfully.
// NOTE(review): presumably this is meant to exceed the sender's minimum sample count — confirm.
for (int i = 0; i < 50; i++) {
netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
}
// The 51st request gets no reply, so join() must throw with a TimeoutException as the cause.
try {
netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
fail();
} catch (CompletionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
}
/* /*
* Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
* and response completion occurs on the expected thread. * and response completion occurs on the expected thread.