From 5a8779cd26014ba9968af800743419e3e535beae Mon Sep 17 00:00:00 2001 From: tom Date: Mon, 29 Sep 2014 14:48:43 -0700 Subject: [PATCH] Added ability to properly register/deregister new connections and have the node status properly reflected. --- .../org/onlab/onos/foo/IOLoopTestClient.java | 2 +- .../cluster/impl/DistributedClusterStore.java | 63 ++++++++++++++----- .../src/main/java/org/onlab/nio/IOLoop.java | 17 +++-- .../java/org/onlab/nio/MessageStream.java | 3 +- .../java/org/onlab/nio/IOLoopTestClient.java | 2 +- 5 files changed, 59 insertions(+), 28 deletions(-) diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java index 3ec8c07044..302a0c75d6 100644 --- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java +++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java @@ -233,7 +233,7 @@ public class IOLoopTestClient { } @Override - protected void connect(SelectionKey key) { + protected void connect(SelectionKey key) throws IOException { super.connect(key); TestMessageStream b = (TestMessageStream) key.attachment(); Worker w = ((CustomIOLoop) b.loop()).worker; diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java index 08a182bc3d..5cd9d9eb5f 100644 --- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java +++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java @@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -129,6 +130,7 @@ public class DistributedClusterStore if (self == null) { self = new DefaultControllerNode(new NodeId(ip.toString()), ip); nodes.put(self.id(), self); + states.put(self.id(), State.ACTIVE); } } @@ -219,7 +221,10 @@ public class DistributedClusterStore @Override public void removeNode(NodeId nodeId) { nodes.remove(nodeId); - streams.remove(nodeId); + TLVMessageStream stream = streams.remove(nodeId); + if (stream != null) { + stream.close(); + } } // Listens and accepts inbound connections from other cluster nodes. @@ -256,12 +261,13 @@ public class DistributedClusterStore protected void processMessages(List messages, MessageStream stream) { TLVMessageStream tlvStream = (TLVMessageStream) stream; for (TLVMessage message : messages) { - // TODO: add type-based dispatching here... - log.info("Got message {}", message.type()); - - // FIXME: hack to get going + // TODO: add type-based dispatching here... this is just a hack to get going if (message.type() == HELLO_MSG) { processHello(message, tlvStream); + } else if (message.type() == ECHO_MSG) { + processEcho(message, tlvStream); + } else { + log.info("Deal with other messages"); } } } @@ -271,7 +277,7 @@ public class DistributedClusterStore TLVMessageStream stream = super.acceptStream(channel); try { InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress(); - log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress())); + log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress())); stream.write(createHello(self)); } catch (IOException e) { @@ -285,31 +291,55 @@ public class DistributedClusterStore TLVMessageStream stream = super.connectStream(channel); DefaultControllerNode node = nodesByChannel.get(channel); if (node != null) { - log.info("Opened connection to node {}", node.id()); + log.debug("Opened connection to node {}", node.id()); nodesByChannel.remove(channel); } return stream; } @Override - protected void connect(SelectionKey key) { - super.connect(key); - TLVMessageStream stream = (TLVMessageStream) key.attachment(); - send(stream, createHello(self)); + protected void connect(SelectionKey key) throws IOException { + try { + super.connect(key); + TLVMessageStream stream = (TLVMessageStream) key.attachment(); + send(stream, createHello(self)); + } catch (IOException e) { + if (!Objects.equals(e.getMessage(), "Connection refused")) { + throw e; + } + } + } + + @Override + protected void removeStream(MessageStream stream) { + DefaultControllerNode node = ((TLVMessageStream) stream).node(); + if (node != null) { + log.info("Closed connection to node {}", node.id()); + states.put(node.id(), State.INACTIVE); + streams.remove(node.id()); + } + super.removeStream(stream); } } - // FIXME: pure hack for now + // Processes a HELLO message from a peer controller node. private void processHello(TLVMessage message, TLVMessageStream stream) { + // FIXME: pure hack for now String data = new String(message.data()); - log.info("Processing hello with data [{}]", data); - String[] fields = new String(data).split(":"); + String[] fields = data.split(":"); DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]), - IpPrefix.valueOf(fields[1]), + valueOf(fields[1]), Integer.parseInt(fields[2])); stream.setNode(node); nodes.put(node.id(), node); streams.put(node.id(), stream); + states.put(node.id(), State.ACTIVE); + } + + // Processes an ECHO message from a peer controller node. + private void processEcho(TLVMessage message, TLVMessageStream tlvStream) { + // TODO: implement heart-beat refresh + log.info("Dealing with echoes..."); } // Sends message to the specified stream. @@ -321,6 +351,7 @@ public class DistributedClusterStore } } + // Creates a hello message to be sent to a peer controller node. private TLVMessage createHello(DefaultControllerNode self) { return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes()); } @@ -335,7 +366,7 @@ public class DistributedClusterStore try { openConnection(node, findLeastUtilizedLoop()); } catch (IOException e) { - log.warn("Unable to connect", e); + log.debug("Unable to connect", e); } } } diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java index 805b58af28..dc3ecaf0b6 100644 --- a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java +++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java @@ -93,14 +93,9 @@ public abstract class IOLoop> * * @param key selection key holding the pending connect operation. */ - protected void connect(SelectionKey key) { - try { - SocketChannel ch = (SocketChannel) key.channel(); - ch.finishConnect(); - } catch (IOException | IllegalStateException e) { - log.warn("Unable to complete connection", e); - } - + protected void connect(SelectionKey key) throws IOException { + SocketChannel ch = (SocketChannel) key.channel(); + ch.finishConnect(); if (key.isValid()) { key.interestOps(SelectionKey.OP_READ); } @@ -124,7 +119,11 @@ public abstract class IOLoop> // If there is a pending connect operation, complete it. if (key.isConnectable()) { - connect(key); + try { + connect(key); + } catch (IOException | IllegalStateException e) { + log.warn("Unable to complete connection", e); + } } // If there is a read operation, slurp as much data as possible. diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java index a7416e954c..c38f0f514d 100644 --- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java +++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java @@ -10,6 +10,7 @@ import java.nio.channels.ByteChannel; import java.nio.channels.SelectionKey; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -262,7 +263,7 @@ public abstract class MessageStream { try { channel.write(outbound); } catch (IOException e) { - if (!closed && !e.getMessage().equals("Broken pipe")) { + if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) { log.warn("Unable to write data", e); ioError = e; } diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java index bdcc97a00f..bbeedd00c1 100644 --- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java +++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java @@ -230,7 +230,7 @@ public class IOLoopTestClient { } @Override - protected void connect(SelectionKey key) { + protected void connect(SelectionKey key) throws IOException { super.connect(key); TestMessageStream b = (TestMessageStream) key.attachment(); Worker w = ((CustomIOLoop) b.loop()).worker;