handle datapath join race and disconnection

If no datapath is avalable, requests are queued and will be executed later.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
FUJITA Tomonori 2013-10-04 11:09:08 +09:00
parent 1b063ab24d
commit 978513e833

View File

@ -1,4 +1,5 @@
import eventlet
import select
import time
from ryu.base import app_manager
from ryu.controller import handler
@ -35,6 +36,7 @@ class OFWireRpcSession(object):
self.socket = socket
self.dpset = dpset
self.session = RpcSession()
self.pending = []
self.pool = eventlet.GreenPool()
self.send_queue = eventlet.queue.Queue()
self.pool.spawn_n(self._send)
@ -71,9 +73,8 @@ class OFWireRpcSession(object):
break
if dp is None:
m = self.session.create_response(msg[1], None,
[{'error': 'no datapath'}])
self.send_queue.put(m)
print 'no datapath, queued', msg
self.pending.append(msg)
return
ofmsg = None
@ -157,25 +158,35 @@ class OFWireRpcSession(object):
r = self.session.create_response(msg[1], 0, [])
self.send_queue.put(r)
def _handle_rpc_message(self, m):
if m[0] == RpcMessage.REQUEST:
if m[2] == 'ofp':
self.ofp_handle_request(m)
elif m[2] == 'monitor_port':
self.monitor_port(m)
elif m[0] == RpcMessage.RESPONSE:
pass
elif m[0] == RpcMessage.NOTIFY:
if m[1] == 'traceroute':
self._tr_handle_notify(m)
else:
print "invalid type", m[0]
def serve(self):
while True:
ret = self.socket.recv(4096)
if len(ret) == 0:
break
messages = self.session.get_messages(ret)
for m in messages:
if m[0] == RpcMessage.REQUEST:
if m[2] == 'ofp':
self.ofp_handle_request(m)
elif m[2] == 'monitor_port':
self.monitor_port(m)
elif m[0] == RpcMessage.RESPONSE:
pass
elif m[0] == RpcMessage.NOTIFY:
if m[1] == 'traceroute':
self._tr_handle_notify(m)
else:
print "invalid type", m
rready, _, _ = select.select([self.socket], [], [], 5)
for idx in range(len(self.pending)):
print "found pending", msg
msg = self.pending.pop(0)
self._handle_rpc_message(msg)
if len(rready) > 0:
ret = self.socket.recv(4096)
if len(ret) == 0:
break
for m in self.session.get_messages(ret):
self._handle_rpc_message(m)
class RPCApi(app_manager.RyuApp):
@ -204,13 +215,7 @@ class RPCApi(app_manager.RyuApp):
pass
eventlet.sleep(monitored_ports['interval'])
def _wait_for_dp_joined(self):
while not self.dp_joined:
#print "waiting for dp joined..."
eventlet.sleep(0.1)
def serve(self):
self._wait_for_dp_joined()
while True:
sock, address = self.server.accept()
session = OFWireRpcSession(sock, self.dpset)