From 978513e8337aceee7cc48c5c196d95db2eee2f6d Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Fri, 4 Oct 2013 11:09:08 +0900 Subject: [PATCH] handle datapath join race and disconnection If no datapath is avalable, requests are queued and will be executed later. Signed-off-by: FUJITA Tomonori --- ryu/controller/api.py | 57 +++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/ryu/controller/api.py b/ryu/controller/api.py index 01d79060..bda03fe9 100644 --- a/ryu/controller/api.py +++ b/ryu/controller/api.py @@ -1,4 +1,5 @@ import eventlet +import select import time from ryu.base import app_manager from ryu.controller import handler @@ -35,6 +36,7 @@ class OFWireRpcSession(object): self.socket = socket self.dpset = dpset self.session = RpcSession() + self.pending = [] self.pool = eventlet.GreenPool() self.send_queue = eventlet.queue.Queue() self.pool.spawn_n(self._send) @@ -71,9 +73,8 @@ class OFWireRpcSession(object): break if dp is None: - m = self.session.create_response(msg[1], None, - [{'error': 'no datapath'}]) - self.send_queue.put(m) + print 'no datapath, queued', msg + self.pending.append(msg) return ofmsg = None @@ -157,25 +158,35 @@ class OFWireRpcSession(object): r = self.session.create_response(msg[1], 0, []) self.send_queue.put(r) + def _handle_rpc_message(self, m): + if m[0] == RpcMessage.REQUEST: + if m[2] == 'ofp': + self.ofp_handle_request(m) + elif m[2] == 'monitor_port': + self.monitor_port(m) + elif m[0] == RpcMessage.RESPONSE: + pass + elif m[0] == RpcMessage.NOTIFY: + if m[1] == 'traceroute': + self._tr_handle_notify(m) + else: + print "invalid type", m[0] + def serve(self): while True: - ret = self.socket.recv(4096) - if len(ret) == 0: - break - messages = self.session.get_messages(ret) - for m in messages: - if m[0] == RpcMessage.REQUEST: - if m[2] == 'ofp': - self.ofp_handle_request(m) - elif m[2] == 'monitor_port': - self.monitor_port(m) - elif m[0] == RpcMessage.RESPONSE: - pass - elif m[0] == RpcMessage.NOTIFY: - if m[1] == 'traceroute': - self._tr_handle_notify(m) - else: - print "invalid type", m + rready, _, _ = select.select([self.socket], [], [], 5) + + for idx in range(len(self.pending)): + print "found pending", msg + msg = self.pending.pop(0) + self._handle_rpc_message(msg) + + if len(rready) > 0: + ret = self.socket.recv(4096) + if len(ret) == 0: + break + for m in self.session.get_messages(ret): + self._handle_rpc_message(m) class RPCApi(app_manager.RyuApp): @@ -204,13 +215,7 @@ class RPCApi(app_manager.RyuApp): pass eventlet.sleep(monitored_ports['interval']) - def _wait_for_dp_joined(self): - while not self.dp_joined: - #print "waiting for dp joined..." - eventlet.sleep(0.1) - def serve(self): - self._wait_for_dp_joined() while True: sock, address = self.server.accept() session = OFWireRpcSession(sock, self.dpset)