diff --git a/ryu/services/protocols/bgp/net_ctrl.py b/ryu/services/protocols/bgp/net_ctrl.py index 7944ac20..92a8e71e 100644 --- a/ryu/services/protocols/bgp/net_ctrl.py +++ b/ryu/services/protocols/bgp/net_ctrl.py @@ -107,6 +107,8 @@ class RpcSession(Activity): self._socket = sock self._outgoing_msg_sink_iter = outgoing_msg_sink_iter self.is_connected = True + self.green_in = None + self.green_out = None def stop(self): super(RpcSession, self).stop() @@ -115,15 +117,15 @@ class RpcSession(Activity): def _run(self): # Process outgoing messages in new thread. - green_out = self._spawn('net_ctrl._process_outgoing', - self._process_outgoing_msg, - self._outgoing_msg_sink_iter) + self.green_out = self._spawn('net_ctrl._process_outgoing', + self._process_outgoing_msg, + self._outgoing_msg_sink_iter) # Process incoming messages in new thread. - green_in = self._spawn('net_ctrl._process_incoming', - self._process_incoming_msgs) + self.green_in = self._spawn('net_ctrl._process_incoming', + self._process_incoming_msgs) LOG.info('RPC Session to %s started', self.peer_name) - green_in.wait() - green_out.wait() + self.green_in.wait() + self.green_out.wait() def _next_msg_id(self): this_id = self._next_msgid @@ -202,6 +204,10 @@ class RpcSession(Activity): LOG.error('Invalid message type: %r', msg) self.pause(0) + # Stop outgoing connection. + if self.green_out: + self.green_out.kill() + def _process_outgoing_msg(self, sink_iter): """For every message we construct a corresponding RPC message to be sent over the given socket inside given RPC session. @@ -231,6 +237,10 @@ class RpcSession(Activity): self._sendall(rpc_msg) self.pause(0) + # Stop incoming connection. + if self.green_in: + self.green_in.kill() + def _recv(self): return self._sock_wrap(self._socket.recv)(RPC_SOCK_BUFF_SIZE)