Merge remote-tracking branch 'origin/master'

This commit is contained in:
tom 2014-10-07 12:55:01 -07:00
commit 1daad7c63f
21 changed files with 184 additions and 153 deletions

View File

@ -11,7 +11,7 @@ public class ClusterMessage {
private final NodeId sender; private final NodeId sender;
private final MessageSubject subject; private final MessageSubject subject;
private final Object payload; private final byte[] payload;
// TODO: add field specifying Serializer for payload // TODO: add field specifying Serializer for payload
/** /**
@ -19,7 +19,7 @@ public class ClusterMessage {
* *
* @param subject message subject * @param subject message subject
*/ */
public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) { public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
this.sender = sender; this.sender = sender;
this.subject = subject; this.subject = subject;
this.payload = payload; this.payload = payload;
@ -48,7 +48,7 @@ public class ClusterMessage {
* *
* @return message payload. * @return message payload.
*/ */
public Object payload() { public byte[] payload() {
return payload; return payload;
} }
} }

View File

@ -23,6 +23,9 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject; import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint; import org.onlab.netty.Endpoint;
import org.onlab.netty.Message; import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler; import org.onlab.netty.MessageHandler;
@ -48,6 +51,18 @@ public class ClusterCommunicationManager
//@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService; private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(ClusterMessage.class)
.register(ClusterMembershipEvent.class)
.build()
.populate(1);
}
};
@Activate @Activate
public void activate() { public void activate() {
// TODO: initialize messagingService // TODO: initialize messagingService
@ -92,7 +107,7 @@ public class ClusterCommunicationManager
checkArgument(node != null, "Unknown nodeId: %s", toNodeId); checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try { try {
messagingService.sendAsync(nodeEp, message.subject().value(), message); messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
return true; return true;
} catch (IOException e) { } catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e); log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@ -126,7 +141,7 @@ public class ClusterCommunicationManager
broadcast(new ClusterMessage( broadcast(new ClusterMessage(
localNode.id(), localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))); SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
members.remove(node.id()); members.remove(node.id());
} }
@ -138,7 +153,7 @@ public class ClusterCommunicationManager
broadcast(new ClusterMessage( broadcast(new ClusterMessage(
localNode.id(), localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))); SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
} }
} }
@ -147,7 +162,7 @@ public class ClusterCommunicationManager
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload(); ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
ControllerNode node = event.node(); ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) { if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id()); log.info("Node {} sent a hearbeat", node.id());
@ -172,7 +187,8 @@ public class ClusterCommunicationManager
@Override @Override
public void handle(Message message) { public void handle(Message message) {
handler.handle((ClusterMessage) message.payload()); ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
handler.handle(clusterMessage);
} }
} }
} }

View File

