From 6f74371b5e17b45f6f975c52166c16bdeb03548a Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Sat, 26 Mar 2016 11:20:25 -0700 Subject: [PATCH] Misc bug fixes in preparation for enabling StorageManager Change-Id: I953414891c901e5d1f92844ca8c4eaa8c042dd53 --- .../impl/CopycatTransportConnection.java | 31 +++++++------------ .../primitives/impl/StoragePartition.java | 2 -- .../impl/AtomixConsistentMapCommands.java | 4 +-- 3 files changed, 13 insertions(+), 24 deletions(-) diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java index fafb2d085b..984a61098e 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java @@ -24,12 +24,14 @@ import java.io.InputStream; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; + + import org.apache.commons.io.IOUtils; import org.onlab.util.Tools; import org.onosproject.cluster.PartitionId; +import org.onosproject.store.cluster.messaging.MessagingException; import org.onosproject.store.cluster.messaging.MessagingService; import com.google.common.base.MoreObjects; @@ -41,6 +43,7 @@ import io.atomix.catalyst.serializer.SerializationException; import io.atomix.catalyst.transport.Address; import io.atomix.catalyst.transport.Connection; import io.atomix.catalyst.transport.MessageHandler; +import io.atomix.catalyst.transport.TransportException; import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.Listener; import io.atomix.catalyst.util.Listeners; @@ -66,10 +69,6 @@ public class CopycatTransportConnection implements Connection { private final String inboundMessageSubject; private final ThreadContext context; private final Map, InternalHandler> handlers = Maps.newConcurrentMap(); - private final AtomicInteger messagesSent = new AtomicInteger(0); - private final AtomicInteger sendFailures = new AtomicInteger(0); - private final AtomicInteger messagesReceived = new AtomicInteger(0); - private final AtomicInteger receiveFailures = new AtomicInteger(0); CopycatTransportConnection(long connectionId, CopycatTransport.Mode mode, @@ -120,12 +119,14 @@ public class CopycatTransportConnection implements Connection { baos.toByteArray(), context.executor()) .whenComplete((r, e) -> { - if (e == null) { - messagesSent.incrementAndGet(); - } else { - sendFailures.incrementAndGet(); + Throwable wrappedError = e; + if (e != null) { + Throwable rootCause = Throwables.getRootCause(e); + if (MessagingException.class.isAssignableFrom(rootCause.getClass())) { + wrappedError = new TransportException(e); + } } - handleResponse(r, e, result, context); + handleResponse(r, wrappedError, result, context); }); } catch (SerializationException | IOException e) { result.completeExceptionally(e); @@ -172,11 +173,6 @@ public class CopycatTransportConnection implements Connection { "No handler registered for " + request.getClass())); } return handler.handle(request).handle((result, error) -> { - if (error == null) { - messagesReceived.incrementAndGet(); - } else { - receiveFailures.incrementAndGet(); - } try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { baos.write(error != null ? FAILURE : SUCCESS); context.serializer().writeObject(error != null ? error : result, baos); @@ -220,7 +216,6 @@ public class CopycatTransportConnection implements Connection { if (!(other instanceof CopycatTransportConnection)) { return false; } - return connectionId == ((CopycatTransportConnection) other).connectionId; } @@ -228,10 +223,6 @@ public class CopycatTransportConnection implements Connection { public String toString() { return MoreObjects.toStringHelper(getClass()) .add("id", connectionId) - .add("sent", messagesSent.get()) - .add("received", messagesReceived.get()) - .add("sendFailures", sendFailures.get()) - .add("receiveFailures", receiveFailures.get()) .toString(); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java index b40e39db8b..286b2fb119 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java @@ -18,7 +18,6 @@ package org.onosproject.store.primitives.impl; import io.atomix.catalyst.serializer.Serializer; import io.atomix.catalyst.transport.Address; import io.atomix.resource.ResourceType; -import io.atomix.variables.DistributedLong; import java.io.File; import java.util.Collection; @@ -57,7 +56,6 @@ public class StoragePartition implements Managed { private StoragePartitionClient client; public static final Collection RESOURCE_TYPES = ImmutableSet.of( - new ResourceType(DistributedLong.class), new ResourceType(AtomixLeaderElector.class), new ResourceType(AtomixConsistentMap.class)); diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java index dbc31577e1..4f912da083 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java @@ -52,7 +52,7 @@ public final class AtomixConsistentMapCommands { @Override public ConsistencyLevel consistency() { - return ConsistencyLevel.SEQUENTIAL; + return ConsistencyLevel.LINEARIZABLE; } @Override @@ -78,7 +78,7 @@ public final class AtomixConsistentMapCommands { @Override public ConsistencyLevel consistency() { - return ConsistencyLevel.SEQUENTIAL; + return ConsistencyLevel.BOUNDED_LINEARIZABLE; } @Override