BGPSpeaker/net_ctrl: Close RPC session when disconnected

This patch fixes net_ctrl to check whether socket is connected and
to close RPC session when disconnected by RPC peer.

Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
IWASE Yusuke 2016-12-16 15:17:40 +09:00 committed by FUJITA Tomonori
parent fcea0dc2af
commit f1edc9fbd0

View File

@ -102,12 +102,12 @@ class RpcSession(Activity):
self._next_msgid = 0
self._socket = sock
self._outgoing_msg_sink_iter = outgoing_msg_sink_iter
self.peer_name = str(self._socket.getpeername())
self.is_connected = True
def stop(self):
super(RpcSession, self).stop()
LOG.critical(
'RPC Session to %s stopped', str(self._socket.getpeername())
)
LOG.info('RPC Session to %s stopped', self.peer_name)
def _run(self):
# Process outgoing messages in new thread.
@ -117,9 +117,7 @@ class RpcSession(Activity):
# Process incoming messages in new thread.
green_in = self._spawn('net_ctrl._process_incoming',
self._process_incoming_msgs)
LOG.critical(
'RPC Session to %s started', str(self._socket.getpeername())
)
LOG.info('RPC Session to %s started', self.peer_name)
green_in.wait()
green_out.wait()
@ -166,11 +164,13 @@ class RpcSession(Activity):
LOG.debug('NetworkController started processing incoming messages')
assert self._socket
while True:
while self.is_connected:
# Wait for request/response/notification from peer.
msg_buff = self._recv()
if len(msg_buff) == 0:
LOG.info('Peer %r disconnected.', self._socket)
LOG.info('Peer %s disconnected.', self.peer_name)
self.is_connected = False
self._socket.close()
break
messages = self.feed_and_get_messages(msg_buff)
for msg in messages:
@ -197,16 +197,17 @@ class RpcSession(Activity):
it loops forever.
"""
LOG.debug('NetworkController processing outgoing request list.')
# TODO(Team): handle un-expected exception breaking the loop in
# graceful manner. Discuss this with other component developers.
# TODO(PH): We should try not to sent routes from bgp peer that is not
# in established state.
from ryu.services.protocols.bgp.model import (
FlexinetOutgoingRoute)
while True:
while self.is_connected:
# sink iter is Sink instance and next is blocking so this isn't
# active wait.
for outgoing_msg in sink_iter:
if not self.is_connected:
self._socket.close()
return
if isinstance(outgoing_msg, FlexinetOutgoingRoute):
rpc_msg = _create_prefix_notification(outgoing_msg, self)
else:
@ -349,8 +350,11 @@ class _NetworkController(FlexinetPeer, Activity):
self._rpc_session.start()
def send_rpc_notification(self, method, params):
if (self.started and self._rpc_session is not None and
self._rpc_session.started):
if not self.started or self._rpc_session is None:
return
elif not self._rpc_session.is_connected:
self._rpc_session = None
elif self._rpc_session.started:
return self._rpc_session.send_notification(method, params)