Ensure client events are handled in the correct thread in consistent primitive tests.

Change-Id: Ic6db44ec8fc393d6194eeba8b3f84c28245a254a
This commit is contained in:
Jordan Halterman 2018-02-27 10:48:50 -08:00 committed by Ray Milkey
parent f3f050ab74
commit 931d3e7a4a

View File

@ -127,8 +127,8 @@ public class TestClusterCommunicationService implements ClusterCommunicationServ
public <M, R> void addSubscriber(
MessageSubject subject,
Function<byte[], M> decoder,
Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
Function<M, CompletableFuture<R>> handler,
Function<R, byte[]> encoder) {
subscribers.put(subject, message -> {
CompletableFuture<byte[]> future = new CompletableFuture<>();
try {
@ -153,12 +153,16 @@ public class TestClusterCommunicationService implements ClusterCommunicationServ
Consumer<M> handler,
Executor executor) {
subscribers.put(subject, message -> {
try {
handler.accept(decoder.apply(message));
} catch (Exception e) {
return Futures.exceptionalFuture(new MessagingException.RemoteHandlerFailure());
}
return Futures.completedFuture(null);
CompletableFuture<byte[]> future = new CompletableFuture<>();
executor.execute(() -> {
try {
handler.accept(decoder.apply(message));
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
}
});
return future;
});
}