Fix a stability issue relating to switch disconnection/re-connection events.

If a switch gets disconnected via a network interruption on the control plane (or a similar event),
the datapath associated with that switch might not be properly cleaned up.

At the same time, the socket associated with that datapath can get leaked.
This ultimately results in file descriptor resource exhaustion for Ryu,
as a socket had the potential to be leaked on each reconnect.

Also, a small typo was corrected in a method name in DPSet.
This commit is contained in:
Victor J. Orlikowski 2015-12-02 10:27:07 -05:00
parent f449488e6a
commit 6ca70939a4
2 changed files with 30 additions and 10 deletions

View File

@ -57,7 +57,8 @@ CONF.register_cli_opts([
help='openflow ssl listen port'),
cfg.StrOpt('ctl-privkey', default=None, help='controller private key'),
cfg.StrOpt('ctl-cert', default=None, help='controller certificate'),
cfg.StrOpt('ca-certs', default=None, help='CA certificates')
cfg.StrOpt('ca-certs', default=None, help='CA certificates'),
cfg.FloatOpt('socket-timeout', default=5.0, help='Time, in seconds, to await completion of socket operations.')
])
@ -102,7 +103,8 @@ def _deactivate(method):
try:
method(self)
finally:
self.is_active = False
self.send_active = False
self.set_state(handler.DEAD_DISPATCHER)
return deactivate
@ -112,8 +114,11 @@ class Datapath(ofproto_protocol.ProtocolDesc):
self.socket = socket
self.socket.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
self.socket.settimeout(CONF.socket_timeout)
self.address = address
self.is_active = True
self.send_active = True
self.close_requested = False
# The limit is arbitrary. We need to limit queue size to
# prevent it from eating memory up
@ -145,8 +150,9 @@ class Datapath(ofproto_protocol.ProtocolDesc):
# To show warning when Datapath#ports is read
ports = property(_get_ports, _set_ports)
@_deactivate
def close(self):
self.set_state(handler.DEAD_DISPATCHER)
self.close_requested = True
def set_state(self, state):
self.state = state
@ -161,12 +167,22 @@ class Datapath(ofproto_protocol.ProtocolDesc):
required_len = ofproto_common.OFP_HEADER_SIZE
count = 0
while self.is_active:
ret = self.socket.recv(required_len)
if len(ret) == 0:
self.is_active = False
while True:
ret = ""
try:
ret = self.socket.recv(required_len)
except:
# Hit socket timeout; decide what to do.
if self.close_requested:
pass
else:
continue
if (len(ret) == 0) or (self.close_requested):
self.socket.close()
break
buf += ret
while len(buf) >= required_len:
(version, msg_type, msg_len, xid) = ofproto_parser.header(buf)
@ -203,9 +219,12 @@ class Datapath(ofproto_protocol.ProtocolDesc):
@_deactivate
def _send_loop(self):
try:
while self.is_active:
while self.send_active:
buf = self.send_q.get()
self.socket.sendall(buf)
except IOError as ioe:
LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
self.address, ioe.errno, ioe.strerror)
finally:
q = self.send_q
# first, clear self.send_q to prevent new references.

View File

@ -117,6 +117,7 @@ class DPSet(app_manager.RyuApp):
self.logger.warning('DPSET: Multiple connections from %s',
dpid_to_str(dp.id))
self.logger.debug('DPSET: Forgetting datapath %s', self.dps[dp.id])
(self.dps[dp.id]).close()
self.logger.debug('DPSET: New datapath %s', dp)
self.dps[dp.id] = dp
if dp.id not in self.port_state:
@ -176,7 +177,7 @@ class DPSet(app_manager.RyuApp):
@set_ev_cls(ofp_event.EventOFPStateChange,
[handler.MAIN_DISPATCHER, handler.DEAD_DISPATCHER])
def dispacher_change(self, ev):
def dispatcher_change(self, ev):
datapath = ev.datapath
assert datapath is not None
if ev.state == handler.MAIN_DISPATCHER: