diff --git a/ryu/controller/api.py b/ryu/controller/api.py index 7dfc08dd..4d3e87db 100644 --- a/ryu/controller/api.py +++ b/ryu/controller/api.py @@ -6,6 +6,7 @@ from ryu.controller import handler from ryu.controller import dpset from ryu.ofproto.ofproto_parser import MsgBase from ryu.lib.rpc import RpcSession, RpcMessage +from ryu.ofproto import ofproto_parser from ryu.ofproto import ofproto_v1_0 from ryu.ofproto import ofproto_v1_0_parser from ryu.ofproto import ofproto_v1_2 @@ -14,20 +15,29 @@ from ryu.ofproto import ofproto_v1_3 from ryu.ofproto import ofproto_v1_3_parser +def find_ofp_cls(ofp_version, name): + parser_name = 'ryu.ofproto.ofproto_v1_' + str(ofp_version - 1) + \ + '_parser' + mod = sys.modules[parser_name] + for i in inspect.getmembers(mod, lambda cls: (inspect.isclass(cls))): + if i[0] == name: + return i[1] + return None + + class OFWireRpcSession(object): def __init__(self, socket, dpset): self.socket = socket self.dpset = dpset self.session = RpcSession() + self.pool = eventlet.GreenPool() + self.send_queue = eventlet.queue.Queue() + self.pool.spawn_n(self._send) - def _find_ofp_cls(self, ofp_version, name): - parser_name = 'ryu.ofproto.ofproto_v1_' + str(ofp_version - 1) + \ - '_parser' - mod = sys.modules[parser_name] - for i in inspect.getmembers(mod, lambda cls: (inspect.isclass(cls))): - if i[0] == name: - return i[1] - return None + def _send(self): + while True: + m = self.send_queue.get() + self.socket.sendall(m) def _ofp_handle_match(self, clses, params): match = clses() @@ -40,7 +50,7 @@ class OFWireRpcSession(object): for k, v in params.items(): if type(v) == dict: self._ofp_handle_params(dp, v) - clses = self._find_ofp_cls(dp.ofproto.OFP_VERSION, k) + clses = find_ofp_cls(dp.ofproto.OFP_VERSION, k) if clses is not None: if issubclass(clses, MsgBase): ins = clses(dp, **v) @@ -68,17 +78,17 @@ class OFWireRpcSession(object): params = msg[3][1] dp = self.dpset.get(int(dpid)) if dp is None: - print self.dpset.get_all() r = self.session.create_response(msg[1], 1, 0) - self.socket.sendall(r) + self.send_queue.put(r) return self._ofp_handle_params(dp, params) + result = {} for k, v in params.items(): dp.send_msg(v) - - r = self.session.create_response(msg[1], 0, 0) - self.socket.sendall(r) + result = {'xid': v.xid} + r = self.session.create_response(msg[1], 0, result) + self.send_queue.put(r) def serve(self): while True: @@ -98,14 +108,14 @@ class OFWireRpcSession(object): print "invalid type", m -class RpcApi(app_manager.RyuApp): +class RPCApi(app_manager.RyuApp): _CONTEXTS = { 'dpset': dpset.DPSet, } def __init__(self, *args, **kwargs): self.dpset = kwargs['dpset'] - super(RpcApi, self).__init__(*args, **kwargs) + super(RPCApi, self).__init__(*args, **kwargs) self.server = eventlet.listen(('', 50000)) self.pool = eventlet.GreenPool() self.pool.spawn_n(self.serve) @@ -118,14 +128,50 @@ class RpcApi(app_manager.RyuApp): self.sessions.append(session) self.pool.spawn_n(session.serve) + def _ofp_handle_ob(self, dp, msg, params): + clses = find_ofp_cls(dp.ofproto.OFP_VERSION, + msg.__class__.__name__) + if clses: + params[msg.__class__.__name__] = {} + _params = params[msg.__class__.__name__] + else: + _params = params + + for i in ofproto_parser.ofp_attr(msg): + if i.startswith('_'): + continue + elif i == 'parser': + continue + elif i == 'serialize': + continue + + v = getattr(msg, i) + if type(v) == dict: + _params[i] = {} + for key in v.keys(): + _params[i][key] = {} + self._ofp_handle_ob(dp, v[key], _params[i][key]) + elif type(v) == list: + ins = [] + for j in v: + d = {} + self._ofp_handle_ob(dp, j, d) + ins.append(d) + _params[i] = ins + elif type(v).__name__ == 'builtin_function_or_method': + pass + else: + _params[i] = v + @handler.observe_all_events('ofp_event') def handler(self, ev): if hasattr(ev, 'msg'): + ofmsg = ev.msg params = {} - params[type(ev.msg).__name__] = {} - for m in inspect.getmembers(ev.msg): - if not m[0].startswith('__'): - # we need a mechnism to get _only_ OFP defined members. - params[type(ev.msg).__name__][m[0]] = m[1] + self._ofp_handle_ob(ofmsg.datapath, ofmsg, params) + for k in params.keys(): + params[k]['xid'] = ofmsg.xid - # send this to all sessions as a notification. + for s in self.sessions: + m = s.session.create_notification('ofp', params) + s.send_queue.put(m)