From e138e27e2a3804a36d93f774fadde5876b2109f7 Mon Sep 17 00:00:00 2001 From: pankaj Date: Thu, 2 Oct 2014 15:17:11 -0700 Subject: [PATCH 01/10] cleanup the constructors --- .../java/org/onlab/metrics/MetricsManager.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java index 2b13efb125..3b66df3e46 100644 --- a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java +++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java @@ -1,7 +1,6 @@ package org.onlab.metrics; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import com.codahale.metrics.Counter; @@ -50,19 +49,18 @@ public final class MetricsManager implements MetricsService { /** * Registry to hold the Components defined in the system. */ - private ConcurrentMap componentsRegistry = - new ConcurrentHashMap<>(); + private ConcurrentMap componentsRegistry; /** * Registry for the Metrics objects created in the system. */ private final MetricRegistry metricsRegistry = new MetricRegistry(); - /** - * Hide constructor. The only way to get the registry is through the - * singleton getter. - */ - private MetricsManager() {} + public MetricsManager( + ConcurrentMap componentsRegistry) { + this.componentsRegistry = componentsRegistry; + + } /** * Registers a component. From 208221de386f53562a5df6ca7d9689db3e5d2803 Mon Sep 17 00:00:00 2001 From: pankaj Date: Thu, 2 Oct 2014 15:17:52 -0700 Subject: [PATCH 02/10] move to stable metrics 3.1.0 --- features/features.xml | 2 +- utils/misc/pom.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/features/features.xml b/features/features.xml index f008c14df1..68fa8c32a6 100644 --- a/features/features.xml +++ b/features/features.xml @@ -11,7 +11,7 @@ mvn:io.netty/netty/3.9.2.Final mvn:com.hazelcast/hazelcast/3.3 - mvn:com.codahale.metrics/metrics-core/3.0.2 + mvn:io.dropwizard.metrics/metrics-core/3.1.0 mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1 mvn:com.esotericsoftware.kryo/kryo/2.24.0 diff --git a/utils/misc/pom.xml b/utils/misc/pom.xml index bb25635bf2..ae47a38b43 100644 --- a/utils/misc/pom.xml +++ b/utils/misc/pom.xml @@ -56,9 +56,9 @@ objenesis - com.codahale.metrics + io.dropwizard.metrics metrics-core - 3.0.2 + 3.1.0 From 5de8411733fece5bd871088b2af2e83e391be2f3 Mon Sep 17 00:00:00 2001 From: pankaj Date: Thu, 2 Oct 2014 15:33:28 -0700 Subject: [PATCH 03/10] create a default CSV reporter --- .../org/onlab/metrics/MetricsManager.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java index 3b66df3e46..a54093f015 100644 --- a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java +++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java @@ -1,9 +1,14 @@ package org.onlab.metrics; +import java.io.File; +import java.util.Locale; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import com.codahale.metrics.Counter; +import com.codahale.metrics.CsvReporter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; @@ -54,12 +59,24 @@ public final class MetricsManager implements MetricsService { /** * Registry for the Metrics objects created in the system. */ - private final MetricRegistry metricsRegistry = new MetricRegistry(); + private final MetricRegistry metricsRegistry; - public MetricsManager( - ConcurrentMap componentsRegistry) { - this.componentsRegistry = componentsRegistry; + /** + * Default Reporter for this metrics manager. + */ + private final CsvReporter reporter; + public MetricsManager() { + this.componentsRegistry = new ConcurrentHashMap<>(); + this.metricsRegistry = new MetricRegistry(); + + this.reporter = CsvReporter.forRegistry(metricsRegistry) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MICROSECONDS) + .build(new File("/tmp/")); + + reporter.start(10, TimeUnit.SECONDS); } /** From ab6d311b83d1bc840d8a649211f797b94e715c20 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Thu, 2 Oct 2014 16:30:14 -0700 Subject: [PATCH 04/10] Adding missing files under onlab-netty --- .../java/org/onlab/netty/AsyncResponse.java | 68 +++++ .../java/org/onlab/netty/EchoHandler.java | 15 ++ .../main/java/org/onlab/netty/Endpoint.java | 62 +++++ .../java/org/onlab/netty/InternalMessage.java | 85 ++++++ .../java/org/onlab/netty/KryoSerializer.java | 47 ++++ .../java/org/onlab/netty/LoggingHandler.java | 12 + .../main/java/org/onlab/netty/Message.java | 23 ++ .../java/org/onlab/netty/MessageDecoder.java | 58 +++++ .../java/org/onlab/netty/MessageEncoder.java | 60 +++++ .../java/org/onlab/netty/MessageHandler.java | 16 ++ .../org/onlab/netty/MessagingService.java | 41 +++ .../onlab/netty/NettyMessagingService.java | 244 ++++++++++++++++++ .../main/java/org/onlab/netty/Response.java | 36 +++ .../main/java/org/onlab/netty/Serializer.java | 24 ++ .../java/org/onlab/netty/SimpleClient.java | 24 ++ .../java/org/onlab/netty/SimpleServer.java | 19 ++ 16 files changed, 834 insertions(+) create mode 100644 utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/EchoHandler.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/Endpoint.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/InternalMessage.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/Message.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/MessageHandler.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/MessagingService.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/Response.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/Serializer.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/SimpleClient.java create mode 100644 utils/netty/src/main/java/org/onlab/netty/SimpleServer.java diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java new file mode 100644 index 0000000000..b2b490e88f --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java @@ -0,0 +1,68 @@ +package org.onlab.netty; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * An asynchronous response. + * This class provides a base implementation of Response, with methods to retrieve the + * result and query to see if the result is ready. The result can only be retrieved when + * it is ready and the get methods will block if the result is not ready yet. + * @param type of response. + */ +public class AsyncResponse implements Response { + + private T value; + private boolean done = false; + private final long start = System.nanoTime(); + + @Override + public T get(long timeout, TimeUnit tu) throws TimeoutException { + timeout = tu.toNanos(timeout); + boolean interrupted = false; + try { + synchronized (this) { + while (!done) { + try { + long timeRemaining = timeout - (System.nanoTime() - start); + if (timeRemaining <= 0) { + throw new TimeoutException("Operation timed out."); + } + TimeUnit.NANOSECONDS.timedWait(this, timeRemaining); + } catch (InterruptedException e) { + interrupted = true; + } + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + return value; + } + + @Override + public T get() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReady() { + return done; + } + + /** + * Sets response value and unblocks any thread blocking on the response to become + * available. + * @param data response data. + */ + @SuppressWarnings("unchecked") + public synchronized void setResponse(Object data) { + if (!done) { + done = true; + value = (T) data; + this.notifyAll(); + } + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java new file mode 100644 index 0000000000..313a448693 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java @@ -0,0 +1,15 @@ +package org.onlab.netty; + +import java.io.IOException; + +/** + * Message handler that echos the message back to the sender. + */ +public class EchoHandler implements MessageHandler { + + @Override + public void handle(Message message) throws IOException { + System.out.println("Received: " + message.payload() + ". Echoing it back to the sender."); + message.respond(message.payload()); + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java new file mode 100644 index 0000000000..8681093ba0 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java @@ -0,0 +1,62 @@ +package org.onlab.netty; + +/** + * Representation of a TCP/UDP communication end point. + */ +public class Endpoint { + + private final int port; + private final String host; + + public Endpoint(String host, int port) { + this.host = host; + this.port = port; + } + + public String host() { + return host; + } + + public int port() { + return port; + } + + @Override + public String toString() { + return "Endpoint [port=" + port + ", host=" + host + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((host == null) ? 0 : host.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Endpoint other = (Endpoint) obj; + if (host == null) { + if (other.host != null) { + return false; + } + } else if (!host.equals(other.host)) { + return false; + } + if (port != other.port) { + return false; + } + return true; + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java new file mode 100644 index 0000000000..bcf6f52c84 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java @@ -0,0 +1,85 @@ +package org.onlab.netty; + +import java.io.IOException; + +/** + * Internal message representation with additional attributes + * for supporting, synchronous request/reply behavior. + */ +public final class InternalMessage implements Message { + + private long id; + private Endpoint sender; + private String type; + private Object payload; + private transient NettyMessagingService messagingService; + public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY"; + + // Must be created using the Builder. + private InternalMessage() {} + + public long id() { + return id; + } + + public String type() { + return type; + } + + public Endpoint sender() { + return sender; + } + + @Override + public Object payload() { + return payload; + } + + @Override + public void respond(Object data) throws IOException { + Builder builder = new Builder(messagingService); + InternalMessage message = builder.withId(this.id) + // FIXME: Sender should be messagingService.localEp. + .withSender(this.sender) + .withPayload(data) + .withType(REPLY_MESSAGE_TYPE) + .build(); + messagingService.sendAsync(sender, message); + } + + + /** + * Builder for InternalMessages. + */ + public static class Builder { + private InternalMessage message; + + public Builder(NettyMessagingService messagingService) { + message = new InternalMessage(); + message.messagingService = messagingService; + } + + public Builder withId(long id) { + message.id = id; + return this; + } + + public Builder withType(String type) { + message.type = type; + return this; + } + + public Builder withSender(Endpoint sender) { + message.sender = sender; + return this; + } + public Builder withPayload(Object payload) { + message.payload = payload; + return this; + } + + public InternalMessage build() { + return message; + } + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java new file mode 100644 index 0000000000..73c01a0add --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java @@ -0,0 +1,47 @@ +package org.onlab.netty; + +import org.onlab.util.KryoPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; + +/** + * Kryo Serializer. + */ +public class KryoSerializer implements Serializer { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private KryoPool serializerPool; + + public KryoSerializer() { + setupKryoPool(); + } + + /** + * Sets up the common serialzers pool. + */ + protected void setupKryoPool() { + // FIXME Slice out types used in common to separate pool/namespace. + serializerPool = KryoPool.newBuilder() + .register(ArrayList.class, + HashMap.class, + ArrayList.class + ) + .build() + .populate(1); + } + + + @Override + public Object decode(byte[] data) { + return serializerPool.deserialize(data); + } + + @Override + public byte[] encode(Object payload) { + return serializerPool.serialize(payload); + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java new file mode 100644 index 0000000000..ed6cdb4d0b --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java @@ -0,0 +1,12 @@ +package org.onlab.netty; + +/** + * A MessageHandler that simply logs the information. + */ +public class LoggingHandler implements MessageHandler { + + @Override + public void handle(Message message) { + System.out.println("Received: " + message.payload()); + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java new file mode 100644 index 0000000000..54b95260d3 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/Message.java @@ -0,0 +1,23 @@ +package org.onlab.netty; + +import java.io.IOException; + +/** + * A unit of communication. + * Has a payload. Also supports a feature to respond back to the sender. + */ +public interface Message { + + /** + * Returns the payload of this message. + * @return message payload. + */ + public Object payload(); + + /** + * Sends a reply back to the sender of this messge. + * @param data payload of the response. + * @throws IOException if there is a communication error. + */ + public void respond(Object data) throws IOException; +} diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java new file mode 100644 index 0000000000..ecf2d62998 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java @@ -0,0 +1,58 @@ +package org.onlab.netty; + +import java.util.Arrays; +import java.util.List; + +import static com.google.common.base.Preconditions.checkState; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +/** + * Decode bytes into a InternalMessage. + */ +public class MessageDecoder extends ByteToMessageDecoder { + + private final NettyMessagingService messagingService; + private final Serializer serializer; + + public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) { + this.messagingService = messagingService; + this.serializer = serializer; + } + + @Override + protected void decode(ChannelHandlerContext context, ByteBuf in, + List messages) throws Exception { + + byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array(); + checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble"); + + // read message Id. + long id = in.readLong(); + + // read message type; first read size and then bytes. + String type = new String(in.readBytes(in.readInt()).array()); + + // read sender host name; first read size and then bytes. + String host = new String(in.readBytes(in.readInt()).array()); + + // read sender port. + int port = in.readInt(); + + Endpoint sender = new Endpoint(host, port); + + // read message payload; first read size and then bytes. + Object payload = serializer.decode(in.readBytes(in.readInt()).array()); + + InternalMessage message = new InternalMessage.Builder(messagingService) + .withId(id) + .withSender(sender) + .withType(type) + .withPayload(payload) + .build(); + + messages.add(message); + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java new file mode 100644 index 0000000000..1b52a0fe9e --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java @@ -0,0 +1,60 @@ +package org.onlab.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +/** + * Encode InternalMessage out into a byte buffer. + */ +public class MessageEncoder extends MessageToByteEncoder { + + // onosiscool in ascii + public static final byte[] PREAMBLE = "onosiscool".getBytes(); + + private final Serializer serializer; + + public MessageEncoder(Serializer serializer) { + this.serializer = serializer; + } + + @Override + protected void encode(ChannelHandlerContext context, InternalMessage message, + ByteBuf out) throws Exception { + + // write preamble + out.writeBytes(PREAMBLE); + + // write id + out.writeLong(message.id()); + + // write type length + out.writeInt(message.type().length()); + + // write type + out.writeBytes(message.type().getBytes()); + + // write sender host name size + out.writeInt(message.sender().host().length()); + + // write sender host name. + out.writeBytes(message.sender().host().getBytes()); + + // write port + out.writeInt(message.sender().port()); + + try { + serializer.encode(message.payload()); + } catch (Exception e) { + e.printStackTrace(); + } + + byte[] payload = serializer.encode(message.payload()); + + // write payload length. + out.writeInt(payload.length); + + // write payload bytes + out.writeBytes(payload); + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java new file mode 100644 index 0000000000..7bd5a7f47f --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java @@ -0,0 +1,16 @@ +package org.onlab.netty; + +import java.io.IOException; + +/** + * Handler for a message. + */ +public interface MessageHandler { + + /** + * Handles the message. + * @param message message. + * @throws IOException. + */ + public void handle(Message message) throws IOException; +} diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java new file mode 100644 index 0000000000..ebad44265b --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java @@ -0,0 +1,41 @@ +package org.onlab.netty; + +import java.io.IOException; + +/** + * Interface for low level messaging primitives. + */ +public interface MessagingService { + /** + * Sends a message asynchronously to the specified communication end point. + * The message is specified using the type and payload. + * @param ep end point to send the message to. + * @param type type of message. + * @param payload message payload. + * @throws IOException + */ + public void sendAsync(Endpoint ep, String type, Object payload) throws IOException; + + /** + * Sends a message synchronously and waits for a response. + * @param ep end point to send the message to. + * @param type type of message. + * @param payload message payload. + * @return a response future + * @throws IOException + */ + public Response sendAndReceive(Endpoint ep, String type, Object payload) throws IOException; + + /** + * Registers a new message handler for message type. + * @param type message type. + * @param handler message handler + */ + public void registerHandler(String type, MessageHandler handler); + + /** + * Unregister current handler, if one exists for message type. + * @param type message type + */ + public void unregisterHandler(String type); +} diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java new file mode 100644 index 0000000000..54da8cc4a8 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java @@ -0,0 +1,244 @@ +package org.onlab.netty; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.pool.KeyedObjectPool; +import org.apache.commons.pool.KeyedPoolableObjectFactory; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * A Netty based implementation of MessagingService. + */ +public class NettyMessagingService implements MessagingService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private KeyedObjectPool channels = + new GenericKeyedObjectPool(new OnosCommunicationChannelFactory()); + private final int port; + private final EventLoopGroup bossGroup = new NioEventLoopGroup(); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + private final ConcurrentMap handlers = new ConcurrentHashMap<>(); + private Cache> responseFutures; + private final Endpoint localEp; + + protected Serializer serializer; + + public NettyMessagingService() { + // TODO: Default port should be configurable. + this(8080); + } + + // FIXME: Constructor should not throw exceptions. + public NettyMessagingService(int port) { + this.port = port; + try { + localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port); + } catch (UnknownHostException e) { + // bailing out. + throw new RuntimeException(e); + } + } + + public void activate() throws Exception { + responseFutures = CacheBuilder.newBuilder() + .maximumSize(100000) + .weakValues() + // TODO: Once the entry expires, notify blocking threads (if any). + .expireAfterWrite(10, TimeUnit.MINUTES) + .build(); + startAcceptingConnections(); + } + + public void deactivate() throws Exception { + channels.close(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + + @Override + public void sendAsync(Endpoint ep, String type, Object payload) throws IOException { + InternalMessage message = new InternalMessage.Builder(this) + .withId(RandomUtils.nextLong()) + .withSender(localEp) + .withType(type) + .withPayload(payload) + .build(); + sendAsync(ep, message); + } + + protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException { + Channel channel = null; + try { + channel = channels.borrowObject(ep); + channel.eventLoop().execute(new WriteTask(channel, message)); + } catch (Exception e) { + throw new IOException(e); + } finally { + try { + channels.returnObject(ep, channel); + } catch (Exception e) { + log.warn("Error returning object back to the pool", e); + // ignored. + } + } + } + + @Override + public Response sendAndReceive(Endpoint ep, String type, Object payload) + throws IOException { + AsyncResponse futureResponse = new AsyncResponse(); + Long messageId = RandomUtils.nextLong(); + responseFutures.put(messageId, futureResponse); + InternalMessage message = new InternalMessage.Builder(this) + .withId(messageId) + .withSender(localEp) + .withType(type) + .withPayload(payload) + .build(); + sendAsync(ep, message); + return futureResponse; + } + + @Override + public void registerHandler(String type, MessageHandler handler) { + // TODO: Is this the right semantics for handler registration? + handlers.putIfAbsent(type, handler); + } + + public void unregisterHandler(String type) { + handlers.remove(type); + } + + private MessageHandler getMessageHandler(String type) { + return handlers.get(type); + } + + private void startAcceptingConnections() throws InterruptedException { + ServerBootstrap b = new ServerBootstrap(); + b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new OnosCommunicationChannelInitializer()) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // Bind and start to accept incoming connections. + b.bind(port).sync(); + } + + private class OnosCommunicationChannelFactory + implements KeyedPoolableObjectFactory { + + @Override + public void activateObject(Endpoint endpoint, Channel channel) + throws Exception { + } + + @Override + public void destroyObject(Endpoint ep, Channel channel) throws Exception { + channel.close(); + } + + @Override + public Channel makeObject(Endpoint ep) throws Exception { + Bootstrap b = new Bootstrap(); + b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + b.group(workerGroup); + // TODO: Make this faster: + // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0 + b.channel(NioSocketChannel.class); + b.option(ChannelOption.SO_KEEPALIVE, true); + b.handler(new OnosCommunicationChannelInitializer()); + + // Start the client. + ChannelFuture f = b.connect(ep.host(), ep.port()).sync(); + return f.channel(); + } + + @Override + public void passivateObject(Endpoint ep, Channel channel) + throws Exception { + } + + @Override + public boolean validateObject(Endpoint ep, Channel channel) { + return channel.isOpen(); + } + } + + private class OnosCommunicationChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addLast(new MessageEncoder(serializer)) + .addLast(new MessageDecoder(NettyMessagingService.this, serializer)) + .addLast(new NettyMessagingService.InboundMessageDispatcher()); + } + } + + private class WriteTask implements Runnable { + + private final Object message; + private final Channel channel; + + public WriteTask(Channel channel, Object message) { + this.message = message; + this.channel = channel; + } + + @Override + public void run() { + channel.writeAndFlush(message); + } + } + + private class InboundMessageDispatcher extends SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception { + String type = message.type(); + if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) { + try { + AsyncResponse futureResponse = + NettyMessagingService.this.responseFutures.getIfPresent(message.id()); + if (futureResponse != null) { + futureResponse.setResponse(message.payload()); + } + log.warn("Received a reply. But was unable to locate the request handle"); + } finally { + NettyMessagingService.this.responseFutures.invalidate(message.id()); + } + return; + } + MessageHandler handler = NettyMessagingService.this.getMessageHandler(type); + handler.handle(message); + } + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java new file mode 100644 index 0000000000..04675ce28d --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/Response.java @@ -0,0 +1,36 @@ +package org.onlab.netty; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Response object returned when making synchronous requests. + * Can you used to check is a response is ready and/or wait for a response + * to become available. + * + * @param type of response. + */ +public interface Response { + + /** + * Gets the response waiting for a designated timeout period. + * @param timeout timeout period (since request was sent out) + * @param tu unit of time. + * @return response + * @throws TimeoutException if the timeout expires before the response arrives. + */ + public T get(long timeout, TimeUnit tu) throws TimeoutException; + + /** + * Gets the response waiting for indefinite timeout period. + * @return response + * @throws InterruptedException if the thread is interrupted before the response arrives. + */ + public T get() throws InterruptedException; + + /** + * Checks if the response is ready without blocking. + * @return true if response is ready, false otherwise. + */ + public boolean isReady(); +} diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java new file mode 100644 index 0000000000..ac55f5af7a --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/Serializer.java @@ -0,0 +1,24 @@ +package org.onlab.netty; + +/** + * Interface for encoding/decoding message payloads. + */ +public interface Serializer { + + /** + * Decodes the specified byte array to a POJO. + * + * @param data byte array. + * @return POJO + */ + Object decode(byte[] data); + + /** + * Encodes the specified POJO into a byte array. + * + * @param data POJO to be encoded + * @return byte array. + */ + byte[] encode(Object message); + +} diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java new file mode 100644 index 0000000000..1573780eb3 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java @@ -0,0 +1,24 @@ +package org.onlab.netty; + +import java.util.concurrent.TimeUnit; + +public final class SimpleClient { + private SimpleClient() {} + + public static void main(String... args) throws Exception { + NettyMessagingService messaging = new TestNettyMessagingService(9081); + messaging.activate(); + + messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World"); + Response response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World"); + System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS)); + } + + public static class TestNettyMessagingService extends NettyMessagingService { + public TestNettyMessagingService(int port) throws Exception { + super(port); + Serializer serializer = new KryoSerializer(); + this.serializer = serializer; + } + } +} diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java new file mode 100644 index 0000000000..12fa0251e1 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java @@ -0,0 +1,19 @@ +package org.onlab.netty; + +public final class SimpleServer { + private SimpleServer() {} + + public static void main(String... args) throws Exception { + NettyMessagingService server = new TestNettyMessagingService(); + server.activate(); + server.registerHandler("simple", new LoggingHandler()); + server.registerHandler("echo", new EchoHandler()); + } + + public static class TestNettyMessagingService extends NettyMessagingService { + protected TestNettyMessagingService() { + Serializer serializer = new KryoSerializer(); + this.serializer = serializer; + } + } +} From f6577b68dcb055d9c6333f0c7ddd80495ea3d623 Mon Sep 17 00:00:00 2001 From: pankaj Date: Thu, 2 Oct 2014 16:38:38 -0700 Subject: [PATCH 05/10] Create a component for metrics --- utils/misc/pom.xml | 4 ++++ .../main/java/org/onlab/metrics/MetricsManager.java | 13 +++++++++++++ 2 files changed, 17 insertions(+) diff --git a/utils/misc/pom.xml b/utils/misc/pom.xml index ae47a38b43..bd3cc08cc1 100644 --- a/utils/misc/pom.xml +++ b/utils/misc/pom.xml @@ -60,6 +60,10 @@ metrics-core 3.1.0 + + org.apache.felix + org.apache.felix.scr.annotations + diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java index a54093f015..e07d3f9248 100644 --- a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java +++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java @@ -7,6 +7,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; + import com.codahale.metrics.Counter; import com.codahale.metrics.CsvReporter; import com.codahale.metrics.Gauge; @@ -49,6 +53,7 @@ import com.codahale.metrics.Timer; * * */ +@Component(immediate = true) public final class MetricsManager implements MetricsService { /** @@ -79,6 +84,14 @@ public final class MetricsManager implements MetricsService { reporter.start(10, TimeUnit.SECONDS); } + @Activate + public void activate() { + } + + @Deactivate + public void deactivate() { + } + /** * Registers a component. * From 70da512717008331a8691991d9985567fe9292c7 Mon Sep 17 00:00:00 2001 From: Jonathan Hart Date: Wed, 1 Oct 2014 16:37:42 -0700 Subject: [PATCH 06/10] Wired up HostMonitor to its dependencies and got it working. --- .../org/onlab/onos/config/AddressEntry.java | 16 ++- .../onos/config/NetworkConfigReader.java | 30 ++++- .../onlab/onos/net/host/impl/HostManager.java | 30 +++-- .../onlab/onos/net/host/impl/HostMonitor.java | 117 ++++++++++-------- .../main/java/org/onlab/packet/IpPrefix.java | 23 ++++ .../java/org/onlab/packet/IpPrefixTest.java | 15 ++- 6 files changed, 155 insertions(+), 76 deletions(-) diff --git a/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java b/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java index 318aebdda4..081efed14a 100644 --- a/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java +++ b/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java @@ -3,8 +3,6 @@ package org.onlab.onos.config; import java.util.List; import org.codehaus.jackson.annotate.JsonProperty; -import org.onlab.packet.IpPrefix; -import org.onlab.packet.MacAddress; /** * Represents a set of addresses bound to a port. @@ -12,8 +10,8 @@ import org.onlab.packet.MacAddress; public class AddressEntry { private String dpid; private short portNumber; - private List ipAddresses; - private MacAddress macAddress; + private List ipAddresses; + private String macAddress; public String getDpid() { return dpid; @@ -33,21 +31,21 @@ public class AddressEntry { this.portNumber = portNumber; } - public List getIpAddresses() { + public List getIpAddresses() { return ipAddresses; } @JsonProperty("ips") - public void setIpAddresses(List ipAddresses) { - this.ipAddresses = ipAddresses; + public void setIpAddresses(List strIps) { + this.ipAddresses = strIps; } - public MacAddress getMacAddress() { + public String getMacAddress() { return macAddress; } @JsonProperty("mac") - public void setMacAddress(MacAddress macAddress) { + public void setMacAddress(String macAddress) { this.macAddress = macAddress; } } diff --git a/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java b/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java index 985c4a2897..4f1a48ac69 100644 --- a/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java +++ b/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java @@ -5,6 +5,8 @@ import static org.slf4j.LoggerFactory.getLogger; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -17,10 +19,10 @@ import org.onlab.onos.net.DeviceId; import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.host.HostAdminService; import org.onlab.onos.net.host.PortAddresses; +import org.onlab.packet.IpPrefix; +import org.onlab.packet.MacAddress; import org.slf4j.Logger; -import com.google.common.collect.Sets; - /** * Simple configuration module to read in supplementary network configuration * from a file. @@ -51,9 +53,29 @@ public class NetworkConfigReader { DeviceId.deviceId(dpidToUri(entry.getDpid())), PortNumber.portNumber(entry.getPortNumber())); + Set ipAddresses = new HashSet(); + + for (String strIp : entry.getIpAddresses()) { + try { + IpPrefix address = IpPrefix.valueOf(strIp); + ipAddresses.add(address); + } catch (IllegalArgumentException e) { + log.warn("Bad format for IP address in config: {}", strIp); + } + } + + MacAddress macAddress = null; + if (entry.getMacAddress() != null) { + try { + macAddress = MacAddress.valueOf(entry.getMacAddress()); + } catch (IllegalArgumentException e) { + log.warn("Bad format for MAC address in config: {}", + entry.getMacAddress()); + } + } + PortAddresses addresses = new PortAddresses(cp, - Sets.newHashSet(entry.getIpAddresses()), - entry.getMacAddress()); + ipAddresses, macAddress); hostAdminService.bindAddressesToPort(addresses); } diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java index e3f53fe2ef..88b6923046 100644 --- a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java +++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java @@ -1,5 +1,10 @@ package org.onlab.onos.net.host.impl; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.Set; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -12,6 +17,7 @@ import org.onlab.onos.net.ConnectPoint; import org.onlab.onos.net.DeviceId; import org.onlab.onos.net.Host; import org.onlab.onos.net.HostId; +import org.onlab.onos.net.device.DeviceService; import org.onlab.onos.net.host.HostAdminService; import org.onlab.onos.net.host.HostDescription; import org.onlab.onos.net.host.HostEvent; @@ -23,6 +29,7 @@ import org.onlab.onos.net.host.HostService; import org.onlab.onos.net.host.HostStore; import org.onlab.onos.net.host.HostStoreDelegate; import org.onlab.onos.net.host.PortAddresses; +import org.onlab.onos.net.packet.PacketService; import org.onlab.onos.net.provider.AbstractProviderRegistry; import org.onlab.onos.net.provider.AbstractProviderService; import org.onlab.packet.IpAddress; @@ -31,11 +38,6 @@ import org.onlab.packet.MacAddress; import org.onlab.packet.VlanId; import org.slf4j.Logger; -import java.util.Set; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.slf4j.LoggerFactory.getLogger; - /** * Provides basic implementation of the host SB & NB APIs. */ @@ -59,12 +61,22 @@ public class HostManager @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected EventDeliveryService eventDispatcher; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceService deviceService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected PacketService packetService; + + private HostMonitor monitor; @Activate public void activate() { + log.info("Started"); store.setDelegate(delegate); eventDispatcher.addSink(HostEvent.class, listenerRegistry); - log.info("Started"); + + monitor = new HostMonitor(deviceService, packetService, this); + } @Deactivate @@ -76,6 +88,8 @@ public class HostManager @Override protected HostProviderService createProviderService(HostProvider provider) { + monitor.registerHostProvider(provider); + return new InternalHostProviderService(provider); } @@ -126,12 +140,12 @@ public class HostManager @Override public void startMonitoringIp(IpAddress ip) { - // TODO pass through to HostMonitor + monitor.addMonitoringFor(ip); } @Override public void stopMonitoringIp(IpAddress ip) { - // TODO pass through to HostMonitor + monitor.stopMonitoring(ip); } @Override diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java index a5aa13e5cf..9f8dd48d71 100644 --- a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java +++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java @@ -2,10 +2,11 @@ package org.onlab.onos.net.host.impl; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.jboss.netty.util.Timeout; @@ -21,19 +22,19 @@ import org.onlab.onos.net.flow.TrafficTreatment; import org.onlab.onos.net.flow.instructions.Instruction; import org.onlab.onos.net.flow.instructions.Instructions; import org.onlab.onos.net.host.HostProvider; -import org.onlab.onos.net.host.HostService; -import org.onlab.onos.net.host.HostStore; import org.onlab.onos.net.host.PortAddresses; import org.onlab.onos.net.packet.DefaultOutboundPacket; import org.onlab.onos.net.packet.OutboundPacket; import org.onlab.onos.net.packet.PacketService; -import org.onlab.onos.net.topology.TopologyService; +import org.onlab.onos.net.provider.ProviderId; import org.onlab.packet.ARP; import org.onlab.packet.Ethernet; import org.onlab.packet.IpAddress; import org.onlab.packet.IpPrefix; import org.onlab.packet.MacAddress; import org.onlab.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Monitors hosts on the dataplane to detect changes in host data. @@ -43,9 +44,7 @@ import org.onlab.util.Timer; * probe for hosts that have not yet been detected (specified by IP address). */ public class HostMonitor implements TimerTask { - - private static final byte[] DEFAULT_MAC_ADDRESS = - MacAddress.valueOf("00:00:00:00:00:01").getAddress(); + private static final Logger log = LoggerFactory.getLogger(HostMonitor.class); private static final byte[] ZERO_MAC_ADDRESS = MacAddress.valueOf("00:00:00:00:00:00").getAddress(); @@ -54,59 +53,77 @@ public class HostMonitor implements TimerTask { private static final byte[] BROADCAST_MAC = MacAddress.valueOf("ff:ff:ff:ff:ff:ff").getAddress(); - private final HostService hostService; - private final TopologyService topologyService; - private final DeviceService deviceService; - private final HostProvider hostProvider; - private final PacketService packetService; - private final HostStore hostStore; + private DeviceService deviceService; + private PacketService packetService; + private HostManager hostManager; private final Set monitoredAddresses; + private final Map hostProviders; + private final long probeRate; private final Timeout timeout; - public HostMonitor(HostService hostService, TopologyService topologyService, + public HostMonitor( DeviceService deviceService, - HostProvider hostProvider, PacketService packetService, - HostStore hostStore) { - this.hostService = hostService; - this.topologyService = topologyService; + PacketService packetService, + HostManager hostService) { + this.deviceService = deviceService; - this.hostProvider = hostProvider; this.packetService = packetService; - this.hostStore = hostStore; + this.hostManager = hostService; monitoredAddresses = new HashSet<>(); + hostProviders = new ConcurrentHashMap<>(); probeRate = 30000; // milliseconds timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS); + + addDefaultAddresses(); } - public void addMonitoringFor(IpAddress ip) { + private void addDefaultAddresses() { + //monitoredAddresses.add(IpAddress.valueOf("10.0.0.1")); + } + + void addMonitoringFor(IpAddress ip) { monitoredAddresses.add(ip); } - public void stopMonitoring(IpAddress ip) { + void stopMonitoring(IpAddress ip) { monitoredAddresses.remove(ip); } - public void shutdown() { + void shutdown() { timeout.cancel(); } + void registerHostProvider(HostProvider provider) { + hostProviders.put(provider.id(), provider); + } + + void unregisterHostProvider(HostProvider provider) { + // TODO find out how to call this + } + @Override public void run(Timeout timeout) throws Exception { for (IpAddress ip : monitoredAddresses) { - Set hosts = Collections.emptySet(); //TODO hostService.getHostsByIp(ip); + // TODO have to convert right now because the HostService API uses IpPrefix + IpPrefix prefix = IpPrefix.valueOf(ip.toOctets()); + + Set hosts = hostManager.getHostsByIp(prefix); if (hosts.isEmpty()) { sendArpRequest(ip); } else { for (Host host : hosts) { - hostProvider.triggerProbe(host); + HostProvider provider = hostProviders.get(host.providerId()); + if (provider != null) { + provider.triggerProbe(host); + } } } } @@ -120,29 +137,26 @@ public class HostMonitor implements TimerTask { * @param targetIp IP address to ARP for */ private void sendArpRequest(IpAddress targetIp) { - // Find ports with an IP address in the target's subnet and sent ARP // probes out those ports. for (Device device : deviceService.getDevices()) { for (Port port : deviceService.getPorts(device.id())) { ConnectPoint cp = new ConnectPoint(device.id(), port.number()); - PortAddresses addresses = hostStore.getAddressBindingsForPort(cp); + PortAddresses addresses = hostManager.getAddressBindingsForPort(cp); - /*for (IpPrefix prefix : addresses.ips()) { + for (IpPrefix prefix : addresses.ips()) { if (prefix.contains(targetIp)) { - sendProbe(device.id(), port, addresses, targetIp); + sendProbe(device.id(), port, targetIp, + prefix.toIpAddress(), addresses.mac()); } - }*/ + } } } - - // TODO case where no address was found. - // Broadcast out internal edge ports? } - private void sendProbe(DeviceId deviceId, Port port, PortAddresses portAddresses, - IpAddress targetIp) { - Ethernet arpPacket = createArpFor(targetIp, portAddresses); + private void sendProbe(DeviceId deviceId, Port port, IpAddress targetIp, + IpAddress sourceIp, MacAddress sourceMac) { + Ethernet arpPacket = buildArpRequest(targetIp, sourceIp, sourceMac); List instructions = new ArrayList<>(); instructions.add(Instructions.createOutput(port.number())); @@ -158,31 +172,26 @@ public class HostMonitor implements TimerTask { packetService.emit(outboundPacket); } - private Ethernet createArpFor(IpAddress targetIp, PortAddresses portAddresses) { + private Ethernet buildArpRequest(IpAddress targetIp, IpAddress sourceIp, + MacAddress sourceMac) { ARP arp = new ARP(); arp.setHardwareType(ARP.HW_TYPE_ETHERNET) - .setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH) - .setProtocolType(ARP.PROTO_TYPE_IP) - .setProtocolAddressLength((byte) IpPrefix.INET_LEN); + .setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH) + .setProtocolType(ARP.PROTO_TYPE_IP) + .setProtocolAddressLength((byte) IpPrefix.INET_LEN) + .setOpCode(ARP.OP_REQUEST); - byte[] sourceMacAddress; - if (portAddresses.mac() == null) { - sourceMacAddress = DEFAULT_MAC_ADDRESS; - } else { - sourceMacAddress = portAddresses.mac().getAddress(); - } - - arp.setSenderHardwareAddress(sourceMacAddress) - //TODO .setSenderProtocolAddress(portAddresses.ips().toOctets()) - .setTargetHardwareAddress(ZERO_MAC_ADDRESS) - .setTargetProtocolAddress(targetIp.toOctets()); + arp.setSenderHardwareAddress(sourceMac.getAddress()) + .setSenderProtocolAddress(sourceIp.toOctets()) + .setTargetHardwareAddress(ZERO_MAC_ADDRESS) + .setTargetProtocolAddress(targetIp.toOctets()); Ethernet ethernet = new Ethernet(); ethernet.setEtherType(Ethernet.TYPE_ARP) - .setDestinationMACAddress(BROADCAST_MAC) - .setSourceMACAddress(sourceMacAddress) - .setPayload(arp); + .setDestinationMACAddress(BROADCAST_MAC) + .setSourceMACAddress(sourceMac.getAddress()) + .setPayload(arp); return ethernet; } diff --git a/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java b/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java index b205f90167..84acb8222c 100644 --- a/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java +++ b/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java @@ -250,6 +250,17 @@ public final class IpPrefix { return new IpPrefix(version, host, netmask); } + /** + * Returns an IpAddress of the bytes contained in this prefix. + * FIXME this is a hack for now and only works because IpPrefix doesn't + * mask the input bytes on creation. + * + * @return the IpAddress + */ + public IpAddress toIpAddress() { + return IpAddress.valueOf(octets); + } + public boolean isMasked() { return mask() != 0; } @@ -278,6 +289,17 @@ public final class IpPrefix { return false; } + public boolean contains(IpAddress address) { + // Need to get the network address because prefixes aren't automatically + // masked on creation + IpPrefix meMasked = network(); + + IpPrefix otherMasked = + IpPrefix.valueOf(address.octets, netmask).network(); + + return Arrays.equals(meMasked.octets, otherMasked.octets); + } + @Override public int hashCode() { final int prime = 31; @@ -303,6 +325,7 @@ public final class IpPrefix { if (netmask != other.netmask) { return false; } + // TODO not quite right until we mask the input if (!Arrays.equals(octets, other.octets)) { return false; } diff --git a/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java b/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java index f6bf6f168d..297a0f336b 100644 --- a/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java +++ b/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java @@ -76,7 +76,7 @@ public class IpPrefixTest { } @Test - public void testContains() { + public void testContainsIpPrefix() { IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31); IpPrefix slash32 = IpPrefix.valueOf(BYTES1, 32); IpPrefix differentSlash32 = IpPrefix.valueOf(BYTES2, 32); @@ -96,4 +96,17 @@ public class IpPrefixTest { assertTrue(slash8.contains(slash31)); assertFalse(slash31.contains(slash8)); } + + @Test + public void testContainsIpAddress() { + IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31); + IpAddress slash32 = IpAddress.valueOf(BYTES1, 32); + + assertTrue(slash31.contains(slash32)); + + IpPrefix intf = IpPrefix.valueOf("192.168.10.101/24"); + IpAddress addr = IpAddress.valueOf("192.168.10.1"); + + assertTrue(intf.contains(addr)); + } } From 3b0dfd53bdf6fa9f0cacc7a65a6ba0c79a60b496 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Thu, 2 Oct 2014 16:48:13 -0700 Subject: [PATCH 07/10] Javadoc improvements --- .../cluster/impl/DistributedClusterStore.java | 4 ++-- .../messaging/ClusterMessageHandler.java | 8 ++++++++ .../cluster/messaging/MessageSubject.java | 2 ++ .../cluster/messaging/MessageSubscriber.java | 18 ------------------ ...r.java => ClusterCommunicationManager.java} | 2 +- .../impl/ClusterCommunicationManagerTest.java | 10 +++++----- .../java/org/onlab/netty/package-info.java | 4 ++++ 7 files changed, 22 insertions(+), 26 deletions(-) delete mode 100644 core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java rename core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/{OnosClusterCommunicationManager.java => ClusterCommunicationManager.java} (99%) create mode 100644 utils/netty/src/main/java/org/onlab/netty/package-info.java diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java index e25c964012..9408cc95c2 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java @@ -20,7 +20,7 @@ import org.onlab.onos.cluster.DefaultControllerNode; import org.onlab.onos.cluster.NodeId; import org.onlab.onos.store.AbstractStore; import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService; -import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager; +import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager; import org.onlab.packet.IpPrefix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public class DistributedClusterStore private final Map states = new ConcurrentHashMap<>(); private final Cache livenessCache = CacheBuilder.newBuilder() .maximumSize(1000) - .expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS) + .expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS) .removalListener(new LivenessCacheRemovalListener()).build(); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java index 15e756ddf5..7ec27ecf73 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java @@ -1,5 +1,13 @@ package org.onlab.onos.store.cluster.messaging; +/** + * Interface for handling cluster messages. + */ public interface ClusterMessageHandler { + + /** + * Handles/Processes the cluster message. + * @param message cluster message. + */ public void handle(ClusterMessage message); } \ No newline at end of file diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java index 4c9eefac78..ee8d9c1660 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java @@ -2,6 +2,8 @@ package org.onlab.onos.store.cluster.messaging; /** * Representation of a message subject. + * Cluster messages have associated subjects that dictate how they get handled + * on the receiving side. */ public class MessageSubject { diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java deleted file mode 100644 index 666ac6d157..0000000000 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.onlab.onos.store.cluster.messaging; - -import org.onlab.onos.cluster.NodeId; - -/** - * Represents a message consumer. - */ -public interface MessageSubscriber { - - /** - * Receives the specified cluster message. - * - * @param message message to be received - * @param fromNodeId node from which the message was received - */ - void receive(Object messagePayload, NodeId fromNodeId); - -} diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java similarity index 99% rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java index e6e4a4d84c..d4fd9c02b1 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; @Component(immediate = true) @Service -public class OnosClusterCommunicationManager +public class ClusterCommunicationManager implements ClusterCommunicationService, ClusterCommunicationAdminService { private final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java index 44e54217a8..bba12f2b6b 100644 --- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java +++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java @@ -6,7 +6,7 @@ import org.junit.Ignore; import org.junit.Test; import org.onlab.onos.cluster.DefaultControllerNode; import org.onlab.onos.cluster.NodeId; -import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager; +import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager; import org.onlab.netty.NettyMessagingService; import org.onlab.packet.IpPrefix; @@ -29,8 +29,8 @@ public class ClusterCommunicationManagerTest { private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1"); - private OnosClusterCommunicationManager ccm1; - private OnosClusterCommunicationManager ccm2; + private ClusterCommunicationManager ccm1; + private ClusterCommunicationManager ccm2; private TestDelegate cnd1 = new TestDelegate(); private TestDelegate cnd2 = new TestDelegate(); @@ -46,11 +46,11 @@ public class ClusterCommunicationManagerTest { NettyMessagingService messagingService = new NettyMessagingService(); messagingService.activate(); - ccm1 = new OnosClusterCommunicationManager(); + ccm1 = new ClusterCommunicationManager(); // ccm1.serializationService = messageSerializer; ccm1.activate(); - ccm2 = new OnosClusterCommunicationManager(); + ccm2 = new ClusterCommunicationManager(); // ccm2.serializationService = messageSerializer; ccm2.activate(); diff --git a/utils/netty/src/main/java/org/onlab/netty/package-info.java b/utils/netty/src/main/java/org/onlab/netty/package-info.java new file mode 100644 index 0000000000..b1b90a3728 --- /dev/null +++ b/utils/netty/src/main/java/org/onlab/netty/package-info.java @@ -0,0 +1,4 @@ +/** + * Asynchronous messaging APIs implemented using the Netty framework. + */ +package org.onlab.netty; \ No newline at end of file From 707a7e62f2bb161f8af280c0119b00cc6c73c4dd Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Thu, 2 Oct 2014 16:35:49 -0700 Subject: [PATCH 08/10] moved commons-pool to root pom.xml Change-Id: Ic366fd4c790d6a9b9c43d6b593e034427eefa6a6 --- pom.xml | 5 +++++ utils/netty/pom.xml | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 56bbd744a6..cb00f32295 100644 --- a/pom.xml +++ b/pom.xml @@ -248,6 +248,11 @@ tests test + + commons-pool + commons-pool + 1.6 + diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml index d3351174f0..a980d1d178 100644 --- a/utils/netty/pom.xml +++ b/utils/netty/pom.xml @@ -42,7 +42,6 @@ commons-pool commons-pool - 1.6 From 79cbd1caf6bd596edfe91379704ba8080680b114 Mon Sep 17 00:00:00 2001 From: Yuta HIGUCHI Date: Thu, 2 Oct 2014 16:57:57 -0700 Subject: [PATCH 09/10] Ignore setRole if switch is not connected to that node. Change-Id: I40fb0b768294926924b29c7715a91c95df9b9664 --- .../openflow/controller/impl/OpenFlowControllerImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java index eb122865c2..e8ebcd1bbe 100644 --- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java +++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java @@ -169,7 +169,12 @@ public class OpenFlowControllerImpl implements OpenFlowController { @Override public void setRole(Dpid dpid, RoleState role) { - getSwitch(dpid).setRole(role); + final OpenFlowSwitch sw = getSwitch(dpid); + if (sw == null) { + log.debug("Switch not connected. Ignoring setRole({}, {})", dpid, role); + return; + } + sw.setRole(role); } /** From ba5ac487b5f5cb72e91ae9bc536113e4e3045228 Mon Sep 17 00:00:00 2001 From: alshabib Date: Thu, 2 Oct 2014 17:15:20 -0700 Subject: [PATCH 10/10] flowrules no longer install a timeout but are monitored by onos in order to be expired Change-Id: Ibd1a5952349d7ccb27c92b4982d04574f31424c0 --- .../onlab/onos/fwd/ReactiveForwarding.java | 10 +- .../onlab/onos/net/flow/DefaultFlowRule.java | 40 +++-- .../onos/net/flow/DefaultTrafficSelector.java | 44 +++++- .../org/onlab/onos/net/flow/FlowRule.java | 8 +- .../onlab/onos/net/flow/FlowRuleProvider.java | 2 + .../net/flow/FlowRuleProviderService.java | 21 --- .../onlab/onos/net/flow/TrafficSelector.java | 4 +- .../onos/net/flow/criteria/Criteria.java | 138 ++++++++++++++++++ .../onos/net/flow/impl/FlowRuleManager.java | 51 +++++-- .../net/flow/impl/FlowRuleManagerTest.java | 44 ++++-- .../trivial/impl/SimpleFlowRuleStore.java | 19 +-- .../provider/of/flow/impl/FlowModBuilder.java | 1 - .../of/flow/impl/FlowRuleBuilder.java | 5 +- .../of/flow/impl/OpenFlowRuleProvider.java | 2 +- .../src/main/resources/onos/checkstyle.xml | 2 +- 15 files changed, 293 insertions(+), 98 deletions(-) diff --git a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java index f7fbdbbfbc..aaf535058f 100644 --- a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java +++ b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java @@ -38,6 +38,8 @@ import org.slf4j.Logger; @Component(immediate = true) public class ReactiveForwarding { + private static final int TIMEOUT = 10; + private final Logger log = getLogger(getClass()); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @@ -184,15 +186,15 @@ public class ReactiveForwarding { Ethernet inPkt = context.inPacket().parsed(); TrafficSelector.Builder builder = new DefaultTrafficSelector.Builder(); builder.matchEthType(inPkt.getEtherType()) - .matchEthSrc(inPkt.getSourceMAC()) - .matchEthDst(inPkt.getDestinationMAC()) - .matchInport(context.inPacket().receivedFrom().port()); + .matchEthSrc(inPkt.getSourceMAC()) + .matchEthDst(inPkt.getDestinationMAC()) + .matchInport(context.inPacket().receivedFrom().port()); TrafficTreatment.Builder treat = new DefaultTrafficTreatment.Builder(); treat.setOutput(portNumber); FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(), - builder.build(), treat.build(), 0, appId); + builder.build(), treat.build(), 0, appId, TIMEOUT); flowRuleService.applyFlowRules(f); } diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java index f705a945f0..bb4805bc24 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java @@ -27,11 +27,12 @@ public class DefaultFlowRule implements FlowRule { private final ApplicationId appId; - private boolean expired; + private final int timeout; public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector, TrafficTreatment treatment, int priority, FlowRuleState state, - long life, long packets, long bytes, long flowId, boolean expired) { + long life, long packets, long bytes, long flowId, boolean expired, + int timeout) { this.deviceId = deviceId; this.priority = priority; this.selector = selector; @@ -39,26 +40,30 @@ public class DefaultFlowRule implements FlowRule { this.state = state; this.appId = ApplicationId.valueOf((int) (flowId >> 32)); this.id = FlowId.valueOf(flowId); - this.expired = expired; this.life = life; this.packets = packets; this.bytes = bytes; this.created = System.currentTimeMillis(); + this.timeout = timeout; } public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector, - TrafficTreatment treatement, int priority, ApplicationId appId) { - this(deviceId, selector, treatement, priority, FlowRuleState.CREATED, appId); + TrafficTreatment treatement, int priority, ApplicationId appId, + int timeout) { + this(deviceId, selector, treatement, priority, + FlowRuleState.CREATED, appId, timeout); } public DefaultFlowRule(FlowRule rule, FlowRuleState state) { this(rule.deviceId(), rule.selector(), rule.treatment(), - rule.priority(), state, rule.id(), rule.appId()); + rule.priority(), state, rule.id(), rule.appId(), + rule.timeout()); } private DefaultFlowRule(DeviceId deviceId, TrafficSelector selector, TrafficTreatment treatment, - int priority, FlowRuleState state, ApplicationId appId) { + int priority, FlowRuleState state, ApplicationId appId, + int timeout) { this.deviceId = deviceId; this.priority = priority; this.selector = selector; @@ -69,13 +74,16 @@ public class DefaultFlowRule implements FlowRule { this.bytes = 0; this.appId = appId; + this.timeout = timeout; + this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL)); this.created = System.currentTimeMillis(); } private DefaultFlowRule(DeviceId deviceId, TrafficSelector selector, TrafficTreatment treatment, - int priority, FlowRuleState state, FlowId flowId, ApplicationId appId) { + int priority, FlowRuleState state, FlowId flowId, ApplicationId appId, + int timeout) { this.deviceId = deviceId; this.priority = priority; this.selector = selector; @@ -86,6 +94,7 @@ public class DefaultFlowRule implements FlowRule { this.bytes = 0; this.appId = appId; this.id = flowId; + this.timeout = timeout; this.created = System.currentTimeMillis(); } @@ -149,7 +158,7 @@ public class DefaultFlowRule implements FlowRule { * @see java.lang.Object#equals(java.lang.Object) */ public int hashCode() { - return Objects.hash(deviceId, id); + return Objects.hash(deviceId, selector, priority); } public int hash() { @@ -170,7 +179,10 @@ public class DefaultFlowRule implements FlowRule { if (obj instanceof DefaultFlowRule) { DefaultFlowRule that = (DefaultFlowRule) obj; return Objects.equals(deviceId, that.deviceId) && - Objects.equals(id, that.id); + //Objects.equals(id, that.id) && + Objects.equals(priority, that.priority) && + Objects.equals(selector, that.selector); + } return false; } @@ -181,16 +193,16 @@ public class DefaultFlowRule implements FlowRule { .add("id", id) .add("deviceId", deviceId) .add("priority", priority) - .add("selector", selector) - .add("treatment", treatment) + .add("selector", selector.criteria()) + .add("treatment", treatment == null ? "N/A" : treatment.instructions()) .add("created", created) .add("state", state) .toString(); } @Override - public boolean expired() { - return expired; + public int timeout() { + return timeout > MAX_TIMEOUT ? MAX_TIMEOUT : this.timeout; } } diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java index 8f68ea5709..d792c7e898 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java @@ -3,8 +3,9 @@ package org.onlab.onos.net.flow; import static org.slf4j.LoggerFactory.getLogger; import java.util.Collections; -import java.util.LinkedList; -import java.util.List; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.flow.criteria.Criteria; @@ -16,22 +17,42 @@ import org.slf4j.Logger; public final class DefaultTrafficSelector implements TrafficSelector { - private final List selector; + private final Set selector; - private DefaultTrafficSelector(List selector) { - this.selector = Collections.unmodifiableList(selector); + private DefaultTrafficSelector(Set selector) { + this.selector = Collections.unmodifiableSet(selector); } @Override - public List criteria() { + public Set criteria() { return selector; } + @Override + public int hashCode() { + return Objects.hash(selector); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof DefaultTrafficSelector) { + DefaultTrafficSelector that = (DefaultTrafficSelector) obj; + return Objects.equals(selector, that.selector); + + } + return false; + } + + + public static class Builder implements TrafficSelector.Builder { private final Logger log = getLogger(getClass()); - private final List selector = new LinkedList<>(); + private final Set selector = new HashSet<>(); @Override public Builder add(Criterion criterion) { @@ -39,38 +60,47 @@ public final class DefaultTrafficSelector implements TrafficSelector { return this; } + @Override public Builder matchInport(PortNumber port) { return add(Criteria.matchInPort(port)); } + @Override public Builder matchEthSrc(MacAddress addr) { return add(Criteria.matchEthSrc(addr)); } + @Override public Builder matchEthDst(MacAddress addr) { return add(Criteria.matchEthDst(addr)); } + @Override public Builder matchEthType(short ethType) { return add(Criteria.matchEthType(ethType)); } + @Override public Builder matchVlanId(VlanId vlanId) { return add(Criteria.matchVlanId(vlanId)); } + @Override public Builder matchVlanPcp(Byte vlanPcp) { return add(Criteria.matchVlanPcp(vlanPcp)); } + @Override public Builder matchIPProtocol(Byte proto) { return add(Criteria.matchIPProtocol(proto)); } + @Override public Builder matchIPSrc(IpPrefix ip) { return add(Criteria.matchIPSrc(ip)); } + @Override public Builder matchIPDst(IpPrefix ip) { return add(Criteria.matchIPDst(ip)); } diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java index 2728e21efa..4d1b3cf9f7 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java @@ -9,6 +9,7 @@ import org.onlab.onos.net.DeviceId; */ public interface FlowRule { + static final int MAX_TIMEOUT = 60; public enum FlowRuleState { /** @@ -112,10 +113,9 @@ public interface FlowRule { long bytes(); /** - * Indicates that this flow has expired at the device. - * - * @return true if it has expired, false otherwise + * Returns the timeout for this flow requested by an application. + * @return integer value of the timeout */ - boolean expired(); + int timeout(); } diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java index b2c3d30e8e..c4e2f926d3 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java @@ -8,6 +8,8 @@ import org.onlab.onos.net.provider.Provider; */ public interface FlowRuleProvider extends Provider { + static final int POLL_INTERVAL = 5; + /** * Instructs the provider to apply the specified flow rules to their * respective devices. diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java index 01e437236f..20761037cc 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java @@ -16,27 +16,6 @@ public interface FlowRuleProviderService extends ProviderService criteria(); + Set criteria(); /** * Builder of traffic selector entities. diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java index 758c51c4d5..a819bd32cb 100644 --- a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java +++ b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java @@ -2,6 +2,8 @@ package org.onlab.onos.net.flow.criteria; import static com.google.common.base.MoreObjects.toStringHelper; +import java.util.Objects; + import org.onlab.onos.net.PortNumber; import org.onlab.onos.net.flow.criteria.Criterion.Type; import org.onlab.packet.IpPrefix; @@ -137,6 +139,25 @@ public final class Criteria { return toStringHelper(type().toString()) .add("port", port).toString(); } + + @Override + public int hashCode() { + return Objects.hash(port); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof PortCriterion) { + PortCriterion that = (PortCriterion) obj; + return Objects.equals(port, that.port); + + } + return false; + } + } @@ -164,6 +185,27 @@ public final class Criteria { .add("mac", mac).toString(); } + @Override + public int hashCode() { + return Objects.hash(mac, type); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof EthCriterion) { + EthCriterion that = (EthCriterion) obj; + return Objects.equals(mac, that.mac) && + Objects.equals(type, that.type); + + + } + return false; + } + + } public static final class EthTypeCriterion implements Criterion { @@ -189,6 +231,25 @@ public final class Criteria { .add("ethType", Long.toHexString(ethType)).toString(); } + @Override + public int hashCode() { + return Objects.hash(ethType); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof EthTypeCriterion) { + EthTypeCriterion that = (EthTypeCriterion) obj; + return Objects.equals(ethType, that.ethType); + + + } + return false; + } + } @@ -217,6 +278,26 @@ public final class Criteria { .add("ip", ip).toString(); } + @Override + public int hashCode() { + return Objects.hash(ip, type); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof IPCriterion) { + IPCriterion that = (IPCriterion) obj; + return Objects.equals(ip, that.ip) && + Objects.equals(type, that.type); + + + } + return false; + } + } @@ -243,6 +324,25 @@ public final class Criteria { .add("protocol", Long.toHexString(proto)).toString(); } + @Override + public int hashCode() { + return Objects.hash(proto); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof IPProtocolCriterion) { + IPProtocolCriterion that = (IPProtocolCriterion) obj; + return Objects.equals(proto, that.proto); + + + } + return false; + } + } @@ -269,6 +369,25 @@ public final class Criteria { .add("pcp", Long.toHexString(vlanPcp)).toString(); } + @Override + public int hashCode() { + return Objects.hash(vlanPcp); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof VlanPcpCriterion) { + VlanPcpCriterion that = (VlanPcpCriterion) obj; + return Objects.equals(vlanPcp, that.vlanPcp); + + + } + return false; + } + } @@ -296,6 +415,25 @@ public final class Criteria { .add("id", vlanId).toString(); } + @Override + public int hashCode() { + return Objects.hash(vlanId); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof VlanIdCriterion) { + VlanIdCriterion that = (VlanIdCriterion) obj; + return Objects.equals(vlanId, that.vlanId); + + + } + return false; + } + } diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java index a6f5ebbd5e..00619b38f2 100644 --- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java +++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java @@ -5,6 +5,9 @@ import static org.slf4j.LoggerFactory.getLogger; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -59,6 +62,8 @@ implements FlowRuleService, FlowRuleProviderRegistry { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceService deviceService; + private final Map deadRounds = new ConcurrentHashMap<>(); + @Activate public void activate() { store.setDelegate(delegate); @@ -84,6 +89,7 @@ implements FlowRuleService, FlowRuleProviderRegistry { FlowRule f = flowRules[i]; final Device device = deviceService.getDevice(f.deviceId()); final FlowRuleProvider frp = getProvider(device.providerId()); + deadRounds.put(f, new AtomicInteger(0)); store.storeFlowRule(f); frp.applyFlowRule(f); } @@ -98,6 +104,7 @@ implements FlowRuleService, FlowRuleProviderRegistry { f = flowRules[i]; device = deviceService.getDevice(f.deviceId()); frp = getProvider(device.providerId()); + deadRounds.remove(f); store.deleteFlowRule(f); frp.removeFlowRule(f); } @@ -161,11 +168,7 @@ implements FlowRuleService, FlowRuleProviderRegistry { switch (stored.state()) { case ADDED: case PENDING_ADD: - if (flowRule.expired()) { - event = store.removeFlowRule(flowRule); - } else { frp.applyFlowRule(stored); - } break; case PENDING_REMOVE: case REMOVED: @@ -181,8 +184,8 @@ implements FlowRuleService, FlowRuleProviderRegistry { } } - @Override - public void flowMissing(FlowRule flowRule) { + + private void flowMissing(FlowRule flowRule) { checkNotNull(flowRule, FLOW_RULE_NULL); checkValidity(); Device device = deviceService.getDevice(flowRule.deviceId()); @@ -209,29 +212,47 @@ implements FlowRuleService, FlowRuleProviderRegistry { } - @Override - public void extraneousFlow(FlowRule flowRule) { + + private void extraneousFlow(FlowRule flowRule) { checkNotNull(flowRule, FLOW_RULE_NULL); checkValidity(); removeFlowRules(flowRule); log.debug("Flow {} is on switch but not in store.", flowRule); } - @Override - public void flowAdded(FlowRule flowRule) { + + private void flowAdded(FlowRule flowRule) { checkNotNull(flowRule, FLOW_RULE_NULL); checkValidity(); - FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule); - if (event == null) { - log.debug("No flow store event generated."); + if (deadRounds.containsKey(flowRule) && + checkRuleLiveness(flowRule, store.getFlowRule(flowRule))) { + + FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule); + if (event == null) { + log.debug("No flow store event generated."); + } else { + log.debug("Flow {} {}", flowRule, event.type()); + post(event); + } } else { - log.debug("Flow {} {}", flowRule, event.type()); - post(event); + removeFlowRules(flowRule); } } + private boolean checkRuleLiveness(FlowRule swRule, FlowRule storedRule) { + int timeout = storedRule.timeout(); + if (storedRule.packets() != swRule.packets()) { + deadRounds.get(swRule).set(0); + return true; + } + + return (deadRounds.get(swRule).getAndIncrement() * + FlowRuleProvider.POLL_INTERVAL) <= timeout; + + } + // Posts the specified event to the local event dispatcher. private void post(FlowRuleEvent event) { if (event != null) { diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java index 5ff72a2f1a..0b451c061f 100644 --- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java +++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java @@ -9,7 +9,9 @@ import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; import org.junit.After; import org.junit.Before; @@ -42,6 +44,7 @@ import org.onlab.onos.net.provider.AbstractProvider; import org.onlab.onos.net.provider.ProviderId; import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -52,6 +55,7 @@ public class FlowRuleManagerTest { private static final ProviderId PID = new ProviderId("of", "foo"); private static final DeviceId DID = DeviceId.deviceId("of:001"); + private static final int TIMEOUT = 10; private static final Device DEV = new DefaultDevice( PID, DID, Type.SWITCH, "", "", "", ""); @@ -96,7 +100,7 @@ public class FlowRuleManagerTest { private FlowRule flowRule(int tsval, int trval) { TestSelector ts = new TestSelector(tsval); TestTreatment tr = new TestTreatment(trval); - return new DefaultFlowRule(DID, ts, tr, 0, appId); + return new DefaultFlowRule(DID, ts, tr, 0, appId, TIMEOUT); } private FlowRule flowRule(FlowRule rule, FlowRuleState state) { @@ -105,7 +109,8 @@ public class FlowRuleManagerTest { private FlowRule addFlowRule(int hval) { FlowRule rule = flowRule(hval, hval); - providerService.flowAdded(rule); + service.applyFlowRules(rule); + assertNotNull("rule should be found", service.getFlowEntries(DID)); return rule; } @@ -135,13 +140,18 @@ public class FlowRuleManagerTest { public void getFlowEntries() { assertTrue("store should be empty", Sets.newHashSet(service.getFlowEntries(DID)).isEmpty()); - addFlowRule(1); - addFlowRule(2); + FlowRule f1 = addFlowRule(1); + FlowRule f2 = addFlowRule(2); + assertEquals("2 rules should exist", 2, flowCount()); + + providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2)); validateEvents(RULE_ADDED, RULE_ADDED); addFlowRule(1); assertEquals("should still be 2 rules", 2, flowCount()); + + providerService.pushFlowMetrics(DID, ImmutableList.of(f1)); validateEvents(RULE_UPDATED); } @@ -179,8 +189,10 @@ public class FlowRuleManagerTest { public void removeFlowRules() { FlowRule f1 = addFlowRule(1); FlowRule f2 = addFlowRule(2); - addFlowRule(3); + FlowRule f3 = addFlowRule(3); assertEquals("3 rules should exist", 3, flowCount()); + + providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2, f3)); validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED); FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED); @@ -200,8 +212,9 @@ public class FlowRuleManagerTest { @Test public void flowRemoved() { FlowRule f1 = addFlowRule(1); + FlowRule f2 = addFlowRule(2); + providerService.pushFlowMetrics(f1.deviceId(), ImmutableList.of(f1, f2)); service.removeFlowRules(f1); - addFlowRule(2); FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED); providerService.flowRemoved(rem1); validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED); @@ -209,9 +222,11 @@ public class FlowRuleManagerTest { providerService.flowRemoved(rem1); validateEvents(); - FlowRule f3 = flowRule(flowRule(3, 3), FlowRuleState.ADDED); - providerService.flowAdded(f3); + FlowRule f3 = flowRule(3, 3); + service.applyFlowRules(f3); + providerService.pushFlowMetrics(f3.deviceId(), Collections.singletonList(f3)); validateEvents(RULE_ADDED); + providerService.flowRemoved(f3); validateEvents(); } @@ -223,9 +238,10 @@ public class FlowRuleManagerTest { FlowRule f3 = flowRule(3, 3); + + mgr.applyFlowRules(f1, f2, f3); FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED); FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED); - mgr.applyFlowRules(f1, f2, f3); providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2)); @@ -233,7 +249,7 @@ public class FlowRuleManagerTest { validateState(FlowRuleState.PENDING_ADD, FlowRuleState.ADDED, FlowRuleState.ADDED)); - validateEvents(RULE_UPDATED, RULE_UPDATED); + validateEvents(RULE_ADDED, RULE_ADDED); } @Test @@ -241,15 +257,15 @@ public class FlowRuleManagerTest { FlowRule f1 = flowRule(1, 1); FlowRule f2 = flowRule(2, 2); FlowRule f3 = flowRule(3, 3); + mgr.applyFlowRules(f1, f2); FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED); FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED); FlowRule updatedF3 = flowRule(f3, FlowRuleState.ADDED); - mgr.applyFlowRules(f1, f2); providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2, updatedF3)); - validateEvents(RULE_UPDATED, RULE_UPDATED); + validateEvents(RULE_ADDED, RULE_ADDED); } @@ -271,7 +287,7 @@ public class FlowRuleManagerTest { providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2)); - validateEvents(RULE_UPDATED, RULE_UPDATED, RULE_REMOVED); + validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED); } @@ -386,7 +402,7 @@ public class FlowRuleManagerTest { } @Override - public List criteria() { + public Set criteria() { return null; } diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java index 2f43211923..d12d00e401 100644 --- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java +++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java @@ -1,6 +1,5 @@ package org.onlab.onos.store.trivial.impl; -import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED; import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; import static org.slf4j.LoggerFactory.getLogger; @@ -116,18 +115,21 @@ public class SimpleFlowRuleStore DeviceId did = rule.deviceId(); // check if this new rule is an update to an existing entry - if (flowEntries.containsEntry(did, rule)) { - //synchronized (flowEntries) { + FlowRule stored = getFlowRule(rule); + if (stored != null) { // Multimaps support duplicates so we have to remove our rule // and replace it with the current version. flowEntries.remove(did, rule); flowEntries.put(did, rule); - //} + + if (stored.state() == FlowRuleState.PENDING_ADD) { + return new FlowRuleEvent(Type.RULE_ADDED, rule); + } return new FlowRuleEvent(Type.RULE_UPDATED, rule); } flowEntries.put(did, rule); - return new FlowRuleEvent(RULE_ADDED, rule); + return null; } @Override @@ -140,11 +142,4 @@ public class SimpleFlowRuleStore } //} } - - - - - - - } diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java index 86ab701a9d..ade651ed8d 100644 --- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java +++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java @@ -77,7 +77,6 @@ public class FlowModBuilder { .setCookie(U64.of(cookie.value())) .setBufferId(OFBufferId.NO_BUFFER) .setActions(actions) - .setIdleTimeout(10) .setMatch(match) .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM)) .setPriority(priority) diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java index ac00f05dfa..eba228266e 100644 --- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java +++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java @@ -71,7 +71,7 @@ public class FlowRuleBuilder { buildSelector(), buildTreatment(), stat.getPriority(), FlowRuleState.ADDED, stat.getDurationNsec() / 1000000, stat.getPacketCount().getValue(), stat.getByteCount().getValue(), - stat.getCookie().getValue(), false); + stat.getCookie().getValue(), false, stat.getIdleTimeout()); } else { // TODO: revisit potentially. return new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)), @@ -79,7 +79,8 @@ public class FlowRuleBuilder { FlowRuleState.REMOVED, removed.getDurationNsec() / 1000000, removed.getPacketCount().getValue(), removed.getByteCount().getValue(), removed.getCookie().getValue(), - removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal()); + removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal(), + stat.getIdleTimeout()); } } diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java index bf29ae4329..24a7ea8658 100644 --- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java +++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java @@ -127,7 +127,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr @Override public void switchAdded(Dpid dpid) { - FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), 5); + FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL); fsc.start(); collectors.put(dpid, fsc); } diff --git a/tools/build/conf/src/main/resources/onos/checkstyle.xml b/tools/build/conf/src/main/resources/onos/checkstyle.xml index 06413aae48..dad602d63d 100644 --- a/tools/build/conf/src/main/resources/onos/checkstyle.xml +++ b/tools/build/conf/src/main/resources/onos/checkstyle.xml @@ -176,7 +176,7 @@ - +