@ -13,6 +13,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service; import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.AnnotationsUtil; import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations; import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice; import org.onlab.onos.net.DefaultDevice;
@ -37,7 +38,11 @@ import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped; import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap; import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -104,6 +109,24 @@ public class GossipDeviceStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator; protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(InternalDeviceEvent.class)
.register(InternalPortEvent.class)
.register(InternalPortStatusEvent.class)
.register(Timestamped.class)
.register(MastershipBasedTimestamp.class)
.build()
.populate(1);
}
};
@Activate @Activate
public void activate() { public void activate() {
clusterCommunicator.addSubscriber( clusterCommunicator.addSubscriber(
@ -779,17 +802,26 @@ public class GossipDeviceStore
} }
private void notifyPeers(InternalDeviceEvent event) throws IOException { private void notifyPeers(InternalDeviceEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event); ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message); clusterCommunicator.broadcast(message);
} }
private void notifyPeers(InternalPortEvent event) throws IOException { private void notifyPeers(InternalPortEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event); ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.PORT_UPDATE,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message); clusterCommunicator.broadcast(message);
} }
private void notifyPeers(InternalPortStatusEvent event) throws IOException { private void notifyPeers(InternalPortStatusEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event); ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message); clusterCommunicator.broadcast(message);
} }
@ -797,7 +829,7 @@ public class GossipDeviceStore
@Override @Override
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.info("Received device update event from peer: {}", message.sender()); log.info("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = (InternalDeviceEvent) message.payload(); InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId(); ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId(); DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
@ -810,7 +842,7 @@ public class GossipDeviceStore
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.info("Received port update event from peer: {}", message.sender()); log.info("Received port update event from peer: {}", message.sender());
InternalPortEvent event = (InternalPortEvent) message.payload(); InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId(); ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId(); DeviceId deviceId = event.deviceId();
@ -825,7 +857,7 @@ public class GossipDeviceStore
public void handle(ClusterMessage message) { public void handle(ClusterMessage message) {
log.info("Received port status update event from peer: {}", message.sender()); log.info("Received port status update event from peer: {}", message.sender());
InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload(); InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId(); ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId(); DeviceId deviceId = event.deviceId();

View File

@ -17,27 +17,20 @@ public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
} }
@Override @Override
public void write(Kryo kryo, Output output, ClusterMessage object) { public void write(Kryo kryo, Output output, ClusterMessage message) {
kryo.writeClassAndObject(output, object.sender()); kryo.writeClassAndObject(output, message.sender());
kryo.writeClassAndObject(output, object.subject()); kryo.writeClassAndObject(output, message.subject());
// TODO: write bytes serialized using ClusterMessage specified serializer output.writeInt(message.payload().length);
// write serialized payload size output.writeBytes(message.payload());
//output.writeInt(...);
// write serialized payload
//output.writeBytes(...);
} }
@Override @Override
public ClusterMessage read(Kryo kryo, Input input, public ClusterMessage read(Kryo kryo, Input input,
Class<ClusterMessage> type) { Class<ClusterMessage> type) {
// TODO Auto-generated method stub
NodeId sender = (NodeId) kryo.readClassAndObject(input); NodeId sender = (NodeId) kryo.readClassAndObject(input);
MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input); MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
int size = input.readInt(); int payloadSize = input.readInt();
byte[] payloadBytes = input.readBytes(size); byte[] payload = input.readBytes(payloadSize);
// TODO: deserialize payload using ClusterMessage specified serializer
Object payload = null;
return new ClusterMessage(sender, subject, payload); return new ClusterMessage(sender, subject, payload);
} }
}
}

View File

@ -20,6 +20,11 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipTerm; import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId; import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Annotations; import org.onlab.onos.net.Annotations;
@ -42,6 +47,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage; import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject; import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.packet.IpPrefix;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -111,8 +117,9 @@ public class GossipDeviceStoreTest {
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2)); deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService(); ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
ClusterService clusterService = new TestClusterService();
gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator); gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
gossipDeviceStore.activate(); gossipDeviceStore.activate();
deviceStore = gossipDeviceStore; deviceStore = gossipDeviceStore;
} }
@ -548,8 +555,12 @@ public class GossipDeviceStoreTest {
private static final class TestGossipDeviceStore extends GossipDeviceStore { private static final class TestGossipDeviceStore extends GossipDeviceStore {
public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) { public TestGossipDeviceStore(
ClockService clockService,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
this.clockService = clockService; this.clockService = clockService;
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator; this.clusterCommunicator = clusterCommunicator;
} }
} }
@ -564,4 +575,45 @@ public class GossipDeviceStoreTest {
@Override @Override
public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {} public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
} }
private static final class TestClusterService implements ClusterService {
private static final ControllerNode ONOS1 =
new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
public TestClusterService() {
nodes.put(new NodeId("N1"), ONOS1);
nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
}
@Override
public ControllerNode getLocalNode() {
return ONOS1;
}
@Override
public Set<ControllerNode> getNodes() {
return Sets.newHashSet(nodes.values());
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
@Override
public State getState(NodeId nodeId) {
return nodeStates.get(nodeId);
}
@Override
public void addListener(ClusterEventListener listener) {
}
@Override
public void removeListener(ClusterEventListener listener) {
}
}
} }

View File

