From 24f9efb49ef3bf7e9624d692b1cc5495a1cc7f0b Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Fri, 24 Oct 2014 18:56:23 -0700 Subject: [PATCH] sendAndReceive now returns a Future instead of Reponse --- .../org/onlab/onos/foo/SimpleNettyClient.java | 12 ++-- .../ClusterCommunicationService.java | 6 +- .../messaging/ClusterMessageResponse.java | 18 ----- .../impl/ClusterCommunicationManager.java | 72 ++----------------- .../flow/impl/DistributedFlowRuleStore.java | 32 +++++---- .../impl/DistributedStatisticStore.java | 12 ++-- .../java/org/onlab/netty/AsyncResponse.java | 66 ----------------- .../org/onlab/netty/MessagingService.java | 4 +- .../onlab/netty/NettyMessagingService.java | 19 ++--- .../main/java/org/onlab/netty/Response.java | 34 --------- .../java/org/onlab/netty/PingPongTest.java | 7 +- 11 files changed, 60 insertions(+), 222 deletions(-) delete mode 100644 core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java delete mode 100644 utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java delete mode 100644 utils/netty/src/main/java/org/onlab/netty/Response.java diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java index 6f77924e07..a14c568395 100644 --- a/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java +++ b/apps/foo/src/main/java/org/onlab/onos/foo/SimpleNettyClient.java @@ -4,6 +4,7 @@ import static java.lang.Thread.sleep; import java.io.IOException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -12,7 +13,6 @@ import org.onlab.metrics.MetricsFeature; import org.onlab.metrics.MetricsManager; import org.onlab.netty.Endpoint; import org.onlab.netty.NettyMessagingService; -import org.onlab.netty.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,10 +74,10 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class); for (int i = 0; i < warmup; i++) { messaging.sendAsync(endpoint, "simple", "Hello World".getBytes()); - Response response = messaging + Future responseFuture = messaging .sendAndReceive(endpoint, "echo", "Hello World".getBytes()); - response.get(100000, TimeUnit.MILLISECONDS); + responseFuture.get(100000, TimeUnit.MILLISECONDS); } log.info("measuring round-trip send & receive"); @@ -85,13 +85,13 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class); int timeouts = 0; for (int i = 0; i < iterations; i++) { - Response response; + Future responseFuture; Timer.Context context = sendAndReceiveTimer.time(); try { - response = messaging + responseFuture = messaging .sendAndReceive(endpoint, "echo", "Hello World".getBytes()); - response.get(10000, TimeUnit.MILLISECONDS); + responseFuture.get(10000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { timeouts++; log.info("timeout:" + timeouts + " at iteration:" + i); diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java index 6fc150c2f1..2cff64ac49 100644 --- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java +++ b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java @@ -5,6 +5,8 @@ import java.util.Set; import org.onlab.onos.cluster.NodeId; +import com.google.common.util.concurrent.ListenableFuture; + // TODO: remove IOExceptions? /** * Service for assisting communications between controller cluster nodes. @@ -40,10 +42,10 @@ public interface ClusterCommunicationService { * Sends a message synchronously. * @param message message to send * @param toNodeId recipient node identifier - * @return ClusterMessageResponse which is reply future. + * @return reply future. * @throws IOException */ - ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException; + ListenableFuture sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException; /** * Adds a new subscriber for the specified message subject. diff --git a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java b/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java deleted file mode 100644 index d2a0039f00..0000000000 --- a/core/api/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageResponse.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.onlab.onos.store.cluster.messaging; - -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.onlab.onos.cluster.NodeId; - -public interface ClusterMessageResponse extends Future { - - public NodeId sender(); - - // TODO InterruptedException, ExecutionException removed from original - // Future declaration. Revisit if we ever need those. - @Override - public byte[] get(long timeout, TimeUnit unit) throws TimeoutException; - -} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java index b2f679ce4c..d8e5fabd7b 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java @@ -4,9 +4,7 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -20,7 +18,6 @@ import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; -import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse; import org.onlab.onos.store.cluster.messaging.MessageSubject; import org.onlab.onos.store.serializers.ClusterMessageSerializer; import org.onlab.onos.store.serializers.KryoNamespaces; @@ -32,10 +29,11 @@ import org.onlab.netty.Message; import org.onlab.netty.MessageHandler; import org.onlab.netty.MessagingService; import org.onlab.netty.NettyMessagingService; -import org.onlab.netty.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.ListenableFuture; + @Component(immediate = true) @Service public class ClusterCommunicationManager @@ -133,14 +131,12 @@ public class ClusterCommunicationManager } @Override - public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException { + public ListenableFuture sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException { ControllerNode node = clusterService.getNode(toNodeId); checkArgument(node != null, "Unknown nodeId: %s", toNodeId); Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); try { - Response responseFuture = - messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message)); - return new InternalClusterMessageResponse(toNodeId, responseFuture); + return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message)); } catch (IOException e) { log.error("Failed interaction with remote nodeId: " + toNodeId, e); @@ -188,60 +184,4 @@ public class ClusterCommunicationManager rawMessage.respond(response); } } - - private static final class InternalClusterMessageResponse - implements ClusterMessageResponse { - - private final NodeId sender; - private final Response responseFuture; - private volatile boolean isCancelled = false; - private volatile boolean isDone = false; - - public InternalClusterMessageResponse(NodeId sender, Response responseFuture) { - this.sender = sender; - this.responseFuture = responseFuture; - } - @Override - public NodeId sender() { - return sender; - } - - @Override - public byte[] get(long timeout, TimeUnit timeunit) - throws TimeoutException { - final byte[] result = responseFuture.get(timeout, timeunit); - isDone = true; - return result; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (isDone()) { - return false; - } - // doing nothing for now - // when onlab.netty Response support cancel, call them. - isCancelled = true; - return true; - } - - @Override - public boolean isCancelled() { - return isCancelled; - } - - @Override - public boolean isDone() { - return this.isDone || isCancelled(); - } - - @Override - public byte[] get() throws InterruptedException, ExecutionException { - // TODO: consider forbidding this call and force the use of timed get - // to enforce handling of remote peer failure scenario - final byte[] result = responseFuture.get(); - isDone = true; - return result; - } - } -} +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java index a737868455..dbd2688ce7 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java @@ -12,6 +12,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -49,7 +50,6 @@ import org.onlab.onos.store.AbstractStore; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; -import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse; import org.onlab.onos.store.flow.ReplicaInfo; import org.onlab.onos.store.flow.ReplicaInfoService; import org.onlab.onos.store.serializers.DistributedStoreSerializers; @@ -57,6 +57,7 @@ import org.onlab.onos.store.serializers.KryoSerializer; import org.onlab.util.KryoNamespace; import org.slf4j.Logger; +import com.google.common.base.Function; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -213,9 +214,9 @@ public class DistributedFlowRuleStore SERIALIZER.encode(rule)); try { - ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); - return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); - } catch (IOException | TimeoutException e) { + Future responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); + return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); + } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) { // FIXME: throw a FlowStoreException throw new RuntimeException(e); } @@ -247,9 +248,9 @@ public class DistributedFlowRuleStore SERIALIZER.encode(deviceId)); try { - ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); - return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); - } catch (IOException | TimeoutException e) { + Future responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); + return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); + } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) { // FIXME: throw a FlowStoreException throw new RuntimeException(e); } @@ -291,14 +292,17 @@ public class DistributedFlowRuleStore SERIALIZER.encode(operation)); try { - ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); - response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (IOException | TimeoutException e) { - // FIXME: throw a FlowStoreException - throw new RuntimeException(e); + ListenableFuture responseFuture = + clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); + return Futures.transform(responseFuture, new Function() { + @Override + public CompletedBatchOperation apply(byte[] input) { + return SERIALIZER.decode(input); + } + }); + } catch (IOException e) { + return Futures.immediateFailedFuture(e); } - - return null; } private ListenableFuture storeBatchInternal(FlowRuleBatchOperation operation) { diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java index 273e3cc720..7106aefe84 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java @@ -4,6 +4,7 @@ import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects. import static org.slf4j.LoggerFactory.getLogger; import com.google.common.collect.Sets; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -21,7 +22,6 @@ import org.onlab.onos.net.statistic.StatisticStore; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; -import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse; import org.onlab.onos.store.flow.ReplicaInfo; import org.onlab.onos.store.flow.ReplicaInfoService; import org.onlab.onos.store.serializers.KryoNamespaces; @@ -34,6 +34,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -184,11 +186,11 @@ public class DistributedStatisticStore implements StatisticStore { SERIALIZER.encode(connectPoint)); try { - ClusterMessageResponse response = + Future response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); - } catch (IOException | TimeoutException e) { + } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) { // FIXME: throw a StatsStoreException throw new RuntimeException(e); } @@ -212,11 +214,11 @@ public class DistributedStatisticStore implements StatisticStore { SERIALIZER.encode(connectPoint)); try { - ClusterMessageResponse response = + Future response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); - } catch (IOException | TimeoutException e) { + } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) { // FIXME: throw a StatsStoreException throw new RuntimeException(e); } diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java deleted file mode 100644 index 1772a3c71f..0000000000 --- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.onlab.netty; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * An asynchronous response. - * This class provides a base implementation of Response, with methods to retrieve the - * result and query to see if the result is ready. The result can only be retrieved when - * it is ready and the get methods will block if the result is not ready yet. - */ -public class AsyncResponse implements Response { - - private byte[] value; - private boolean done = false; - private final long start = System.nanoTime(); - - @Override - public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException { - timeout = timeUnit.toNanos(timeout); - boolean interrupted = false; - try { - synchronized (this) { - while (!done) { - try { - long timeRemaining = timeout - (System.nanoTime() - start); - if (timeRemaining <= 0) { - throw new TimeoutException("Operation timed out."); - } - TimeUnit.NANOSECONDS.timedWait(this, timeRemaining); - } catch (InterruptedException e) { - interrupted = true; - } - } - } - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - return value; - } - - @Override - public byte[] get() throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isReady() { - return done; - } - - /** - * Sets response value and unblocks any thread blocking on the response to become - * available. - * @param data response data. - */ - public synchronized void setResponse(byte[] data) { - if (!done) { - done = true; - value = data; - this.notifyAll(); - } - } -} diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java index 08676ac278..bf9333168e 100644 --- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java +++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java @@ -2,6 +2,8 @@ package org.onlab.netty; import java.io.IOException; +import com.google.common.util.concurrent.ListenableFuture; + /** * Interface for low level messaging primitives. */ @@ -24,7 +26,7 @@ public interface MessagingService { * @return a response future * @throws IOException */ - public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException; + public ListenableFuture sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException; /** * Registers a new message handler for message type. diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java index 26d835d822..6e5aa89252 100644 --- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java +++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java @@ -5,6 +5,7 @@ import java.net.UnknownHostException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; @@ -26,7 +27,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.slf4j.Logger; @@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; /** * A Netty based implementation of MessagingService. @@ -44,7 +46,8 @@ public class NettyMessagingService implements MessagingService { private final Endpoint localEp; private final ConcurrentMap handlers = new ConcurrentHashMap<>(); - private final Cache responseFutures = CacheBuilder.newBuilder() + private final AtomicLong messageIdGenerator = new AtomicLong(0); + private final Cache> responseFutures = CacheBuilder.newBuilder() .maximumSize(100000) .weakValues() // TODO: Once the entry expires, notify blocking threads (if any). @@ -119,7 +122,7 @@ public class NettyMessagingService implements MessagingService { @Override public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException { InternalMessage message = new InternalMessage.Builder(this) - .withId(RandomUtils.nextLong()) + .withId(messageIdGenerator.incrementAndGet()) .withSender(localEp) .withType(type) .withPayload(payload) @@ -142,10 +145,10 @@ public class NettyMessagingService implements MessagingService { } @Override - public Response sendAndReceive(Endpoint ep, String type, byte[] payload) + public ListenableFuture sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException { - AsyncResponse futureResponse = new AsyncResponse(); - Long messageId = RandomUtils.nextLong(); + SettableFuture futureResponse = SettableFuture.create(); + Long messageId = messageIdGenerator.incrementAndGet(); responseFutures.put(messageId, futureResponse); InternalMessage message = new InternalMessage.Builder(this) .withId(messageId) @@ -267,10 +270,10 @@ public class NettyMessagingService implements MessagingService { String type = message.type(); if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) { try { - AsyncResponse futureResponse = + SettableFuture futureResponse = NettyMessagingService.this.responseFutures.getIfPresent(message.id()); if (futureResponse != null) { - futureResponse.setResponse(message.payload()); + futureResponse.set(message.payload()); } else { log.warn("Received a reply. But was unable to locate the request handle"); } diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java deleted file mode 100644 index 150755eb3f..0000000000 --- a/utils/netty/src/main/java/org/onlab/netty/Response.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.onlab.netty; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Response object returned when making synchronous requests. - * Can you used to check is a response is ready and/or wait for a response - * to become available. - */ -public interface Response { - - /** - * Gets the response waiting for a designated timeout period. - * @param timeout timeout period (since request was sent out) - * @param tu unit of time. - * @return response payload - * @throws TimeoutException if the timeout expires before the response arrives. - */ - public byte[] get(long timeout, TimeUnit tu) throws TimeoutException; - - /** - * Gets the response waiting for indefinite timeout period. - * @return response payload - * @throws InterruptedException if the thread is interrupted before the response arrives. - */ - public byte[] get() throws InterruptedException; - - /** - * Checks if the response is ready without blocking. - * @return true if response is ready, false otherwise. - */ - public boolean isReady(); -} diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java index 36d2a1ebaa..ddcdd6f5cf 100644 --- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java +++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java @@ -1,9 +1,12 @@ package org.onlab.netty; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.RandomUtils; + import static org.junit.Assert.*; + import org.junit.Test; /** @@ -20,8 +23,8 @@ public class PingPongTest { ponger.activate(); ponger.registerHandler("echo", new EchoHandler()); byte[] payload = RandomUtils.nextBytes(100); - Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload); - assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS)); + Future responseFuture = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload); + assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS)); } finally { pinger.deactivate(); ponger.deactivate();