diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java index 9942740f43..6f6bd6d147 100644 --- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java +++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java @@ -1,5 +1,6 @@ package org.onlab.nio; +import com.google.common.collect.Lists; import org.onlab.util.Counter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static java.lang.String.format; +import static java.lang.System.currentTimeMillis; import static java.lang.System.out; import static org.onlab.nio.IOLoopTestServer.PORT; import static org.onlab.util.Tools.delay; @@ -46,15 +48,18 @@ public class IOLoopTestClient { Counter messages; Counter bytes; + long latencyTotal = 0; + long latencyCount = 0; + /** * Main entry point to launch the client. * * @param args command-line arguments - * @throws IOException if unable to connect to server - * @throws InterruptedException if latch wait gets interrupted - * @throws ExecutionException if wait gets interrupted - * @throws TimeoutException if timeout occurred while waiting for completion + * @throws java.io.IOException if unable to connect to server + * @throws InterruptedException if latch wait gets interrupted + * @throws java.util.concurrent.ExecutionException if wait gets interrupted + * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion */ public static void main(String[] args) throws IOException, InterruptedException, ExecutionException, TimeoutException { @@ -95,7 +100,7 @@ public class IOLoopTestClient { * @param mc message count to send per client * @param ml message length in bytes * @param port socket port - * @throws IOException if unable to create IO loops + * @throws java.io.IOException if unable to create IO loops */ public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException { this.ip = ip; @@ -113,7 +118,7 @@ public class IOLoopTestClient { /** * Starts the client workers. * - * @throws IOException if unable to open connection + * @throws java.io.IOException if unable to open connection */ public void start() throws IOException { messages = new Counter(); @@ -141,7 +146,7 @@ public class IOLoopTestClient { * channel with the given IO loop. * * @param loop loop with which the channel should be registered - * @throws IOException if the socket could not be open or connected + * @throws java.io.IOException if the socket could not be open or connected */ private void openConnection(CustomIOLoop loop) throws IOException { SocketAddress sa = new InetSocketAddress(ip, port); @@ -156,15 +161,17 @@ public class IOLoopTestClient { * Waits for the client workers to complete. * * @param secs timeout in seconds - * @throws ExecutionException if execution failed - * @throws InterruptedException if interrupt occurred while waiting - * @throws TimeoutException if timeout occurred + * @throws java.util.concurrent.ExecutionException if execution failed + * @throws InterruptedException if interrupt occurred while waiting + * @throws java.util.concurrent.TimeoutException if timeout occurred */ public void await(int secs) throws InterruptedException, ExecutionException, TimeoutException { for (CustomIOLoop l : iloops) { if (l.worker.task != null) { l.worker.task.get(secs, TimeUnit.SECONDS); + latencyTotal += l.latencyTotal; + latencyCount += l.latencyCount; } } messages.freeze(); @@ -176,10 +183,11 @@ public class IOLoopTestClient { */ public void report() { DecimalFormat f = new DecimalFormat("#,##0"); - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs", + out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency", f.format(messages.total()), f.format(bytes.total()), f.format(messages.throughput()), - f.format(bytes.throughput() / (1024 * msgLength)))); + f.format(bytes.throughput() / (1024 * msgLength)), + f.format(latencyTotal / latencyCount))); } @@ -187,6 +195,9 @@ public class IOLoopTestClient { private class CustomIOLoop extends IOLoop { Worker worker = new Worker(); + long latencyTotal = 0; + long latencyCount = 0; + public CustomIOLoop() throws IOException { super(500); @@ -217,7 +228,12 @@ public class IOLoopTestClient { @Override protected void processMessages(List messages, - MessageStream b) { + MessageStream stream) { + for (TestMessage message : messages) { + // TODO: summarize latency data better + latencyTotal += currentTimeMillis() - message.requestorTime(); + latencyCount++; + } worker.release(messages.size()); } @@ -239,15 +255,15 @@ public class IOLoopTestClient { private static final int BATCH_SIZE = 1000; private static final int PERMITS = 2 * BATCH_SIZE; - private TestMessageStream b; + private TestMessageStream stream; private FutureTask task; // Stuff to throttle pump private final Semaphore semaphore = new Semaphore(PERMITS); private int msgWritten; - void pump(TestMessageStream b) { - this.b = b; + void pump(TestMessageStream stream) { + this.stream = stream; task = new FutureTask<>(this, this); wpool.execute(task); } @@ -257,18 +273,15 @@ public class IOLoopTestClient { try { log.info("Worker started..."); - List batch = new ArrayList<>(); - for (int i = 0; i < BATCH_SIZE; i++) { - batch.add(new TestMessage(msgLength)); - } - while (msgWritten < msgCount) { - msgWritten += writeBatch(b, batch); + int size = Math.min(BATCH_SIZE, msgCount - msgWritten); + writeBatch(size); + msgWritten += size; } // Now try to get all the permits back before sending poison pill semaphore.acquireUninterruptibly(PERMITS); - b.close(); + stream.close(); log.info("Worker done..."); @@ -278,18 +291,15 @@ public class IOLoopTestClient { } - private int writeBatch(TestMessageStream b, List batch) - throws IOException { - int count = Math.min(BATCH_SIZE, msgCount - msgWritten); - acquire(count); - if (count == BATCH_SIZE) { - b.write(batch); - } else { - for (int i = 0; i < count; i++) { - b.write(batch.get(i)); - } + private void writeBatch(int size) throws IOException { + // Build a batch of messages + List batch = Lists.newArrayListWithCapacity(size); + for (int i = 0; i < size; i++) { + batch.add(new TestMessage(msgLength, currentTimeMillis(), 0, + stream.padding())); } - return count; + acquire(size); + stream.write(batch); } diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java index 3bcbaa15cc..18566d7a43 100644 --- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java +++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java @@ -1,5 +1,6 @@ package org.onlab.nio; +import com.google.common.collect.Lists; import org.onlab.util.Counter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,8 +20,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.lang.String.format; +import static java.lang.System.currentTimeMillis; import static java.lang.System.out; -import static org.onlab.junit.TestTools.delay; +import static org.onlab.util.Tools.delay; import static org.onlab.util.Tools.namedThreads; /** @@ -58,7 +60,7 @@ public class IOLoopTestServer { * Main entry point to launch the server. * * @param args command-line arguments - * @throws IOException if unable to crate IO loops + * @throws java.io.IOException if unable to crate IO loops */ public static void main(String[] args) throws IOException { startStandalone(args); @@ -94,7 +96,7 @@ public class IOLoopTestServer { * @param wc worker count * @param ml message length in bytes * @param port listen port - * @throws IOException if unable to create IO loops + * @throws java.io.IOException if unable to create IO loops */ public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException { this.workerCount = wc; @@ -199,11 +201,20 @@ public class IOLoopTestServer { protected void processMessages(List messages, MessageStream stream) { try { - stream.write(messages); + stream.write(createResponses(messages)); } catch (IOException e) { log.error("Unable to echo messages", e); } } + + private List createResponses(List messages) { + List responses = Lists.newArrayListWithCapacity(messages.size()); + for (TestMessage message : messages) { + responses.add(new TestMessage(message.length(), message.requestorTime(), + currentTimeMillis(), message.padding())); + } + return responses; + } } // Loop for accepting client connections diff --git a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java index 583d0ec5a6..40b5a4f625 100644 --- a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java +++ b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java @@ -23,11 +23,10 @@ import static org.junit.Assert.assertNull; */ public class MessageStreamTest { - private static final int SIZE = 16; - private static final TestMessage MESSAGE = new TestMessage(SIZE); - + private static final int SIZE = 64; private static final int BIG_SIZE = 32 * 1024; - private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE); + + private TestMessage message; private TestIOLoop loop; private TestByteChannel channel; @@ -41,6 +40,8 @@ public class MessageStreamTest { key = new TestKey(channel); stream = loop.createStream(channel); stream.setKey(key); + stream.setNonStrict(); + message = new TestMessage(SIZE, 0, 0, stream.padding()); } @After @@ -68,11 +69,13 @@ public class MessageStreamTest { public void bufferGrowth() throws IOException { // Create a stream for big messages and test the growth. stream = new TestMessageStream(BIG_SIZE, channel, loop); - stream.write(BIG_MESSAGE); - stream.write(BIG_MESSAGE); - stream.write(BIG_MESSAGE); - stream.write(BIG_MESSAGE); - stream.write(BIG_MESSAGE); + TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding()); + + stream.write(bigMessage); + stream.write(bigMessage); + stream.write(bigMessage); + stream.write(bigMessage); + stream.write(bigMessage); } @Test @@ -102,25 +105,25 @@ public class MessageStreamTest { validate(false, false, 0, 0); // First write is immediate... - stream.write(MESSAGE); + stream.write(message); validate(false, false, 0, SIZE); // Second and third get buffered... - stream.write(MESSAGE); + stream.write(message); validate(false, true, 0, SIZE); - stream.write(MESSAGE); + stream.write(message); validate(false, true, 0, SIZE); // Reset write, which will flush if needed; the next write is again buffered stream.flushIfWriteNotPending(); validate(false, false, 0, SIZE * 3); - stream.write(MESSAGE); + stream.write(message); validate(false, true, 0, SIZE * 3); // Select reset, which will flush if needed; the next write is again buffered stream.flushIfPossible(); validate(false, false, 0, SIZE * 4); - stream.write(MESSAGE); + stream.write(message); validate(false, true, 0, SIZE * 4); stream.flush(); validate(false, true, 0, SIZE * 4); @@ -132,10 +135,10 @@ public class MessageStreamTest { // First write is immediate... List messages = new ArrayList<>(); - messages.add(MESSAGE); - messages.add(MESSAGE); - messages.add(MESSAGE); - messages.add(MESSAGE); + messages.add(message); + messages.add(message); + messages.add(message); + messages.add(message); stream.write(messages); validate(false, false, 0, SIZE * 4); @@ -152,14 +155,14 @@ public class MessageStreamTest { validate(false, false, 0, 0); // First write is immediate... - stream.write(MESSAGE); + stream.write(message); validate(false, false, 0, SIZE); // Tell test channel to accept only half. channel.bytesToWrite = SIZE / 2; // Second and third get buffered... - stream.write(MESSAGE); + stream.write(message); validate(false, true, 0, SIZE); stream.flushIfPossible(); validate(true, true, 0, SIZE + SIZE / 2); @@ -170,14 +173,14 @@ public class MessageStreamTest { validate(false, false, 0, 0); // First write is immediate... - stream.write(MESSAGE); + stream.write(message); validate(false, false, 0, SIZE); // Tell test channel to accept only half. channel.bytesToWrite = SIZE / 2; // Second and third get buffered... - stream.write(MESSAGE); + stream.write(message); validate(false, true, 0, SIZE); stream.flushIfWriteNotPending(); validate(true, true, 0, SIZE + SIZE / 2); @@ -190,7 +193,7 @@ public class MessageStreamTest { assertEquals(1, messages.size()); validate(false, false, SIZE + 4, 0); - stream.write(MESSAGE); + stream.write(message); validate(false, false, SIZE + 4, SIZE); channel.bytesToRead = SIZE - 4; diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java index 00315ecae8..1ce469af8a 100644 --- a/utils/nio/src/test/java/org/onlab/nio/TestMessage.java +++ b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java @@ -1,39 +1,41 @@ package org.onlab.nio; +import static com.google.common.base.Preconditions.checkNotNull; + /** - * Fixed-length message. + * Test message for measuring rate and round-trip latency. */ public class TestMessage extends AbstractMessage { - private final byte[] data; + private final byte[] padding; - /** - * Creates a new message with the specified length. - * - * @param length message length - */ - public TestMessage(int length) { - this.length = length; - data = new byte[length]; - } + private final long requestorTime; + private final long responderTime; /** * Creates a new message with the specified data. * - * @param data message data + * @param requestorTime requester time + * @param responderTime responder time + * @param padding message padding */ - TestMessage(byte[] data) { - this.length = data.length; - this.data = data; + TestMessage(int length, long requestorTime, long responderTime, byte[] padding) { + this.length = length; + this.requestorTime = requestorTime; + this.responderTime = responderTime; + this.padding = checkNotNull(padding, "Padding cannot be null"); } - /** - * Gets the backing byte array data. - * - * @return backing byte array - */ - public byte[] data() { - return data; + public long requestorTime() { + return requestorTime; + } + + public long responderTime() { + return responderTime; + } + + public byte[] padding() { + return padding; } } diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java index a8ab8fa710..c357fe4303 100644 --- a/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java +++ b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java @@ -3,53 +3,72 @@ package org.onlab.nio; import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + /** * Fixed-length message transfer buffer. */ public class TestMessageStream extends MessageStream { private static final String E_WRONG_LEN = "Illegal message length: "; + private static final long START_TAG = 0xfeedcafedeaddeedL; + private static final long END_TAG = 0xbeadcafedeaddeedL; + private static final int META_LENGTH = 40; private final int length; + private boolean isStrict = true; - /** - * Create a new buffer for transferring messages of the specified length. - * - * @param length message length - * @param ch backing channel - * @param loop driver loop - */ - public TestMessageStream(int length, ByteChannel ch, - IOLoop loop) { + public TestMessageStream(int length, ByteChannel ch, IOLoop loop) { super(loop, ch, 64 * 1024, 500); + checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40"); this.length = length; } + void setNonStrict() { + isStrict = false; + } + @Override protected TestMessage read(ByteBuffer rb) { if (rb.remaining() < length) { return null; } - TestMessage message = new TestMessage(length); - rb.get(message.data()); - return message; + + long startTag = rb.getLong(); + if (isStrict) { + checkState(startTag == START_TAG, "Incorrect message start"); + } + + long size = rb.getLong(); + long requestorTime = rb.getLong(); + long responderTime = rb.getLong(); + byte[] padding = padding(); + rb.get(padding); + + long endTag = rb.getLong(); + if (isStrict) { + checkState(endTag == END_TAG, "Incorrect message end"); + } + + return new TestMessage((int) size, requestorTime, responderTime, padding); } - /** - * {@inheritDoc} - *

- * This implementation enforces the message length against the buffer - * supported length. - * - * @throws IllegalArgumentException if message size does not match the - * supported buffer size - */ @Override protected void write(TestMessage message, ByteBuffer wb) { if (message.length() != length) { throw new IllegalArgumentException(E_WRONG_LEN + message.length()); } - wb.put(message.data()); + + wb.putLong(START_TAG); + wb.putLong(message.length()); + wb.putLong(message.requestorTime()); + wb.putLong(message.responderTime()); + wb.put(message.padding(), 0, length - META_LENGTH); + wb.putLong(END_TAG); } + public byte[] padding() { + return new byte[length - META_LENGTH]; + } }