mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-05-11 15:26:11 +02:00
Clean up socket close() handling
Also, temporarily work around a bug in eventlet's Queue.put() by wrapping the send_q with a semaphore. Signed-off-by: Victor J. Orlikowski <vjo@duke.edu> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
f1f0ca2d16
commit
477f2ddd70
@ -30,7 +30,7 @@ from ryu.lib.hub import StreamServer
|
||||
import traceback
|
||||
import random
|
||||
import ssl
|
||||
from socket import IPPROTO_TCP, TCP_NODELAY, timeout as SocketTimeout, error as SocketError
|
||||
from socket import IPPROTO_TCP, TCP_NODELAY, SHUT_RDWR, timeout as SocketTimeout
|
||||
import warnings
|
||||
|
||||
import ryu.base.app_manager
|
||||
@ -41,8 +41,8 @@ from ryu.ofproto import ofproto_protocol
|
||||
from ryu.ofproto import ofproto_v1_0
|
||||
from ryu.ofproto import nx_match
|
||||
|
||||
from ryu.controller import handler
|
||||
from ryu.controller import ofp_event
|
||||
from ryu.controller.handler import HANDSHAKE_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
|
||||
|
||||
from ryu.lib.dpid import dpid_to_str
|
||||
|
||||
@ -103,8 +103,15 @@ def _deactivate(method):
|
||||
try:
|
||||
method(self)
|
||||
finally:
|
||||
self.send_active = False
|
||||
self.set_state(handler.DEAD_DISPATCHER)
|
||||
try:
|
||||
self.socket.shutdown(SHUT_RDWR)
|
||||
except (EOFError, IOError):
|
||||
pass
|
||||
|
||||
if not self.is_active:
|
||||
self.socket.close()
|
||||
if self.state is not DEAD_DISPATCHER:
|
||||
self.set_state(DEAD_DISPATCHER)
|
||||
return deactivate
|
||||
|
||||
|
||||
@ -117,19 +124,20 @@ class Datapath(ofproto_protocol.ProtocolDesc):
|
||||
self.socket.settimeout(CONF.socket_timeout)
|
||||
self.address = address
|
||||
|
||||
self.send_active = True
|
||||
self.is_active = True
|
||||
self.close_requested = False
|
||||
|
||||
# The limit is arbitrary. We need to limit queue size to
|
||||
# prevent it from eating memory up
|
||||
self.send_q = hub.Queue(16)
|
||||
self._send_q_sem = hub.BoundedSemaphore(self.send_q.maxsize)
|
||||
|
||||
self.xid = random.randint(0, self.ofproto.MAX_XID)
|
||||
self.id = None # datapath_id is unknown yet
|
||||
self._ports = None
|
||||
self.flow_format = ofproto_v1_0.NXFF_OPENFLOW10
|
||||
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
|
||||
self.set_state(handler.HANDSHAKE_DISPATCHER)
|
||||
self.set_state(HANDSHAKE_DISPATCHER)
|
||||
|
||||
def _get_ports(self):
|
||||
if (self.ofproto_parser is not None and
|
||||
@ -167,19 +175,17 @@ class Datapath(ofproto_protocol.ProtocolDesc):
|
||||
required_len = ofproto_common.OFP_HEADER_SIZE
|
||||
|
||||
count = 0
|
||||
while True:
|
||||
while not self.close_requested:
|
||||
ret = ""
|
||||
|
||||
try:
|
||||
ret = self.socket.recv(required_len)
|
||||
except SocketTimeout:
|
||||
if not self.close_requested:
|
||||
continue
|
||||
except SocketError:
|
||||
self.close_requested = True
|
||||
continue
|
||||
except (EOFError, IOError):
|
||||
break
|
||||
|
||||
if (len(ret) == 0) or (self.close_requested):
|
||||
self.socket.close()
|
||||
if len(ret) == 0:
|
||||
break
|
||||
|
||||
buf += ret
|
||||
@ -215,30 +221,45 @@ class Datapath(ofproto_protocol.ProtocolDesc):
|
||||
count = 0
|
||||
hub.sleep(0)
|
||||
|
||||
@_deactivate
|
||||
def _send_loop(self):
|
||||
try:
|
||||
while self.send_active:
|
||||
while True:
|
||||
buf = self.send_q.get()
|
||||
self._send_q_sem.release()
|
||||
self.socket.sendall(buf)
|
||||
except SocketTimeout:
|
||||
LOG.debug("Socket timed out while sending data to switch at address %s",
|
||||
self.address)
|
||||
except IOError as ioe:
|
||||
LOG.debug("Socket error while sending data to switch at address %s: [%d] %s",
|
||||
self.address, ioe.errno, ioe.strerror)
|
||||
# Convert ioe.errno to a string, just in case it was somehow set to None.
|
||||
errno = "%s" % ioe.errno
|
||||
LOG.debug("Socket error while sending data to switch at address %s: [%s] %s",
|
||||
self.address, errno, ioe.strerror)
|
||||
finally:
|
||||
q = self.send_q
|
||||
# first, clear self.send_q to prevent new references.
|
||||
self.send_q = None
|
||||
# there might be threads currently blocking in send_q.put().
|
||||
# unblock them by draining the queue.
|
||||
# Now, drain the send_q, releasing the associated semaphore for each entry.
|
||||
# This should release all threads waiting to acquire the semaphore.
|
||||
try:
|
||||
while q.get(block=False):
|
||||
pass
|
||||
self._send_q_sem.release()
|
||||
except hub.QueueEmpty:
|
||||
pass
|
||||
# Finally, ensure the _recv_loop terminates.
|
||||
self.close()
|
||||
|
||||
def send(self, buf):
|
||||
msg_enqueued = False
|
||||
self._send_q_sem.acquire()
|
||||
if self.send_q:
|
||||
self.send_q.put(buf)
|
||||
msg_enqueued = True
|
||||
else:
|
||||
self._send_q_sem.release()
|
||||
if not msg_enqueued:
|
||||
LOG.debug('Datapath in process of terminating; send() to %s discarded.',
|
||||
self.address)
|
||||
|
||||
def set_xid(self, msg):
|
||||
self.xid += 1
|
||||
@ -266,6 +287,7 @@ class Datapath(ofproto_protocol.ProtocolDesc):
|
||||
finally:
|
||||
hub.kill(send_thr)
|
||||
hub.joinall([send_thr])
|
||||
self.is_active = False
|
||||
|
||||
#
|
||||
# Utility methods for convenience
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user