diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java index e4c96863c2..f8243c72d4 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java @@ -168,10 +168,10 @@ public class ClusterMessagingProtocol @Override public ProtocolClient createClient(TcpMember member) { - ControllerNode node = getControllerNode(member.host(), member.port()); - checkNotNull(node, "A valid controller node is expected"); + ControllerNode remoteNode = getControllerNode(member.host(), member.port()); + checkNotNull(remoteNode, "A valid controller node is expected"); return new ClusterMessagingProtocolClient( - clusterCommunicator, node); + clusterCommunicator, clusterService.getLocalNode(), remoteNode); } private ControllerNode getControllerNode(String host, int port) { diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java index bb6bfcf77b..61ec4673af 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java @@ -42,6 +42,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { public static final long RETRY_INTERVAL_MILLIS = 2000; private final ClusterCommunicationService clusterCommunicator; + private final ControllerNode localNode; private final ControllerNode remoteNode; // FIXME: Thread pool sizing. @@ -50,8 +51,10 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { public ClusterMessagingProtocolClient( ClusterCommunicationService clusterCommunicator, + ControllerNode localNode, ControllerNode remoteNode) { this.clusterCommunicator = clusterCommunicator; + this.localNode = localNode; this.remoteNode = remoteNode; } @@ -117,7 +120,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { this.request = request; this.message = new ClusterMessage( - null, // FIXME fill in proper sender + localNode.id(), messageType(request), ClusterMessagingProtocol.SERIALIZER.encode(request)); this.future = future; @@ -132,22 +135,12 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response)); } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { -// if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) || -// message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) { -// log.warn("{} Request to {} failed. Will retry in {} ms", -// message.subject(), remoteNode, RETRY_INTERVAL_MILLIS); -// THREAD_POOL.schedule( -// this, -// RETRY_INTERVAL_MILLIS, -// TimeUnit.MILLISECONDS); -// } else { - log.warn("RPCTask for {} failed.", request, e); - future.completeExceptionally(e); -// } + log.warn("RPCTask for {} failed.", request, e); + future.completeExceptionally(e); } catch (Exception e) { log.warn("RPCTask for {} terribly failed.", request, e); future.completeExceptionally(e); } } } -} \ No newline at end of file +}