From a9e70a632e43b08aba153bd1b9505a39d0d28548 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Wed, 2 Mar 2016 16:28:18 -0800 Subject: [PATCH] [Falcon] Adds a status field to InternalMessage and support for replying with appropriate status when handler errors occur Change-Id: I995bdd6c67b88b6d7729887d32083315213fb79f --- .../cluster/messaging/MessagingException.java | 52 +++++++++++++++ .../cluster/messaging/impl/DecoderState.java | 1 + .../messaging/impl/InternalMessage.java | 35 ++++++++++ .../messaging/impl/MessageDecoder.java | 21 ++++-- .../messaging/impl/MessageEncoder.java | 3 + .../messaging/impl/NettyMessagingManager.java | 64 +++++++++++-------- 6 files changed, 146 insertions(+), 30 deletions(-) create mode 100644 core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java new file mode 100644 index 0000000000..e71ac10fa6 --- /dev/null +++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.cluster.messaging; + +import java.io.IOException; + +/** + * Top level exception for MessagingService failures. + */ +@SuppressWarnings("serial") +public class MessagingException extends IOException { + + public MessagingException() { + } + + public MessagingException(String message) { + super(message); + } + + public MessagingException(String message, Throwable t) { + super(message, t); + } + + public MessagingException(Throwable t) { + super(t); + } + + /** + * Exception indicating no remote registered remote handler. + */ + public static class NoRemoteHandler extends MessagingException { + } + + /** + * Exception indicating handler failure. + */ + public static class RemoteHandlerFailure extends MessagingException { + } +} \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java index e113a3f707..608b4e07e8 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java @@ -26,6 +26,7 @@ public enum DecoderState { READ_SENDER_PORT, READ_MESSAGE_TYPE_LENGTH, READ_MESSAGE_TYPE, + READ_MESSAGE_STATUS, READ_CONTENT_LENGTH, READ_CONTENT } diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java index 9deec669bd..e02ecc8ab0 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java @@ -25,16 +25,46 @@ import org.onosproject.store.cluster.messaging.Endpoint; */ public final class InternalMessage { + /** + * Message status. + */ + public enum Status { + /** + * All ok. + */ + OK, + + /** + * Response status signifying no registered handler. + */ + ERROR_NO_HANDLER, + + /** + * Response status signifying an exception handling the message. + */ + ERROR_HANDLER_EXCEPTION + + // NOTE: For backwards compatibility it important that new enum constants + // be appended. + // FIXME: We should remove this restriction in the future. + } + private final long id; private final Endpoint sender; private final String type; private final byte[] payload; + private final Status status; public InternalMessage(long id, Endpoint sender, String type, byte[] payload) { + this(id, sender, type, payload, Status.OK); + } + + public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) { this.id = id; this.sender = sender; this.type = type; this.payload = payload; + this.status = status; } public long id() { @@ -53,12 +83,17 @@ public final class InternalMessage { return payload; } + public Status status() { + return status; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("id", id) .add("type", type) .add("sender", sender) + .add("status", status) .add("payload", ByteArraySizeHashPrinter.of(payload)) .toString(); } diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java index 149b7063a1..aea3e292b2 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java @@ -16,12 +16,15 @@ package org.onosproject.store.cluster.messaging.impl; import com.google.common.base.Charsets; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; + import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress.Version; import org.onosproject.store.cluster.messaging.Endpoint; +import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +47,7 @@ public class MessageDecoder extends ReplayingDecoder { private int senderPort; private int messageTypeLength; private String messageType; + private Status status; private int contentLength; public MessageDecoder(int correctPreamble) { @@ -86,18 +90,27 @@ public class MessageDecoder extends ReplayingDecoder { byte[] messageTypeBytes = new byte[messageTypeLength]; buffer.readBytes(messageTypeBytes); messageType = new String(messageTypeBytes, Charsets.UTF_8); + checkpoint(DecoderState.READ_MESSAGE_STATUS); + case READ_MESSAGE_STATUS: + status = Status.values()[buffer.readInt()]; checkpoint(DecoderState.READ_CONTENT_LENGTH); case READ_CONTENT_LENGTH: contentLength = buffer.readInt(); checkpoint(DecoderState.READ_CONTENT); case READ_CONTENT: - //TODO Perform a sanity check on the size before allocating - byte[] payload = new byte[contentLength]; - buffer.readBytes(payload); + byte[] payload; + if (contentLength > 0) { + //TODO Perform a sanity check on the size before allocating + payload = new byte[contentLength]; + buffer.readBytes(payload); + } else { + payload = new byte[0]; + } InternalMessage message = new InternalMessage(messageId, new Endpoint(senderIp, senderPort), messageType, - payload); + payload, + status); out.add(message); checkpoint(DecoderState.READ_MESSAGE_PREAMBLE); break; diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java index 48c75ddc11..461155451b 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java @@ -75,6 +75,9 @@ public class MessageEncoder extends MessageToByteEncoder { // write message type bytes out.writeBytes(messageTypeBytes); + // write message status value + out.writeInt(message.status().ordinal()); + byte[] payload = message.payload(); // write payload length diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java index 72ba2eab7a..2f883e1e51 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java @@ -16,12 +16,12 @@ package org.onosproject.store.cluster.messaging.impl; import com.google.common.base.Strings; - import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.MoreExecutors; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; @@ -41,6 +41,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; + import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.felix.scr.annotations.Activate; @@ -53,7 +54,9 @@ import org.onlab.util.Tools; import org.onosproject.cluster.ClusterMetadataService; import org.onosproject.cluster.ControllerNode; import org.onosproject.store.cluster.messaging.Endpoint; +import org.onosproject.store.cluster.messaging.MessagingException; import org.onosproject.store.cluster.messaging.MessagingService; +import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,10 +64,12 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.TrustManagerFactory; + import java.io.FileInputStream; import java.io.IOException; import java.security.KeyStore; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -267,18 +272,14 @@ public class NettyMessagingManager implements MessagingService { @Override public void registerHandler(String type, BiFunction handler, Executor executor) { handlers.put(type, message -> executor.execute(() -> { - byte[] responsePayload = handler.apply(message.sender(), message.payload()); - if (responsePayload != null) { - InternalMessage response = new InternalMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - responsePayload); - sendAsync(message.sender(), response).whenComplete((result, error) -> { - if (error != null) { - log.debug("Failed to respond", error); - } - }); + byte[] responsePayload = null; + Status status = Status.OK; + try { + responsePayload = handler.apply(message.sender(), message.payload()); + } catch (Exception e) { + status = Status.ERROR_HANDLER_EXCEPTION; } + sendReply(message, status, Optional.ofNullable(responsePayload)); })); } @@ -286,17 +287,8 @@ public class NettyMessagingManager implements MessagingService { public void registerHandler(String type, BiFunction> handler) { handlers.put(type, message -> { handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> { - if (error == null) { - InternalMessage response = new InternalMessage(message.id(), - localEp, - REPLY_MESSAGE_TYPE, - result); - sendAsync(message.sender(), response).whenComplete((r, e) -> { - if (e != null) { - log.debug("Failed to respond", e); - } - }); - } + Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION; + sendReply(message, status, Optional.ofNullable(result)); }); }); } @@ -500,9 +492,15 @@ public class NettyMessagingManager implements MessagingService { Callback callback = callbacks.getIfPresent(message.id()); if (callback != null) { - callback.complete(message.payload()); + if (message.status() == Status.OK) { + callback.complete(message.payload()); + } else if (message.status() == Status.ERROR_NO_HANDLER) { + callback.completeExceptionally(new MessagingException.NoRemoteHandler()); + } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) { + callback.completeExceptionally(new MessagingException.RemoteHandlerFailure()); + } } else { - log.warn("Received a reply for message id:[{}]. " + log.debug("Received a reply for message id:[{}]. " + " from {}. But was unable to locate the" + " request handle", message.id(), message.sender()); } @@ -515,10 +513,24 @@ public class NettyMessagingManager implements MessagingService { if (handler != null) { handler.accept(message); } else { - log.debug("No handler registered for {}", type); + log.debug("No handler for message type {}", message.type(), message.sender()); + sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty()); } } + private void sendReply(InternalMessage message, Status status, Optional responsePayload) { + InternalMessage response = new InternalMessage(message.id(), + localEp, + REPLY_MESSAGE_TYPE, + responsePayload.orElse(new byte[0]), + status); + sendAsync(message.sender(), response).whenComplete((result, error) -> { + if (error != null) { + log.debug("Failed to respond", error); + } + }); + } + private final class Callback { private final CompletableFuture future; private final Executor executor;