diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java index dceb7c69d2..2bdf5a067d 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java @@ -11,7 +11,7 @@ public class ClusterMessage { private final NodeId sender; private final MessageSubject subject; - private final Object payload; + private final byte[] payload; // TODO: add field specifying Serializer for payload /** @@ -19,7 +19,7 @@ public class ClusterMessage { * * @param subject message subject */ - public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) { + public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) { this.sender = sender; this.subject = subject; this.payload = payload; @@ -48,7 +48,7 @@ public class ClusterMessage { * * @return message payload. */ - public Object payload() { + public byte[] payload() { return payload; } } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java index 98be0b1020..babe4d31f6 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java @@ -23,6 +23,9 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; import org.onlab.onos.store.cluster.messaging.MessageSubject; +import org.onlab.onos.store.serializers.KryoPoolUtil; +import org.onlab.onos.store.serializers.KryoSerializer; +import org.onlab.util.KryoPool; import org.onlab.netty.Endpoint; import org.onlab.netty.Message; import org.onlab.netty.MessageHandler; @@ -48,6 +51,18 @@ public class ClusterCommunicationManager //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) private MessagingService messagingService; + private static final KryoSerializer SERIALIZER = new KryoSerializer() { + protected void setupKryoPool() { + serializerPool = KryoPool.newBuilder() + .register(KryoPoolUtil.API) + .register(ClusterMessage.class) + .register(ClusterMembershipEvent.class) + .build() + .populate(1); + } + + }; + @Activate public void activate() { // TODO: initialize messagingService @@ -92,7 +107,7 @@ public class ClusterCommunicationManager checkArgument(node != null, "Unknown nodeId: %s", toNodeId); Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); try { - messagingService.sendAsync(nodeEp, message.subject().value(), message); + messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message)); return true; } catch (IOException e) { log.error("Failed to send cluster message to nodeId: " + toNodeId, e); @@ -126,7 +141,7 @@ public class ClusterCommunicationManager broadcast(new ClusterMessage( localNode.id(), new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), - new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))); + SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)))); members.remove(node.id()); } @@ -138,7 +153,7 @@ public class ClusterCommunicationManager broadcast(new ClusterMessage( localNode.id(), new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), - new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))); + SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)))); } } @@ -147,7 +162,7 @@ public class ClusterCommunicationManager @Override public void handle(ClusterMessage message) { - ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload(); + ClusterMembershipEvent event = SERIALIZER.decode(message.payload()); ControllerNode node = event.node(); if (event.type() == ClusterMembershipEventType.HEART_BEAT) { log.info("Node {} sent a hearbeat", node.id()); @@ -172,7 +187,8 @@ public class ClusterCommunicationManager @Override public void handle(Message message) { - handler.handle((ClusterMessage) message.payload()); + ClusterMessage clusterMessage = SERIALIZER.decode(message.payload()); + handler.handle(clusterMessage); } } } diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java index ab9ae3c6e9..2f1e504242 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java @@ -13,6 +13,7 @@ import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; +import org.onlab.onos.cluster.ClusterService; import org.onlab.onos.net.AnnotationsUtil; import org.onlab.onos.net.DefaultAnnotations; import org.onlab.onos.net.DefaultDevice; @@ -37,7 +38,11 @@ import org.onlab.onos.store.Timestamp; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; +import org.onlab.onos.store.common.impl.MastershipBasedTimestamp; import org.onlab.onos.store.common.impl.Timestamped; +import org.onlab.onos.store.serializers.KryoPoolUtil; +import org.onlab.onos.store.serializers.KryoSerializer; +import org.onlab.util.KryoPool; import org.onlab.util.NewConcurrentHashMap; import org.slf4j.Logger; @@ -104,6 +109,24 @@ public class GossipDeviceStore @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterCommunicationService clusterCommunicator; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + private static final KryoSerializer SERIALIZER = new KryoSerializer() { + protected void setupKryoPool() { + serializerPool = KryoPool.newBuilder() + .register(KryoPoolUtil.API) + .register(InternalDeviceEvent.class) + .register(InternalPortEvent.class) + .register(InternalPortStatusEvent.class) + .register(Timestamped.class) + .register(MastershipBasedTimestamp.class) + .build() + .populate(1); + } + + }; + @Activate public void activate() { clusterCommunicator.addSubscriber( @@ -779,17 +802,26 @@ public class GossipDeviceStore } private void notifyPeers(InternalDeviceEvent event) throws IOException { - ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event); + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, + SERIALIZER.encode(event)); clusterCommunicator.broadcast(message); } private void notifyPeers(InternalPortEvent event) throws IOException { - ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event); + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + GossipDeviceStoreMessageSubjects.PORT_UPDATE, + SERIALIZER.encode(event)); clusterCommunicator.broadcast(message); } private void notifyPeers(InternalPortStatusEvent event) throws IOException { - ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event); + ClusterMessage message = new ClusterMessage( + clusterService.getLocalNode().id(), + GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, + SERIALIZER.encode(event)); clusterCommunicator.broadcast(message); } @@ -797,7 +829,7 @@ public class GossipDeviceStore @Override public void handle(ClusterMessage message) { log.info("Received device update event from peer: {}", message.sender()); - InternalDeviceEvent event = (InternalDeviceEvent) message.payload(); + InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload()); ProviderId providerId = event.providerId(); DeviceId deviceId = event.deviceId(); Timestamped deviceDescription = event.deviceDescription(); @@ -810,7 +842,7 @@ public class GossipDeviceStore public void handle(ClusterMessage message) { log.info("Received port update event from peer: {}", message.sender()); - InternalPortEvent event = (InternalPortEvent) message.payload(); + InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload()); ProviderId providerId = event.providerId(); DeviceId deviceId = event.deviceId(); @@ -825,7 +857,7 @@ public class GossipDeviceStore public void handle(ClusterMessage message) { log.info("Received port status update event from peer: {}", message.sender()); - InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload(); + InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload()); ProviderId providerId = event.providerId(); DeviceId deviceId = event.deviceId(); diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java index dbd88c38cd..f4dadad681 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java @@ -17,27 +17,20 @@ public final class ClusterMessageSerializer extends Serializer { } @Override - public void write(Kryo kryo, Output output, ClusterMessage object) { - kryo.writeClassAndObject(output, object.sender()); - kryo.writeClassAndObject(output, object.subject()); - // TODO: write bytes serialized using ClusterMessage specified serializer - // write serialized payload size - //output.writeInt(...); - // write serialized payload - //output.writeBytes(...); + public void write(Kryo kryo, Output output, ClusterMessage message) { + kryo.writeClassAndObject(output, message.sender()); + kryo.writeClassAndObject(output, message.subject()); + output.writeInt(message.payload().length); + output.writeBytes(message.payload()); } @Override public ClusterMessage read(Kryo kryo, Input input, Class type) { - // TODO Auto-generated method stub NodeId sender = (NodeId) kryo.readClassAndObject(input); MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input); - int size = input.readInt(); - byte[] payloadBytes = input.readBytes(size); - // TODO: deserialize payload using ClusterMessage specified serializer - Object payload = null; + int payloadSize = input.readInt(); + byte[] payload = input.readBytes(payloadSize); return new ClusterMessage(sender, subject, payload); } - -} +} \ No newline at end of file diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java index 361b071f0b..fa42a6b9d7 100644 --- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java +++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java @@ -20,6 +20,11 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.onlab.onos.cluster.ClusterEventListener; +import org.onlab.onos.cluster.ClusterService; +import org.onlab.onos.cluster.ControllerNode; +import org.onlab.onos.cluster.ControllerNode.State; +import org.onlab.onos.cluster.DefaultControllerNode; import org.onlab.onos.cluster.MastershipTerm; import org.onlab.onos.cluster.NodeId; import org.onlab.onos.net.Annotations; @@ -42,6 +47,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; import org.onlab.onos.store.cluster.messaging.MessageSubject; +import org.onlab.packet.IpPrefix; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -111,8 +117,9 @@ public class GossipDeviceStoreTest { deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2)); ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService(); + ClusterService clusterService = new TestClusterService(); - gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator); + gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator); gossipDeviceStore.activate(); deviceStore = gossipDeviceStore; } @@ -548,8 +555,12 @@ public class GossipDeviceStoreTest { private static final class TestGossipDeviceStore extends GossipDeviceStore { - public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) { + public TestGossipDeviceStore( + ClockService clockService, + ClusterService clusterService, + ClusterCommunicationService clusterCommunicator) { this.clockService = clockService; + this.clusterService = clusterService; this.clusterCommunicator = clusterCommunicator; } } @@ -564,4 +575,45 @@ public class GossipDeviceStoreTest { @Override public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {} } + + private static final class TestClusterService implements ClusterService { + + private static final ControllerNode ONOS1 = + new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1")); + private final Map nodes = new HashMap<>(); + private final Map nodeStates = new HashMap<>(); + + public TestClusterService() { + nodes.put(new NodeId("N1"), ONOS1); + nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE); + } + + @Override + public ControllerNode getLocalNode() { + return ONOS1; + } + + @Override + public Set getNodes() { + return Sets.newHashSet(nodes.values()); + } + + @Override + public ControllerNode getNode(NodeId nodeId) { + return nodes.get(nodeId); + } + + @Override + public State getState(NodeId nodeId) { + return nodeStates.get(nodeId); + } + + @Override + public void addListener(ClusterEventListener listener) { + } + + @Override + public void removeListener(ClusterEventListener listener) { + } + } } diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java index f1a12fe857..0c33cfe50a 100644 --- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java +++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java @@ -2,6 +2,7 @@ package org.onlab.onos.store.serializers; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import org.onlab.onos.cluster.ControllerNode; @@ -21,6 +22,8 @@ import org.onlab.onos.net.LinkKey; import org.onlab.onos.net.MastershipRole; import org.onlab.onos.net.Port; import org.onlab.onos.net.PortNumber; +import org.onlab.onos.net.device.DefaultDeviceDescription; +import org.onlab.onos.net.device.DefaultPortDescription; import org.onlab.onos.net.provider.ProviderId; import org.onlab.packet.IpAddress; import org.onlab.packet.IpPrefix; @@ -47,6 +50,7 @@ public final class KryoPoolUtil { .register( // ArrayList.class, + Arrays.asList().getClass(), HashMap.class, // ControllerNode.State.class, @@ -54,8 +58,10 @@ public final class KryoPoolUtil { DefaultAnnotations.class, DefaultControllerNode.class, DefaultDevice.class, + DefaultDeviceDescription.class, MastershipRole.class, Port.class, + DefaultPortDescription.class, Element.class, Link.Type.class ) diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java index 19517f3c4a..93ee854cc8 100644 --- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java +++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java @@ -12,7 +12,7 @@ import java.nio.ByteBuffer; public class KryoSerializer implements Serializer { private final Logger log = LoggerFactory.getLogger(getClass()); - private KryoPool serializerPool; + protected KryoPool serializerPool; public KryoSerializer() { diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java index f4024a4e9a..1772a3c71f 100644 --- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java +++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java @@ -8,16 +8,15 @@ import java.util.concurrent.TimeoutException; * This class provides a base implementation of Response, with methods to retrieve the * result and query to see if the result is ready. The result can only be retrieved when * it is ready and the get methods will block if the result is not ready yet. - * @param type of response. */ -public class AsyncResponse implements Response { +public class AsyncResponse implements Response { - private T value; + private byte[] value; private boolean done = false; private final long start = System.nanoTime(); @Override - public T get(long timeout, TimeUnit timeUnit) throws TimeoutException { + public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException { timeout = timeUnit.toNanos(timeout); boolean interrupted = false; try { @@ -43,7 +42,7 @@ public class AsyncResponse implements Response { } @Override - public T get() throws InterruptedException { + public byte[] get() throws InterruptedException { throw new UnsupportedOperationException(); } @@ -57,11 +56,10 @@ public class AsyncResponse implements Response { * available. * @param data response data. */ - @SuppressWarnings("unchecked") - public synchronized void setResponse(Object data) { + public synchronized void setResponse(byte[] data) { if (!done) { done = true; - value = (T) data; + value = data; this.notifyAll(); } } diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java index 367ca91a82..938ec7b6c9 100644 --- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java +++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java @@ -13,11 +13,8 @@ public final class InternalMessage implements Message { private long id; private Endpoint sender; private String type; - private Object payload; - + private byte[] payload; private transient NettyMessagingService messagingService; - // TODO: add transient payload serializer or change payload type to - // byte[], ByteBuffer, etc. // Must be created using the Builder. private InternalMessage() {} @@ -35,7 +32,7 @@ public final class InternalMessage implements Message { } @Override - public Object payload() { + public byte[] payload() { return payload; } @@ -44,7 +41,7 @@ public final class InternalMessage implements Message { } @Override - public void respond(Object data) throws IOException { + public void respond(byte[] data) throws IOException { Builder builder = new Builder(messagingService); InternalMessage message = builder.withId(this.id) // FIXME: Sender should be messagingService.localEp. @@ -81,7 +78,7 @@ public final class InternalMessage implements Message { message.sender = sender; return this; } - public Builder withPayload(Object payload) { + public Builder withPayload(byte[] payload) { message.payload = payload; return this; } diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java index 4414d05f62..b8efb51271 100644 --- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java +++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java @@ -10,7 +10,7 @@ import java.util.HashMap; /** * Kryo Serializer. */ -public class KryoSerializer implements PayloadSerializer { +public class KryoSerializer { private KryoPool serializerPool; @@ -28,29 +28,26 @@ public class KryoSerializer implements PayloadSerializer { HashMap.class, ArrayList.class, InternalMessage.class, - Endpoint.class + Endpoint.class, + byte[].class ) .build() .populate(1); } - @Override public T decode(byte[] data) { return serializerPool.deserialize(data); } - @Override public byte[] encode(Object payload) { return serializerPool.serialize(payload); } - @Override public T decode(ByteBuffer buffer) { return serializerPool.deserialize(buffer); } - @Override public void encode(Object obj, ByteBuffer buffer) { serializerPool.serialize(obj, buffer); } diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java index 23c4073801..366898b9c2 100644 --- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java +++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java @@ -12,6 +12,6 @@ public class LoggingHandler implements MessageHandler { @Override public void handle(Message message) { - log.info("Received message. Payload: " + message.payload()); + log.info("Received message. Payload has {} bytes", message.payload().length); } } diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java index 54b95260d3..87a8bb6614 100644 --- a/utils/netty/src/main/java/org/onlab/netty/Message.java +++ b/utils/netty/src/main/java/org/onlab/netty/Message.java @@ -12,12 +12,12 @@ public interface Message { * Returns the payload of this message. * @return message payload. */ - public Object payload(); + public byte[] payload(); /** - * Sends a reply back to the sender of this messge. + * Sends a reply back to the sender of this message. * @param data payload of the response. * @throws IOException if there is a communication error. */ - public void respond(Object data) throws IOException; + public void respond(byte[] data) throws IOException; } diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java index f199019028..d4832e5b37 100644 --- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java +++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java @@ -14,14 +14,14 @@ import java.util.List; public class MessageDecoder extends ReplayingDecoder { private final NettyMessagingService messagingService; - private final PayloadSerializer payloadSerializer; + + private static final KryoSerializer SERIALIZER = new KryoSerializer(); private int contentLength; - public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) { + public MessageDecoder(NettyMessagingService messagingService) { super(DecoderState.READ_HEADER_VERSION); this.messagingService = messagingService; - this.payloadSerializer = payloadSerializer; } @Override @@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder { checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version"); checkpoint(DecoderState.READ_CONTENT); case READ_CONTENT: - InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer()); + InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer()); message.setMessagingService(messagingService); out.add(message); checkpoint(DecoderState.READ_HEADER_VERSION); diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java index 0ee29cbb20..716efb92c0 100644 --- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java +++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java @@ -17,11 +17,7 @@ public class MessageEncoder extends MessageToByteEncoder { public static final int SERIALIZER_VERSION = 1; - private final PayloadSerializer payloadSerializer; - - public MessageEncoder(PayloadSerializer payloadSerializer) { - this.payloadSerializer = payloadSerializer; - } + private static final KryoSerializer SERIALIZER = new KryoSerializer(); @Override protected void encode( @@ -35,7 +31,12 @@ public class MessageEncoder extends MessageToByteEncoder { // write preamble out.writeBytes(PREAMBLE); - byte[] payload = payloadSerializer.encode(message); + try { + SERIALIZER.encode(message); + } catch (Exception e) { + e.printStackTrace(); + } + byte[] payload = SERIALIZER.encode(message); // write payload length out.writeInt(payload.length); diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java index fece74279d..08676ac278 100644 --- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java +++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java @@ -11,10 +11,10 @@ public interface MessagingService { * The message is specified using the type and payload. * @param ep end point to send the message to. * @param type type of message. - * @param payload message payload. + * @param payload message payload bytes. * @throws IOException */ - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException; + public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException; /** * Sends a message synchronously and waits for a response. @@ -24,7 +24,7 @@ public interface MessagingService { * @return a response future * @throws IOException */ - public Response sendAndReceive(Endpoint ep, String type, Object payload) throws IOException; + public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException; /** * Registers a new message handler for message type. @@ -38,12 +38,4 @@ public interface MessagingService { * @param type message type */ public void unregisterHandler(String type); - - // FIXME: remove me and add PayloadSerializer to all other methods - /** - * Specify the serializer to use for encoding/decoding payload. - * - * @param payloadSerializer payloadSerializer to use - */ - public void setPayloadSerializer(PayloadSerializer payloadSerializer); -} +} \ No newline at end of file diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java index 051482e586..48aeb305d7 100644 --- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java +++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java @@ -43,7 +43,7 @@ public class NettyMessagingService implements MessagingService { private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final ConcurrentMap handlers = new ConcurrentHashMap<>(); - private final Cache> responseFutures = CacheBuilder.newBuilder() + private final Cache responseFutures = CacheBuilder.newBuilder() .maximumSize(100000) .weakValues() // TODO: Once the entry expires, notify blocking threads (if any). @@ -52,8 +52,6 @@ public class NettyMessagingService implements MessagingService { private final GenericKeyedObjectPool channels = new GenericKeyedObjectPool(new OnosCommunicationChannelFactory()); - protected PayloadSerializer payloadSerializer; - public NettyMessagingService() { // TODO: Default port should be configurable. this(8080); @@ -83,7 +81,7 @@ public class NettyMessagingService implements MessagingService { } @Override - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException { + public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException { InternalMessage message = new InternalMessage.Builder(this) .withId(RandomUtils.nextLong()) .withSender(localEp) @@ -108,9 +106,9 @@ public class NettyMessagingService implements MessagingService { } @Override - public Response sendAndReceive(Endpoint ep, String type, Object payload) + public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException { - AsyncResponse futureResponse = new AsyncResponse(); + AsyncResponse futureResponse = new AsyncResponse(); Long messageId = RandomUtils.nextLong(); responseFutures.put(messageId, futureResponse); InternalMessage message = new InternalMessage.Builder(this) @@ -133,11 +131,6 @@ public class NettyMessagingService implements MessagingService { handlers.remove(type); } - @Override - public void setPayloadSerializer(PayloadSerializer payloadSerializer) { - this.payloadSerializer = payloadSerializer; - } - private MessageHandler getMessageHandler(String type) { return handlers.get(type); } @@ -202,13 +195,13 @@ public class NettyMessagingService implements MessagingService { private class OnosCommunicationChannelInitializer extends ChannelInitializer { private final ChannelHandler dispatcher = new InboundMessageDispatcher(); - private final ChannelHandler encoder = new MessageEncoder(payloadSerializer); + private final ChannelHandler encoder = new MessageEncoder(); @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast("encoder", encoder) - .addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer)) + .addLast("decoder", new MessageDecoder(NettyMessagingService.this)) .addLast("handler", dispatcher); } } @@ -237,7 +230,7 @@ public class NettyMessagingService implements MessagingService { String type = message.type(); if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) { try { - AsyncResponse futureResponse = + AsyncResponse futureResponse = NettyMessagingService.this.responseFutures.getIfPresent(message.id()); if (futureResponse != null) { futureResponse.setResponse(message.payload()); diff --git a/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java b/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java deleted file mode 100644 index 9874543bed..0000000000 --- a/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.onlab.netty; - -import java.nio.ByteBuffer; - -/** - * Interface for encoding/decoding message payloads. - */ -public interface PayloadSerializer { - - /** - * Decodes the specified byte array to a POJO. - * - * @param data byte array. - * @return POJO - */ - public T decode(byte[] data); - - /** - * Encodes the specified POJO into a byte array. - * - * @param data POJO to be encoded - * @return byte array. - */ - public byte[] encode(Object data); - - /** - * Encodes the specified POJO into a byte buffer. - * - * @param data POJO to be encoded - * @param buffer to write serialized bytes - */ - public void encode(final Object data, ByteBuffer buffer); - - /** - * Decodes the specified byte buffer to a POJO. - * - * @param buffer bytes to be decoded - * @return POJO - */ - public T decode(final ByteBuffer buffer); -} diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java index 04675ce28d..150755eb3f 100644 --- a/utils/netty/src/main/java/org/onlab/netty/Response.java +++ b/utils/netty/src/main/java/org/onlab/netty/Response.java @@ -7,26 +7,24 @@ import java.util.concurrent.TimeoutException; * Response object returned when making synchronous requests. * Can you used to check is a response is ready and/or wait for a response * to become available. - * - * @param type of response. */ -public interface Response { +public interface Response { /** * Gets the response waiting for a designated timeout period. * @param timeout timeout period (since request was sent out) * @param tu unit of time. - * @return response + * @return response payload * @throws TimeoutException if the timeout expires before the response arrives. */ - public T get(long timeout, TimeUnit tu) throws TimeoutException; + public byte[] get(long timeout, TimeUnit tu) throws TimeoutException; /** * Gets the response waiting for indefinite timeout period. - * @return response + * @return response payload * @throws InterruptedException if the thread is interrupted before the response arrives. */ - public T get() throws InterruptedException; + public byte[] get() throws InterruptedException; /** * Checks if the response is ready without blocking. diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java index 494d410fb1..3869948ded 100644 --- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java +++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java @@ -24,7 +24,7 @@ public final class SimpleClient { final int warmup = 100; for (int i = 0; i < warmup; i++) { Timer.Context context = sendAsyncTimer.time(); - messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World"); + messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes()); context.stop(); } metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer); @@ -33,10 +33,10 @@ public final class SimpleClient { final int iterations = 1000000; for (int i = 0; i < iterations; i++) { Timer.Context context = sendAndReceiveTimer.time(); - Response response = messaging + Response response = messaging .sendAndReceive(new Endpoint("localhost", 8080), "echo", - "Hello World"); - System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS)); + "Hello World".getBytes()); + System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS))); context.stop(); } metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer); @@ -45,8 +45,6 @@ public final class SimpleClient { public static class TestNettyMessagingService extends NettyMessagingService { public TestNettyMessagingService(int port) throws Exception { super(port); - PayloadSerializer payloadSerializer = new KryoSerializer(); - this.payloadSerializer = payloadSerializer; } } } diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java index 84984c1513..b8ae5b09eb 100644 --- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java +++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java @@ -7,7 +7,6 @@ public final class SimpleServer { public static void main(String... args) throws Exception { NettyMessagingService server = new NettyMessagingService(8080); server.activate(); - server.setPayloadSerializer(new KryoSerializer()); server.registerHandler("simple", new LoggingHandler()); server.registerHandler("echo", new EchoHandler()); } diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java index 96b877e25c..36d2a1ebaa 100644 --- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java +++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java @@ -2,7 +2,8 @@ package org.onlab.netty; import java.util.concurrent.TimeUnit; -import org.junit.Assert; +import org.apache.commons.lang3.RandomUtils; +import static org.junit.Assert.*; import org.junit.Test; /** @@ -17,11 +18,10 @@ public class PingPongTest { try { pinger.activate(); ponger.activate(); - pinger.setPayloadSerializer(new KryoSerializer()); - ponger.setPayloadSerializer(new KryoSerializer()); ponger.registerHandler("echo", new EchoHandler()); - Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", "hello"); - Assert.assertEquals("hello", response.get(10000, TimeUnit.MILLISECONDS)); + byte[] payload = RandomUtils.nextBytes(100); + Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload); + assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS)); } finally { pinger.deactivate(); ponger.deactivate();