@ -2,6 +2,7 @@ package org.onlab.onos.store.serializers;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import org.onlab.onos.cluster.ControllerNode; import org.onlab.onos.cluster.ControllerNode;
@ -21,6 +22,8 @@ import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole; import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port; import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix; import org.onlab.packet.IpPrefix;
@ -47,6 +50,7 @@ public final class KryoPoolUtil {
.register( .register(
// //
ArrayList.class, ArrayList.class,
Arrays.asList().getClass(),
HashMap.class, HashMap.class,
// //
ControllerNode.State.class, ControllerNode.State.class,
@ -54,8 +58,10 @@ public final class KryoPoolUtil {
DefaultAnnotations.class, DefaultAnnotations.class,
DefaultControllerNode.class, DefaultControllerNode.class,
DefaultDevice.class, DefaultDevice.class,
DefaultDeviceDescription.class,
MastershipRole.class, MastershipRole.class,
Port.class, Port.class,
DefaultPortDescription.class,
Element.class, Element.class,
Link.Type.class Link.Type.class
) )

View File

@ -12,7 +12,7 @@ import java.nio.ByteBuffer;
public class KryoSerializer implements Serializer { public class KryoSerializer implements Serializer {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool; protected KryoPool serializerPool;
public KryoSerializer() { public KryoSerializer() {

View File

@ -8,16 +8,15 @@ import java.util.concurrent.TimeoutException;
* This class provides a base implementation of Response, with methods to retrieve the * This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when * result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet. * it is ready and the get methods will block if the result is not ready yet.
* @param <T> type of response.
*/ */
public class AsyncResponse<T> implements Response<T> { public class AsyncResponse implements Response {
private T value; private byte[] value;
private boolean done = false; private boolean done = false;
private final long start = System.nanoTime(); private final long start = System.nanoTime();
@Override @Override
public T get(long timeout, TimeUnit timeUnit) throws TimeoutException { public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
timeout = timeUnit.toNanos(timeout); timeout = timeUnit.toNanos(timeout);
boolean interrupted = false; boolean interrupted = false;
try { try {
@ -43,7 +42,7 @@ public class AsyncResponse<T> implements Response<T> {
} }
@Override @Override
public T get() throws InterruptedException { public byte[] get() throws InterruptedException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -57,11 +56,10 @@ public class AsyncResponse<T> implements Response<T> {
* available. * available.
* @param data response data. * @param data response data.
*/ */
@SuppressWarnings("unchecked") public synchronized void setResponse(byte[] data) {
public synchronized void setResponse(Object data) {
if (!done) { if (!done) {
done = true; done = true;
value = (T) data; value = data;
this.notifyAll(); this.notifyAll();
} }
} }

View File

@ -13,11 +13,8 @@ public final class InternalMessage implements Message {
private long id; private long id;
private Endpoint sender; private Endpoint sender;
private String type; private String type;
private Object payload; private byte[] payload;
private transient NettyMessagingService messagingService; private transient NettyMessagingService messagingService;
// TODO: add transient payload serializer or change payload type to
// byte[], ByteBuffer, etc.
// Must be created using the Builder. // Must be created using the Builder.
private InternalMessage() {} private InternalMessage() {}
@ -35,7 +32,7 @@ public final class InternalMessage implements Message {
} }
@Override @Override
public Object payload() { public byte[] payload() {
return payload; return payload;
} }
@ -44,7 +41,7 @@ public final class InternalMessage implements Message {
} }
@Override @Override
public void respond(Object data) throws IOException { public void respond(byte[] data) throws IOException {
Builder builder = new Builder(messagingService); Builder builder = new Builder(messagingService);
InternalMessage message = builder.withId(this.id) InternalMessage message = builder.withId(this.id)
// FIXME: Sender should be messagingService.localEp. // FIXME: Sender should be messagingService.localEp.
@ -81,7 +78,7 @@ public final class InternalMessage implements Message {
message.sender = sender; message.sender = sender;
return this; return this;
} }
public Builder withPayload(Object payload) { public Builder withPayload(byte[] payload) {
message.payload = payload; message.payload = payload;
return this; return this;
} }

View File

@ -10,7 +10,7 @@ import java.util.HashMap;
/** /**
* Kryo Serializer. * Kryo Serializer.
*/ */
public class KryoSerializer implements PayloadSerializer { public class KryoSerializer {
private KryoPool serializerPool; private KryoPool serializerPool;
@ -28,29 +28,26 @@ public class KryoSerializer implements PayloadSerializer {
HashMap.class, HashMap.class,
ArrayList.class, ArrayList.class,
InternalMessage.class, InternalMessage.class,
Endpoint.class Endpoint.class,
byte[].class
) )
.build() .build()
.populate(1); .populate(1);
} }
@Override
public <T> T decode(byte[] data) { public <T> T decode(byte[] data) {
return serializerPool.deserialize(data); return serializerPool.deserialize(data);
} }
@Override
public byte[] encode(Object payload) { public byte[] encode(Object payload) {
return serializerPool.serialize(payload); return serializerPool.serialize(payload);
} }
@Override
public <T> T decode(ByteBuffer buffer) { public <T> T decode(ByteBuffer buffer) {
return serializerPool.deserialize(buffer); return serializerPool.deserialize(buffer);
} }
@Override
public void encode(Object obj, ByteBuffer buffer) { public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer); serializerPool.serialize(obj, buffer);
} }

View File

@ -12,6 +12,6 @@ public class LoggingHandler implements MessageHandler {
@Override @Override
public void handle(Message message) { public void handle(Message message) {
log.info("Received message. Payload: " + message.payload()); log.info("Received message. Payload has {} bytes", message.payload().length);
} }
} }

View File

@ -12,12 +12,12 @@ public interface Message {
* Returns the payload of this message. * Returns the payload of this message.
* @return message payload. * @return message payload.
*/ */
public Object payload(); public byte[] payload();
/** /**
* Sends a reply back to the sender of this messge. * Sends a reply back to the sender of this message.
* @param data payload of the response. * @param data payload of the response.
* @throws IOException if there is a communication error. * @throws IOException if there is a communication error.
*/ */
public void respond(Object data) throws IOException; public void respond(byte[] data) throws IOException;
} }

View File

@ -14,14 +14,14 @@ import java.util.List;
public class MessageDecoder extends ReplayingDecoder<DecoderState> { public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService; private final NettyMessagingService messagingService;
private final PayloadSerializer payloadSerializer;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
private int contentLength; private int contentLength;
public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) { public MessageDecoder(NettyMessagingService messagingService) {
super(DecoderState.READ_HEADER_VERSION); super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService; this.messagingService = messagingService;
this.payloadSerializer = payloadSerializer;
} }
@Override @Override
@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version"); checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT); checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT: case READ_CONTENT:
InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer()); InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService); message.setMessagingService(messagingService);
out.add(message); out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION); checkpoint(DecoderState.READ_HEADER_VERSION);

View File

@ -17,11 +17,7 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
public static final int SERIALIZER_VERSION = 1; public static final int SERIALIZER_VERSION = 1;
private final PayloadSerializer payloadSerializer; private static final KryoSerializer SERIALIZER = new KryoSerializer();
public MessageEncoder(PayloadSerializer payloadSerializer) {
this.payloadSerializer = payloadSerializer;
}
@Override @Override
protected void encode( protected void encode(
@ -35,7 +31,12 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write preamble // write preamble
out.writeBytes(PREAMBLE); out.writeBytes(PREAMBLE);
byte[] payload = payloadSerializer.encode(message); try {
SERIALIZER.encode(message);
} catch (Exception e) {
e.printStackTrace();
}
byte[] payload = SERIALIZER.encode(message);
// write payload length // write payload length
out.writeInt(payload.length); out.writeInt(payload.length);

View File

@ -11,10 +11,10 @@ public interface MessagingService {
* The message is specified using the type and payload. * The message is specified using the type and payload.
* @param ep end point to send the message to. * @param ep end point to send the message to.
* @param type type of message. * @param type type of message.
* @param payload message payload. * @param payload message payload bytes.
* @throws IOException * @throws IOException
*/ */
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException; public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
/** /**
* Sends a message synchronously and waits for a response. * Sends a message synchronously and waits for a response.
@ -24,7 +24,7 @@ public interface MessagingService {
* @return a response future * @return a response future
* @throws IOException * @throws IOException
*/ */
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException; public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
/** /**
* Registers a new message handler for message type. * Registers a new message handler for message type.
@ -38,12 +38,4 @@ public interface MessagingService {
* @param type message type * @param type message type
*/ */
public void unregisterHandler(String type); public void unregisterHandler(String type);
}
// FIXME: remove me and add PayloadSerializer to all other methods
/**
* Specify the serializer to use for encoding/decoding payload.
*
* @param payloadSerializer payloadSerializer to use
*/
public void setPayloadSerializer(PayloadSerializer payloadSerializer);
}

View File

@ -43,7 +43,7 @@ public class NettyMessagingService implements MessagingService {
private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>(); private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder() private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000) .maximumSize(100000)
.weakValues() .weakValues()
// TODO: Once the entry expires, notify blocking threads (if any). // TODO: Once the entry expires, notify blocking threads (if any).
@ -52,8 +52,6 @@ public class NettyMessagingService implements MessagingService {
private final GenericKeyedObjectPool<Endpoint, Channel> channels private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory()); = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
protected PayloadSerializer payloadSerializer;
public NettyMessagingService() { public NettyMessagingService() {
// TODO: Default port should be configurable. // TODO: Default port should be configurable.
this(8080); this(8080);
@ -83,7 +81,7 @@ public class NettyMessagingService implements MessagingService {
} }
@Override @Override
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException { public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this) InternalMessage message = new InternalMessage.Builder(this)
.withId(RandomUtils.nextLong()) .withId(RandomUtils.nextLong())
.withSender(localEp) .withSender(localEp)
@ -108,9 +106,9 @@ public class NettyMessagingService implements MessagingService {
} }
@Override @Override
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException { throws IOException {
AsyncResponse<T> futureResponse = new AsyncResponse<T>(); AsyncResponse futureResponse = new AsyncResponse();
Long messageId = RandomUtils.nextLong(); Long messageId = RandomUtils.nextLong();
responseFutures.put(messageId, futureResponse); responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this) InternalMessage message = new InternalMessage.Builder(this)
@ -133,11 +131,6 @@ public class NettyMessagingService implements MessagingService {
handlers.remove(type); handlers.remove(type);
} }
@Override
public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
this.payloadSerializer = payloadSerializer;
}
private MessageHandler getMessageHandler(String type) { private MessageHandler getMessageHandler(String type) {
return handlers.get(type); return handlers.get(type);
} }
@ -202,13 +195,13 @@ public class NettyMessagingService implements MessagingService {
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher(); private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(payloadSerializer); private final ChannelHandler encoder = new MessageEncoder();
@Override @Override
protected void initChannel(SocketChannel channel) throws Exception { protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline() channel.pipeline()
.addLast("encoder", encoder) .addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer)) .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
.addLast("handler", dispatcher); .addLast("handler", dispatcher);
} }
} }
@ -237,7 +230,7 @@ public class NettyMessagingService implements MessagingService {
String type = message.type(); String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) { if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try { try {
AsyncResponse<?> futureResponse = AsyncResponse futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id()); NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) { if (futureResponse != null) {
futureResponse.setResponse(message.payload()); futureResponse.setResponse(message.payload());

View File

@ -1,41 +0,0 @@
package org.onlab.netty;
import java.nio.ByteBuffer;
/**
* Interface for encoding/decoding message payloads.
*/
public interface PayloadSerializer {
/**
* Decodes the specified byte array to a POJO.
*
* @param data byte array.
* @return POJO
*/
public <T> T decode(byte[] data);
/**
* Encodes the specified POJO into a byte array.
*
* @param data POJO to be encoded
* @return byte array.
*/
public byte[] encode(Object data);
/**
* Encodes the specified POJO into a byte buffer.
*
* @param data POJO to be encoded
* @param buffer to write serialized bytes
*/
public void encode(final Object data, ByteBuffer buffer);
/**
* Decodes the specified byte buffer to a POJO.
*
* @param buffer bytes to be decoded
* @return POJO
*/
public <T> T decode(final ByteBuffer buffer);
}

View File

@ -7,26 +7,24 @@ import java.util.concurrent.TimeoutException;
* Response object returned when making synchronous requests. * Response object returned when making synchronous requests.
* Can you used to check is a response is ready and/or wait for a response * Can you used to check is a response is ready and/or wait for a response
* to become available. * to become available.
*
* @param <T> type of response.
*/ */
public interface Response<T> { public interface Response {
/** /**
* Gets the response waiting for a designated timeout period. * Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out) * @param timeout timeout period (since request was sent out)
* @param tu unit of time. * @param tu unit of time.
* @return response * @return response payload
* @throws TimeoutException if the timeout expires before the response arrives. * @throws TimeoutException if the timeout expires before the response arrives.
*/ */
public T get(long timeout, TimeUnit tu) throws TimeoutException; public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
/** /**
* Gets the response waiting for indefinite timeout period. * Gets the response waiting for indefinite timeout period.
* @return response * @return response payload
* @throws InterruptedException if the thread is interrupted before the response arrives. * @throws InterruptedException if the thread is interrupted before the response arrives.
*/ */
public T get() throws InterruptedException; public byte[] get() throws InterruptedException;
/** /**
* Checks if the response is ready without blocking. * Checks if the response is ready without blocking.

View File

@ -24,7 +24,7 @@ public final class SimpleClient {
final int warmup = 100; final int warmup = 100;
for (int i = 0; i < warmup; i++) { for (int i = 0; i < warmup; i++) {
Timer.Context context = sendAsyncTimer.time(); Timer.Context context = sendAsyncTimer.time();
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World"); messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
context.stop(); context.stop();
} }
metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer); metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
@ -33,10 +33,10 @@ public final class SimpleClient {
final int iterations = 1000000; final int iterations = 1000000;
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time(); Timer.Context context = sendAndReceiveTimer.time();
Response<String> response = messaging Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8080), "echo", .sendAndReceive(new Endpoint("localhost", 8080), "echo",
"Hello World"); "Hello World".getBytes());
System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS)); System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop(); context.stop();
} }
metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer); metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
@ -45,8 +45,6 @@ public final class SimpleClient {
public static class TestNettyMessagingService extends NettyMessagingService { public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception { public TestNettyMessagingService(int port) throws Exception {
super(port); super(port);
PayloadSerializer payloadSerializer = new KryoSerializer();
this.payloadSerializer = payloadSerializer;
} }
} }
} }

View File

@ -7,7 +7,6 @@ public final class SimpleServer {
public static void main(String... args) throws Exception { public static void main(String... args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080); NettyMessagingService server = new NettyMessagingService(8080);
server.activate(); server.activate();
server.setPayloadSerializer(new KryoSerializer());
server.registerHandler("simple", new LoggingHandler()); server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler()); server.registerHandler("echo", new EchoHandler());
} }

View File

@ -2,7 +2,8 @@ package org.onlab.netty;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Assert; import org.apache.commons.lang3.RandomUtils;
import static org.junit.Assert.*;
import org.junit.Test; import org.junit.Test;
/** /**
@ -17,11 +18,10 @@ public class PingPongTest {
try { try {
pinger.activate(); pinger.activate();
ponger.activate(); ponger.activate();
pinger.setPayloadSerializer(new KryoSerializer());
ponger.setPayloadSerializer(new KryoSerializer());
ponger.registerHandler("echo", new EchoHandler()); ponger.registerHandler("echo", new EchoHandler());
Response<String> response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", "hello"); byte[] payload = RandomUtils.nextBytes(100);
Assert.assertEquals("hello", response.get(10000, TimeUnit.MILLISECONDS)); Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS));
} finally { } finally {
pinger.deactivate(); pinger.deactivate();
ponger.deactivate(); ponger.deactivate();