diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java index 72e7a59222..29a1904de0 100644 --- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java +++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java @@ -1,5 +1,6 @@ package org.onlab.onos.foo; +import com.google.common.collect.Lists; import org.onlab.nio.IOLoop; import org.onlab.nio.MessageStream; import org.onlab.util.Counter; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static java.lang.String.format; +import static java.lang.System.currentTimeMillis; import static java.lang.System.out; import static org.onlab.onos.foo.IOLoopTestServer.PORT; import static org.onlab.util.Tools.delay; @@ -48,15 +50,18 @@ public class IOLoopTestClient { Counter messages; Counter bytes; + long latencyTotal = 0; + long latencyCount = 0; + /** * Main entry point to launch the client. * * @param args command-line arguments - * @throws java.io.IOException if unable to connect to server - * @throws InterruptedException if latch wait gets interrupted - * @throws java.util.concurrent.ExecutionException if wait gets interrupted - * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion + * @throws java.io.IOException if unable to connect to server + * @throws InterruptedException if latch wait gets interrupted + * @throws java.util.concurrent.ExecutionException if wait gets interrupted + * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion */ public static void main(String[] args) throws IOException, InterruptedException, ExecutionException, TimeoutException { @@ -158,15 +163,17 @@ public class IOLoopTestClient { * Waits for the client workers to complete. * * @param secs timeout in seconds - * @throws java.util.concurrent.ExecutionException if execution failed - * @throws InterruptedException if interrupt occurred while waiting - * @throws java.util.concurrent.TimeoutException if timeout occurred + * @throws java.util.concurrent.ExecutionException if execution failed + * @throws InterruptedException if interrupt occurred while waiting + * @throws java.util.concurrent.TimeoutException if timeout occurred */ public void await(int secs) throws InterruptedException, ExecutionException, TimeoutException { for (CustomIOLoop l : iloops) { if (l.worker.task != null) { l.worker.task.get(secs, TimeUnit.SECONDS); + latencyTotal += l.latencyTotal; + latencyCount += l.latencyCount; } } messages.freeze(); @@ -178,10 +185,11 @@ public class IOLoopTestClient { */ public void report() { DecimalFormat f = new DecimalFormat("#,##0"); - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs", + out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency", f.format(messages.total()), f.format(bytes.total()), f.format(messages.throughput()), - f.format(bytes.throughput() / (1024 * msgLength)))); + f.format(bytes.throughput() / (1024 * msgLength)), + f.format(latencyTotal / latencyCount))); } @@ -189,6 +197,9 @@ public class IOLoopTestClient { private class CustomIOLoop extends IOLoop { Worker worker = new Worker(); + long latencyTotal = 0; + long latencyCount = 0; + public CustomIOLoop() throws IOException { super(500); @@ -219,7 +230,12 @@ public class IOLoopTestClient { @Override protected void processMessages(List messages, - MessageStream b) { + MessageStream stream) { + for (TestMessage message : messages) { + // TODO: summarize latency data better + latencyTotal += currentTimeMillis() - message.requestorTime(); + latencyCount++; + } worker.release(messages.size()); } @@ -241,15 +257,15 @@ public class IOLoopTestClient { private static final int BATCH_SIZE = 1000; private static final int PERMITS = 2 * BATCH_SIZE; - private TestMessageStream b; + private TestMessageStream stream; private FutureTask task; // Stuff to throttle pump private final Semaphore semaphore = new Semaphore(PERMITS); private int msgWritten; - void pump(TestMessageStream b) { - this.b = b; + void pump(TestMessageStream stream) { + this.stream = stream; task = new FutureTask<>(this, this); wpool.execute(task); } @@ -259,18 +275,15 @@ public class IOLoopTestClient { try { log.info("Worker started..."); - List batch = new ArrayList<>(); - for (int i = 0; i < BATCH_SIZE; i++) { - batch.add(new TestMessage(msgLength)); - } - while (msgWritten < msgCount) { - msgWritten += writeBatch(b, batch); + int size = Math.min(BATCH_SIZE, msgCount - msgWritten); + writeBatch(size); + msgWritten += size; } // Now try to get all the permits back before sending poison pill semaphore.acquireUninterruptibly(PERMITS); - b.close(); + stream.close(); log.info("Worker done..."); @@ -280,18 +293,15 @@ public class IOLoopTestClient { } - private int writeBatch(TestMessageStream b, List batch) - throws IOException { - int count = Math.min(BATCH_SIZE, msgCount - msgWritten); - acquire(count); - if (count == BATCH_SIZE) { - b.write(batch); - } else { - for (int i = 0; i < count; i++) { - b.write(batch.get(i)); - } + private void writeBatch(int size) throws IOException { + // Build a batch of messages + List batch = Lists.newArrayListWithCapacity(size); + for (int i = 0; i < size; i++) { + batch.add(new TestMessage(msgLength, currentTimeMillis(), 0, + this.stream.padding(msgLength))); } - return count; + acquire(size); + stream.write(batch); } diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java index 9c1f649bd4..bb5fee75f6 100644 --- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java +++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java @@ -1,5 +1,6 @@ package org.onlab.onos.foo; +import com.google.common.collect.Lists; import org.onlab.nio.AcceptorLoop; import org.onlab.nio.IOLoop; import org.onlab.nio.MessageStream; @@ -22,6 +23,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.lang.String.format; +import static java.lang.System.currentTimeMillis; import static java.lang.System.out; import static org.onlab.util.Tools.delay; import static org.onlab.util.Tools.namedThreads; @@ -202,11 +204,20 @@ public class IOLoopTestServer { protected void processMessages(List messages, MessageStream stream) { try { - stream.write(messages); + stream.write(createResponses(messages)); } catch (IOException e) { log.error("Unable to echo messages", e); } } + + private List createResponses(List messages) { + List responses = Lists.newArrayListWithCapacity(messages.size()); + for (TestMessage message : messages) { + responses.add(new TestMessage(message.length(), message.requestorTime(), + currentTimeMillis(), message.padding())); + } + return responses; + } } // Loop for accepting client connections diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessage.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessage.java index b6f1768069..4f0c53b406 100644 --- a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessage.java +++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessage.java @@ -2,40 +2,42 @@ package org.onlab.onos.foo; import org.onlab.nio.AbstractMessage; +import static com.google.common.base.Preconditions.checkNotNull; + /** - * Fixed-length message. + * Test message for measuring rate and round-trip latency. */ public class TestMessage extends AbstractMessage { - private final byte[] data; + private final byte[] padding; - /** - * Creates a new message with the specified length. - * - * @param length message length - */ - public TestMessage(int length) { - this.length = length; - data = new byte[length]; - } + private final long requestorTime; + private final long responderTime; /** * Creates a new message with the specified data. * - * @param data message data + * @param requestorTime requester time + * @param responderTime responder time + * @param padding message padding */ - TestMessage(byte[] data) { - this.length = data.length; - this.data = data; + TestMessage(int length, long requestorTime, long responderTime, byte[] padding) { + this.length = length; + this.requestorTime = requestorTime; + this.responderTime = responderTime; + this.padding = checkNotNull(padding, "Padding cannot be null"); } - /** - * Gets the backing byte array data. - * - * @return backing byte array - */ - public byte[] data() { - return data; + public long requestorTime() { + return requestorTime; + } + + public long responderTime() { + return responderTime; + } + + public byte[] padding() { + return padding; } } diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java index 4a44682011..a7a3e6a285 100644 --- a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java +++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java @@ -6,24 +6,21 @@ import org.onlab.nio.MessageStream; import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; +import static com.google.common.base.Preconditions.checkState; + /** * Fixed-length message transfer buffer. */ public class TestMessageStream extends MessageStream { private static final String E_WRONG_LEN = "Illegal message length: "; + private static final long START_TAG = 0xfeedcafedeaddeedL; + private static final long END_TAG = 0xbeadcafedeaddeedL; + private static final int META_LENGTH = 40; private final int length; - /** - * Create a new buffer for transferring messages of the specified length. - * - * @param length message length - * @param ch backing channel - * @param loop driver loop - */ - public TestMessageStream(int length, ByteChannel ch, - IOLoop loop) { + public TestMessageStream(int length, ByteChannel ch, IOLoop loop) { super(loop, ch, 64 * 1024, 500); this.length = length; } @@ -33,26 +30,37 @@ public class TestMessageStream extends MessageStream { if (rb.remaining() < length) { return null; } - TestMessage message = new TestMessage(length); - rb.get(message.data()); - return message; + + long startTag = rb.getLong(); + checkState(startTag == START_TAG, "Incorrect message start"); + + long size = rb.getLong(); + long requestorTime = rb.getLong(); + long responderTime = rb.getLong(); + byte[] padding = padding(length); + rb.get(padding); + + long endTag = rb.getLong(); + checkState(endTag == END_TAG, "Incorrect message end"); + + return new TestMessage((int) size, requestorTime, responderTime, padding); } - /** - * {@inheritDoc} - *

- * This implementation enforces the message length against the buffer - * supported length. - * - * @throws IllegalArgumentException if message size does not match the - * supported buffer size - */ @Override protected void write(TestMessage message, ByteBuffer wb) { if (message.length() != length) { throw new IllegalArgumentException(E_WRONG_LEN + message.length()); } - wb.put(message.data()); + + wb.putLong(START_TAG); + wb.putLong(message.length()); + wb.putLong(message.requestorTime()); + wb.putLong(message.responderTime()); + wb.put(message.padding(), 0, length - META_LENGTH); + wb.putLong(END_TAG); } + public byte[] padding(int msgLength) { + return new byte[msgLength - META_LENGTH]; + } }