From b6ee9e966f85d6ee32eff6bff77bb61d7c6e40d4 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Wed, 21 Jun 2017 15:26:28 -0700 Subject: [PATCH] Prevent serialization errors from causing recursion in the Copycat transport Change-Id: I0a1b0737d6cda3d7ab63bb26a7547d2f9124a434 --- .../messaging/impl/NettyMessagingManager.java | 9 +++++- .../impl/CopycatTransportConnection.java | 31 ++++++++++++++----- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java index fae957a6dd..f3b028c5d4 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java @@ -303,6 +303,7 @@ public class NettyMessagingManager implements MessagingService { try { responsePayload = handler.apply(message.sender(), message.payload()); } catch (Exception e) { + log.debug("An error occurred in a message handler: {}", e); status = Status.ERROR_HANDLER_EXCEPTION; } sendReply(message, status, Optional.ofNullable(responsePayload)); @@ -314,7 +315,13 @@ public class NettyMessagingManager implements MessagingService { checkPermission(CLUSTER_WRITE); handlers.put(type, message -> { handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> { - Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION; + Status status; + if (error == null) { + status = Status.OK; + } else { + log.debug("An error occurred in a message handler: {}", error); + status = Status.ERROR_HANDLER_EXCEPTION; + } sendReply(message, status, Optional.ofNullable(result)); }); }); diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java index b8596ae7f3..a3a8539f00 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java @@ -56,6 +56,8 @@ import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS; * Base Copycat Transport connection. */ public class CopycatTransportConnection implements Connection { + private static final int MAX_MESSAGE_SIZE = 1024 * 1024; + private final Logger log = LoggerFactory.getLogger(getClass()); private final long connectionId; private final String localSubject; @@ -97,7 +99,11 @@ public class CopycatTransportConnection implements Connection { ((ReferenceCounted) message).release(); } - messagingService.sendAsync(endpoint, remoteSubject, baos.toByteArray()) + byte[] bytes = baos.toByteArray(); + if (bytes.length > MAX_MESSAGE_SIZE) { + throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE); + } + messagingService.sendAsync(endpoint, remoteSubject, bytes) .whenComplete((r, e) -> { if (e != null) { context.executor().execute(() -> future.completeExceptionally(e)); @@ -122,9 +128,14 @@ public class CopycatTransportConnection implements Connection { if (message instanceof ReferenceCounted) { ((ReferenceCounted) message).release(); } + + byte[] bytes = baos.toByteArray(); + if (bytes.length > MAX_MESSAGE_SIZE) { + throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE); + } messagingService.sendAndReceive(endpoint, remoteSubject, - baos.toByteArray(), + bytes, context.executor()) .whenComplete((response, error) -> handleResponse(response, error, future)); } catch (SerializationException | IOException e) { @@ -142,11 +153,11 @@ public class CopycatTransportConnection implements Connection { CompletableFuture future) { if (error != null) { Throwable rootCause = Throwables.getRootCause(error); - if (rootCause instanceof MessagingException || rootCause instanceof SocketException) { + if (rootCause instanceof MessagingException.NoRemoteHandler) { + future.completeExceptionally(new TransportException(error)); + close(rootCause); + } else if (rootCause instanceof SocketException) { future.completeExceptionally(new TransportException(error)); - if (rootCause instanceof MessagingException.NoRemoteHandler) { - close(rootCause); - } } else { future.completeExceptionally(error); } @@ -211,7 +222,11 @@ public class CopycatTransportConnection implements Connection { try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { baos.write(error != null ? FAILURE : SUCCESS); context.serializer().writeObject(error != null ? error : result, baos); - return baos.toByteArray(); + byte[] bytes = baos.toByteArray(); + if (bytes.length > MAX_MESSAGE_SIZE) { + throw new IllegalArgumentException("response exceeds maximum message size " + MAX_MESSAGE_SIZE); + } + return bytes; } catch (IOException e) { Throwables.propagate(e); return null; @@ -278,7 +293,7 @@ public class CopycatTransportConnection implements Connection { Throwable wrappedError = error; if (error != null) { Throwable rootCause = Throwables.getRootCause(error); - if (MessagingException.class.isAssignableFrom(rootCause.getClass())) { + if (rootCause instanceof MessagingException.NoRemoteHandler) { wrappedError = new TransportException(error); } future.completeExceptionally(wrappedError);