diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml index aec1ea21df..f3db107d0a 100644 --- a/core/store/primitives/pom.xml +++ b/core/store/primitives/pom.xml @@ -70,18 +70,18 @@ io.atomix atomix - 1.0.4 + 1.0.5 io.atomix.catalyst catalyst-netty - 1.1.2 + 1.2.0 io.atomix.catalyst catalyst-transport - 1.1.2 + 1.2.0 org.onosproject diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java index 75a75d7ff4..c88242c6f0 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java @@ -90,7 +90,7 @@ public class CopycatTransportClient implements Client { endpoint, messagingService, context); - connection.closeListener(connections::remove); + connection.onClose(connections::remove); connections.add(connection); future.complete(connection); log.debug("Created connection {}-{} to {}", partitionId, connectionId, address); diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java index 9d7edd063e..eebbf9c8d3 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java @@ -15,24 +15,6 @@ */ package org.onosproject.store.primitives.impl; -import com.google.common.base.Throwables; -import io.atomix.catalyst.concurrent.Listener; -import io.atomix.catalyst.concurrent.Listeners; -import io.atomix.catalyst.concurrent.ThreadContext; -import io.atomix.catalyst.serializer.SerializationException; -import io.atomix.catalyst.transport.Connection; -import io.atomix.catalyst.transport.MessageHandler; -import io.atomix.catalyst.transport.TransportException; -import io.atomix.catalyst.util.reference.ReferenceCounted; -import org.apache.commons.io.IOUtils; -import org.onlab.util.Tools; -import org.onosproject.cluster.PartitionId; -import org.onosproject.store.cluster.messaging.Endpoint; -import org.onosproject.store.cluster.messaging.MessagingException; -import org.onosproject.store.cluster.messaging.MessagingService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -44,6 +26,24 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import java.util.function.Function; + +import com.google.common.base.Throwables; +import io.atomix.catalyst.concurrent.Listener; +import io.atomix.catalyst.concurrent.Listeners; +import io.atomix.catalyst.concurrent.ThreadContext; +import io.atomix.catalyst.serializer.SerializationException; +import io.atomix.catalyst.transport.Connection; +import io.atomix.catalyst.transport.TransportException; +import io.atomix.catalyst.util.reference.ReferenceCounted; +import org.apache.commons.io.IOUtils; +import org.onlab.util.Tools; +import org.onosproject.cluster.PartitionId; +import org.onosproject.store.cluster.messaging.Endpoint; +import org.onosproject.store.cluster.messaging.MessagingException; +import org.onosproject.store.cluster.messaging.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkNotNull; import static org.onosproject.store.primitives.impl.CopycatTransport.CLOSE; @@ -85,7 +85,33 @@ public class CopycatTransportConnection implements Connection { } @Override - public CompletableFuture send(T message) { + public CompletableFuture send(Object message) { + ThreadContext context = ThreadContext.currentContextOrThrow(); + CompletableFuture future = new CompletableFuture<>(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + DataOutputStream dos = new DataOutputStream(baos); + dos.writeByte(MESSAGE); + context.serializer().writeObject(message, baos); + if (message instanceof ReferenceCounted) { + ((ReferenceCounted) message).release(); + } + + messagingService.sendAsync(endpoint, remoteSubject, baos.toByteArray()) + .whenComplete((r, e) -> { + if (e != null) { + context.executor().execute(() -> future.completeExceptionally(e)); + } else { + context.executor().execute(() -> future.complete(null)); + } + }); + } catch (SerializationException | IOException e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public CompletableFuture sendAndReceive(T message) { ThreadContext context = ThreadContext.currentContextOrThrow(); CompletableFuture future = new CompletableFuture<>(); try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { @@ -210,7 +236,15 @@ public class CopycatTransportConnection implements Connection { } @Override - public Connection handler(Class type, MessageHandler handler) { + public Connection handler(Class type, Consumer handler) { + return handler(type, r -> { + handler.accept(r); + return null; + }); + } + + @Override + public Connection handler(Class type, Function> handler) { if (log.isTraceEnabled()) { log.trace("Registered handler on connection {}-{}: {}", partitionId, connectionId, type); } @@ -219,12 +253,12 @@ public class CopycatTransportConnection implements Connection { } @Override - public Listener exceptionListener(Consumer consumer) { + public Listener onException(Consumer consumer) { return exceptionListeners.add(consumer); } @Override - public Listener closeListener(Consumer consumer) { + public Listener onClose(Consumer consumer) { return closeListeners.add(consumer); } @@ -329,10 +363,10 @@ public class CopycatTransportConnection implements Connection { * Internal container for a handler/context pair. */ private static class InternalHandler { - private final MessageHandler handler; + private final Function handler; private final ThreadContext context; - InternalHandler(MessageHandler handler, ThreadContext context) { + InternalHandler(Function handler, ThreadContext context) { this.handler = handler; this.context = context; } @@ -340,13 +374,18 @@ public class CopycatTransportConnection implements Connection { @SuppressWarnings("unchecked") CompletableFuture handle(Object message) { CompletableFuture future = new CompletableFuture<>(); - context.execute(() -> handler.handle(message).whenComplete((r, e) -> { - if (e != null) { - future.completeExceptionally((Throwable) e); - } else { - future.complete(r); + context.executor().execute(() -> { + CompletableFuture responseFuture = (CompletableFuture) handler.apply(message); + if (responseFuture != null) { + responseFuture.whenComplete((r, e) -> { + if (e != null) { + future.completeExceptionally((Throwable) e); + } else { + future.complete(r); + } + }); } - })); + }); return future; } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java index a38dea3eaf..8de05a36f3 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java @@ -76,7 +76,7 @@ public class CopycatTransportServer implements Server { sender, messagingService, context); - connection.closeListener(connections::remove); + connection.onClose(connections::remove); connections.add(connection); CompletableFuture future = new CompletableFuture<>(); diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CopycatTransportTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CopycatTransportTest.java index 21b4d704be..de76d0dd6a 100644 --- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CopycatTransportTest.java +++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CopycatTransportTest.java @@ -15,27 +15,6 @@ */ package org.onosproject.store.primitives.impl; -import com.google.common.collect.Lists; -import io.atomix.catalyst.concurrent.SingleThreadContext; -import io.atomix.catalyst.concurrent.ThreadContext; -import io.atomix.catalyst.transport.Address; -import io.atomix.catalyst.transport.Client; -import io.atomix.catalyst.transport.Server; -import io.atomix.catalyst.transport.Transport; -import io.atomix.copycat.protocol.ConnectRequest; -import io.atomix.copycat.protocol.ConnectResponse; -import io.atomix.copycat.protocol.PublishRequest; -import io.atomix.copycat.protocol.PublishResponse; -import io.atomix.copycat.protocol.Response; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onlab.packet.IpAddress; -import org.onlab.util.Tools; -import org.onosproject.cluster.PartitionId; -import org.onosproject.store.cluster.messaging.Endpoint; -import org.onosproject.store.cluster.messaging.MessagingService; - import java.time.Duration; import java.util.Map; import java.util.UUID; @@ -47,6 +26,25 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import com.google.common.collect.Lists; +import io.atomix.catalyst.concurrent.SingleThreadContext; +import io.atomix.catalyst.concurrent.ThreadContext; +import io.atomix.catalyst.transport.Address; +import io.atomix.catalyst.transport.Client; +import io.atomix.catalyst.transport.Server; +import io.atomix.catalyst.transport.Transport; +import io.atomix.copycat.protocol.ConnectRequest; +import io.atomix.copycat.protocol.ConnectResponse; +import io.atomix.copycat.protocol.Response; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onlab.packet.IpAddress; +import org.onlab.util.Tools; +import org.onosproject.cluster.PartitionId; +import org.onosproject.store.cluster.messaging.Endpoint; +import org.onosproject.store.cluster.messaging.MessagingService; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -135,7 +133,7 @@ public class CopycatTransportTest { } catch (InterruptedException e) { fail(); } - connection.send(ConnectRequest.builder() + connection.sendAndReceive(ConnectRequest.builder() .withClientId(UUID.randomUUID().toString()) .build()) .thenAccept(response -> { @@ -166,15 +164,12 @@ public class CopycatTransportTest { serverContext.checkThread(); latch.countDown(); serverContext.schedule(Duration.ofMillis(100), () -> { - connection.send(PublishRequest.builder() - .withSession(1) - .withEventIndex(3) - .withPreviousIndex(2) + connection.sendAndReceive(ConnectRequest.builder() + .withClientId("foo") .build()) .thenAccept(response -> { serverContext.checkThread(); assertEquals(Response.Status.OK, response.status()); - assertEquals(1, response.index()); latch.countDown(); }); }); @@ -187,15 +182,14 @@ public class CopycatTransportTest { client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> { clientContext.checkThread(); latch.countDown(); - connection.handler(PublishRequest.class, request -> { + connection.handler(ConnectRequest.class, request -> { clientContext.checkThread(); latch.countDown(); - assertEquals(1, request.session()); - assertEquals(3, request.eventIndex()); - assertEquals(2, request.previousIndex()); - return CompletableFuture.completedFuture(PublishResponse.builder() + assertEquals("foo", request.client()); + return CompletableFuture.completedFuture(ConnectResponse.builder() .withStatus(Response.Status.OK) - .withIndex(1) + .withLeader(new Address(IP_STRING, endpoint2.port())) + .withMembers(Lists.newArrayList(new Address(IP_STRING, endpoint2.port()))) .build()); }); }); @@ -219,7 +213,7 @@ public class CopycatTransportTest { server.listen(new Address(IP_STRING, endpoint2.port()), connection -> { serverContext.checkThread(); latch.countDown(); - connection.closeListener(c -> { + connection.onClose(c -> { serverContext.checkThread(); latch.countDown(); }); @@ -232,7 +226,7 @@ public class CopycatTransportTest { client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> { clientContext.checkThread(); latch.countDown(); - connection.closeListener(c -> { + connection.onClose(c -> { clientContext.checkThread(); latch.countDown(); }); @@ -263,7 +257,7 @@ public class CopycatTransportTest { server.listen(new Address(IP_STRING, endpoint2.port()), connection -> { serverContext.checkThread(); latch.countDown(); - connection.closeListener(c -> { + connection.onClose(c -> { latch.countDown(); }); serverContext.schedule(Duration.ofMillis(100), () -> { @@ -281,7 +275,7 @@ public class CopycatTransportTest { client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> { clientContext.checkThread(); latch.countDown(); - connection.closeListener(c -> { + connection.onClose(c -> { latch.countDown(); }); }); diff --git a/lib/BUCK b/lib/BUCK index 679dff84e5..601d2b0419 100644 --- a/lib/BUCK +++ b/lib/BUCK @@ -1,4 +1,4 @@ -# ***** This file was auto-generated at Fri, 31 Mar 2017 06:11:30 GMT. Do not edit this file manually. ***** +# ***** This file was auto-generated at Mon, 17 Apr 2017 19:01:18 GMT. Do not edit this file manually. ***** # ***** Use onos-lib-gen ***** pass_thru_pom( @@ -149,10 +149,10 @@ remote_jar ( remote_jar ( name = 'atomix', - out = 'atomix-all-1.0.4.jar', - url = 'mvn:io.atomix:atomix-all:jar:1.0.4', - sha1 = '45a61e7efa49015d2637866cc174f9b86679c4d4', - maven_coords = 'io.atomix:atomix-all:1.0.4', + out = 'atomix-all-1.0.5.jar', + url = 'mvn:io.atomix:atomix-all:jar:1.0.5', + sha1 = 'd92d41fb74dd8cc0e708606b3588ef5bc3a58e45', + maven_coords = 'io.atomix:atomix-all:1.0.5', visibility = [ 'PUBLIC' ], ) @@ -824,28 +824,28 @@ remote_jar ( remote_jar ( name = 'catalyst-concurrent', - out = 'catalyst-concurrent-1.1.2.jar', - url = 'mvn:io.atomix.catalyst:catalyst-concurrent:jar:1.1.2', - sha1 = 'f7323bc5a1ebfd4710574cf5485956c949eec20b', - maven_coords = 'io.atomix.catalyst:catalyst-concurrent:1.1.2', + out = 'catalyst-concurrent-1.2.0.jar', + url = 'mvn:io.atomix.catalyst:catalyst-concurrent:jar:1.2.0', + sha1 = 'ba91527a1c0a68c8f46cc591ef0dded3d2d0c298', + maven_coords = 'io.atomix.catalyst:catalyst-concurrent:1.2.0', visibility = [ 'PUBLIC' ], ) remote_jar ( name = 'catalyst-netty', - out = 'catalyst-netty-1.1.2.jar', - url = 'mvn:io.atomix.catalyst:catalyst-netty:jar:1.1.2', - sha1 = '3072b53b1a6c0686ceccff3d371f8ea8aa28a273', - maven_coords = 'io.atomix.catalyst:catalyst-netty:1.1.2', + out = 'catalyst-netty-1.2.0.jar', + url = 'mvn:io.atomix.catalyst:catalyst-netty:jar:1.2.0', + sha1 = 'abb694b6fe835eb66d30ae6979ec0f7e4ac2e738', + maven_coords = 'io.atomix.catalyst:catalyst-netty:1.2.0', visibility = [ 'PUBLIC' ], ) remote_jar ( name = 'catalyst-transport', - out = 'catalyst-transport-1.1.2.jar', - url = 'mvn:io.atomix.catalyst:catalyst-transport:jar:1.1.2', - sha1 = '4a50b74deb6601d7fdca34e873a016c83db7bebf', - maven_coords = 'io.atomix.catalyst:catalyst-transport:1.1.2', + out = 'catalyst-transport-1.2.0.jar', + url = 'mvn:io.atomix.catalyst:catalyst-transport:jar:1.2.0', + sha1 = '1469017e168a5e611fa4c251273184a763e0cd7f', + maven_coords = 'io.atomix.catalyst:catalyst-transport:1.2.0', visibility = [ 'PUBLIC' ], ) diff --git a/lib/deps.json b/lib/deps.json index 55e94de202..c0ea59fb77 100644 --- a/lib/deps.json +++ b/lib/deps.json @@ -93,7 +93,7 @@ "aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b30", "amqp-client": "mvn:com.rabbitmq:amqp-client:jar:3.6.1", "asm": "mvn:org.ow2.asm:asm:5.0.4", - "atomix": "mvn:io.atomix:atomix-all:1.0.4", + "atomix": "mvn:io.atomix:atomix-all:1.0.5", "commons-codec": "mvn:commons-codec:commons-codec:1.10", "commons-collections": "mvn:commons-collections:commons-collections:3.2.2", "commons-configuration": "mvn:commons-configuration:commons-configuration:1.10", @@ -168,9 +168,9 @@ "netty-transport": "mvn:io.netty:netty-transport:4.1.5.Final", "netty-transport-native-epoll": "mvn:io.netty:netty-transport-native-epoll:4.1.5.Final", "netty-resolver": "mvn:io.netty:netty-resolver:4.1.5.Final", - "catalyst-concurrent": "mvn:io.atomix.catalyst:catalyst-concurrent:1.1.2", - "catalyst-netty": "mvn:io.atomix.catalyst:catalyst-netty:1.1.2", - "catalyst-transport": "mvn:io.atomix.catalyst:catalyst-transport:1.1.2", + "catalyst-concurrent": "mvn:io.atomix.catalyst:catalyst-concurrent:1.2.0", + "catalyst-netty": "mvn:io.atomix.catalyst:catalyst-netty:1.2.0", + "catalyst-transport": "mvn:io.atomix.catalyst:catalyst-transport:1.2.0", "objenesis": "mvn:org.objenesis:objenesis:2.2", "openflowj": "mvn:org.onosproject:openflowj:0.9.7.onos", "org.apache.felix.scr": "mvn:org.apache.felix:org.apache.felix.scr:1.8.2",