From be8f4d656a9307075825270f13697b0783835c5f Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Tue, 25 Mar 2014 18:58:03 +0900 Subject: [PATCH] apgw: specify port_no for queuestats Signed-off-by: FUJITA Tomonori --- ryu/controller/api.py | 22 ++++++++++++++-------- ryu/tests/unit/app/test_apgw_rpc.py | 26 +++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/ryu/controller/api.py b/ryu/controller/api.py index e8ba1e44..769b3b63 100644 --- a/ryu/controller/api.py +++ b/ryu/controller/api.py @@ -667,7 +667,7 @@ class RpcOFPManager(app_manager.RyuApp): self.ofconfig.put((peer, msgid, params)) raise NoRPCResponse() - def _monitor(self, resource_name, resource_dict, request_generator, + def _monitor(self, mandatory_params, resource_dict, request_generator, msgid, params): try: param_dict = params[0] @@ -677,6 +677,7 @@ class RpcOFPManager(app_manager.RyuApp): contexts = None interval = 60 + resource_name = mandatory_params.pop(0) for k, v in param_dict.items(): if k == resource_name: resource_id = v @@ -684,6 +685,8 @@ class RpcOFPManager(app_manager.RyuApp): contexts = v elif k == 'interval': interval = v + elif k in mandatory_params: + pass else: raise RPCError('unknown parameters, %s' % k) @@ -708,10 +711,10 @@ class RpcOFPManager(app_manager.RyuApp): if need_spawn: pass hub.spawn(self._monitor_thread, resource_id, resource_dict, - request_generator) + param_dict, request_generator) return {} - def _port_stats_generator(self, dp, port_name): + def _port_stats_generator(self, dp, port_name, param_dict): port_no = None ports = self.dpset.get_ports(dp.id) for port in ports: @@ -724,17 +727,18 @@ class RpcOFPManager(app_manager.RyuApp): port_no=port_no) def _monitor_port(self, msgid, params): - return self._monitor('physical_port_no', + return self._monitor(['physical_port_no'], self.monitored_ports, self._port_stats_generator, msgid, params) - def _monitor_thread(self, resource_id, resource_dict, generator): + def _monitor_thread(self, resource_id, resource_dict, param_dict, + generator): while resource_id in resource_dict: _contexts, interval = resource_dict[resource_id] for k, dp in self.dpset.get_all(): try: - ofmsg = generator(dp, resource_id) + ofmsg = generator(dp, resource_id, param_dict) if ofmsg: dp.send_msg(ofmsg) except: @@ -742,12 +746,14 @@ class RpcOFPManager(app_manager.RyuApp): pass hub.sleep(interval) - def _queue_stats_generator(self, dp, queue_id): + def _queue_stats_generator(self, dp, queue_id, param_dict): + port_no = param_dict['port_no'] return dp.ofproto_parser.OFPQueueStatsRequest(datapath=dp, + port_no=port_no, queue_id=queue_id) def _monitor_queue(self, msgid, params): - return self._monitor('queue_id', + return self._monitor(['queue_id', 'port_no'], self.monitored_queues, self._queue_stats_generator, msgid, params) diff --git a/ryu/tests/unit/app/test_apgw_rpc.py b/ryu/tests/unit/app/test_apgw_rpc.py index 1c5cd542..4f9334da 100644 --- a/ryu/tests/unit/app/test_apgw_rpc.py +++ b/ryu/tests/unit/app/test_apgw_rpc.py @@ -100,6 +100,30 @@ class TestRpcOFPManager(unittest.TestCase): eq_(r, {}) eq_(m.monitored_ports[port_name], (contents, interval)) + def test_monitor_queue(self): + m = api.RpcOFPManager(dpset=None) + msgid = 1 + try: + m._monitor_queue(msgid, {}) + except api.RPCError as e: + pass + + queue_id = 10 + port_no = 10 + interval = 10 + try: + m._monitor_queue(msgid, {'queue_id': queue_id, 'port_no': port_no}) + except api.RPCError as e: + pass + + contents = {'hoge': 'jail'} + r = m._monitor_queue(msgid, [{'queue_id': queue_id, + 'port_no': port_no, + 'contexts': contents, + 'interval': interval}]) + eq_(r, {}) + eq_(m.monitored_queues[queue_id], (contents, interval)) + def test_register_traceroute(self): m = api.RpcOFPManager(dpset=None) msgid = 1 @@ -437,7 +461,7 @@ class TestRpcOFPManager(unittest.TestCase): m.monitored_ports[port_name] = ({}, 1) with hub.Timeout(2): threads.append(hub.spawn(m._monitor_thread, port_name, - m.monitored_ports, + m.monitored_ports, {}, m._port_stats_generator)) hub.sleep(0.5) for t in threads: