diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java index e71ac10fa6..e954d0f60c 100644 --- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java +++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java @@ -49,4 +49,10 @@ public class MessagingException extends IOException { */ public static class RemoteHandlerFailure extends MessagingException { } + + /** + * Exception indicating failure due to invalid message strucuture such as an incorrect preamble. + */ + public static class ProcotolException extends MessagingException { + } } \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java index e02ecc8ab0..0d96c09787 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java @@ -42,24 +42,31 @@ public final class InternalMessage { /** * Response status signifying an exception handling the message. */ - ERROR_HANDLER_EXCEPTION + ERROR_HANDLER_EXCEPTION, + + /** + * Reponse status signifying invalid message structure. + */ + PROTOCOL_EXCEPTION // NOTE: For backwards compatibility it important that new enum constants // be appended. // FIXME: We should remove this restriction in the future. } + private final int preamble; private final long id; private final Endpoint sender; private final String type; private final byte[] payload; private final Status status; - public InternalMessage(long id, Endpoint sender, String type, byte[] payload) { - this(id, sender, type, payload, Status.OK); + public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload) { + this(preamble, id, sender, type, payload, Status.OK); } - public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) { + public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload, Status status) { + this.preamble = preamble; this.id = id; this.sender = sender; this.type = type; @@ -67,6 +74,10 @@ public final class InternalMessage { this.status = status; } + public int preamble() { + return preamble; + } + public long id() { return id; } diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java index aea3e292b2..4743987e77 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java @@ -39,7 +39,6 @@ public class MessageDecoder extends ReplayingDecoder { private final Logger log = LoggerFactory.getLogger(getClass()); - private final int correctPreamble; private long messageId; private int preamble; private Version ipVersion; @@ -50,9 +49,8 @@ public class MessageDecoder extends ReplayingDecoder { private Status status; private int contentLength; - public MessageDecoder(int correctPreamble) { + public MessageDecoder() { super(DecoderState.READ_MESSAGE_PREAMBLE); - this.correctPreamble = correctPreamble; } @Override @@ -65,9 +63,6 @@ public class MessageDecoder extends ReplayingDecoder { switch (state()) { case READ_MESSAGE_PREAMBLE: preamble = buffer.readInt(); - if (preamble != correctPreamble) { - throw new IllegalStateException("This message had an incorrect preamble."); - } checkpoint(DecoderState.READ_MESSAGE_ID); case READ_MESSAGE_ID: messageId = buffer.readLong(); @@ -106,7 +101,8 @@ public class MessageDecoder extends ReplayingDecoder { } else { payload = new byte[0]; } - InternalMessage message = new InternalMessage(messageId, + InternalMessage message = new InternalMessage(preamble, + messageId, new Endpoint(senderIp, senderPort), messageType, payload, diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java index 53611f353b..8a32e6d417 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java @@ -217,7 +217,8 @@ public class NettyMessagingManager implements MessagingService { @Override public CompletableFuture sendAsync(Endpoint ep, String type, byte[] payload) { checkPermission(CLUSTER_WRITE); - InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(), + InternalMessage message = new InternalMessage(preamble, + messageIdGenerator.incrementAndGet(), localEp, type, payload); @@ -263,7 +264,7 @@ public class NettyMessagingManager implements MessagingService { Callback callback = new Callback(response, executor); Long messageId = messageIdGenerator.incrementAndGet(); callbacks.put(messageId, callback); - InternalMessage message = new InternalMessage(messageId, localEp, type, payload); + InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload); return sendAsync(ep, message).whenComplete((r, e) -> { if (e != null) { callbacks.invalidate(messageId); @@ -425,7 +426,7 @@ public class NettyMessagingManager implements MessagingService { channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine)) .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder(preamble)) + .addLast("decoder", new MessageDecoder()) .addLast("handler", dispatcher); } } @@ -459,7 +460,7 @@ public class NettyMessagingManager implements MessagingService { channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine)) .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder(preamble)) + .addLast("decoder", new MessageDecoder()) .addLast("handler", dispatcher); } } @@ -473,7 +474,7 @@ public class NettyMessagingManager implements MessagingService { protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder(preamble)) + .addLast("decoder", new MessageDecoder()) .addLast("handler", dispatcher); } } @@ -497,6 +498,10 @@ public class NettyMessagingManager implements MessagingService { } } private void dispatchLocally(InternalMessage message) throws IOException { + if (message.preamble() != preamble) { + log.debug("Received {} with invalid preamble from {}", message.type(), message.sender()); + sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty()); + } String type = message.type(); if (REPLY_MESSAGE_TYPE.equals(type)) { try { @@ -509,6 +514,8 @@ public class NettyMessagingManager implements MessagingService { callback.completeExceptionally(new MessagingException.NoRemoteHandler()); } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) { callback.completeExceptionally(new MessagingException.RemoteHandlerFailure()); + } else if (message.status() == Status.PROTOCOL_EXCEPTION) { + callback.completeExceptionally(new MessagingException.ProcotolException()); } } else { log.debug("Received a reply for message id:[{}]. " @@ -530,7 +537,8 @@ public class NettyMessagingManager implements MessagingService { } private void sendReply(InternalMessage message, Status status, Optional responsePayload) { - InternalMessage response = new InternalMessage(message.id(), + InternalMessage response = new InternalMessage(preamble, + message.id(), localEp, REPLY_MESSAGE_TYPE, responsePayload.orElse(new byte[0]),