From 278bc8fe008baaaa13f0567f8fef0e99d2b53549 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Tue, 25 Feb 2014 17:17:58 +0900 Subject: [PATCH] apgw: fix port stats Support interval per port. refactor to use the same code for port and queue stats. Signed-off-by: FUJITA Tomonori --- ryu/controller/api.py | 93 ++++++++++++++++------------- ryu/tests/unit/app/test_apgw_rpc.py | 31 ++++++---- 2 files changed, 70 insertions(+), 54 deletions(-) diff --git a/ryu/controller/api.py b/ryu/controller/api.py index 6b00d00d..531b2187 100644 --- a/ryu/controller/api.py +++ b/ryu/controller/api.py @@ -21,6 +21,7 @@ from ryu.lib.of_config import capable_switch as cs from ryu.lib.of_config import constants as consts import ryu.lib.of_config.classes as ofc import eventlet +import sys _ = type('', (apgw.StructuredMessage,), {}) @@ -86,10 +87,8 @@ class RpcOFPManager(app_manager.RyuApp): self.pending_rpc_requests = [] self._rpc_events = hub.Queue(128) # per 30 secs by default - self.port_monitor_interval = 30 self.ofconfig = hub.Queue(128) hub.spawn(self._peer_accept_thread) - hub.spawn(self._port_status_thread) hub.spawn(self._rpc_message_thread) hub.spawn(self._ofconfig_thread) apgw.update_syslog_format() @@ -155,17 +154,6 @@ class RpcOFPManager(app_manager.RyuApp): self.peer_accept_handler) server.serve_forever() - def _port_status_thread(self): - while True: - for k, dp in self.dpset.get_all(): - try: - ofmsg = dp.ofproto_parser.OFPPortStatsRequest(datapath=dp) - dp.send_msg(ofmsg) - except: - # ignore the error due to dead datapath - pass - hub.sleep(self.port_monitor_interval) - def _send_waited_rpc_response(self, msg): for peer in self._peers: if not msg.datapath.id in peer.wait_for_ofp_resepnse: @@ -270,7 +258,8 @@ class RpcOFPManager(app_manager.RyuApp): if port.name in self.monitored_ports: stats = {'physical_port_no': port.name} stats.update(stat.to_jsondict()['OFPPortStats']) - stats.update(self.monitored_ports[port.name]) + contexts, interval_ = self.monitored_ports[port.name] + stats.update(contexts) self.logger.info(_(msg=stats, log_type='stats')) @handler.set_ev_cls(ofp_event.EventOFPMeterStatsReply, @@ -645,18 +634,19 @@ class RpcOFPManager(app_manager.RyuApp): self.ofconfig.put((peer, msgid, params)) raise NoRPCResponse() - def _monitor(self, resource_name, msgid, params): + def _monitor(self, resource_name, resource_dict, request_generator, + msgid, params): try: param_dict = params[0] except: raise RPCError('parameters are missing') - name = None + resource_id = None contexts = None interval = 60 for k, v in param_dict.items(): if k == resource_name: - name = v + resource_id = v elif k == 'contexts': contexts = v elif k == 'interval': @@ -668,44 +658,63 @@ class RpcOFPManager(app_manager.RyuApp): raise RPCError('"contexts" parameter is necessary') if contexts is not None and not isinstance(contexts, dict): raise RPCError('"contexts" parameter must be dictionary') - if name is None: + if resource_id is None: raise RPCError('"%s" parameter is necessary' % resource_name) - return name, contexts, interval - - def _monitor_port(self, msgid, params): - name, contexts, interval = self._monitor('physical_port_no', - msgid, params) - self.monitored_ports[name] = contexts - self.port_monitor_interval = interval - return {} - - def _monitor_queue(self, msgid, params): - queue_id, contexts, interval = self._monitor('queue_id', - msgid, params) if interval == 0: - if queue_id in self.monitored_queues: - del self.monitored_queues[queue_id] + if resource_id in resource_dict: + del resource_dict[resource_id] else: - raise RPCError('queue id %d does not exist' % queue_id) + raise RPCError('%s %d does not exist' % (resource_name, + resource_id)) else: need_spawn = False - if not queue_id in self.monitored_queues: + if not resource_id in resource_dict: need_spawn = True - self.monitored_queues[queue_id] = (contexts, interval) + resource_dict[resource_id] = (contexts, interval) if need_spawn: - hub.spawn(self._monitor_queue_thread, queue_id) + pass + hub.spawn(self._monitor_thread, resource_id, resource_dict, + request_generator) return {} - def _monitor_queue_thread(self, queue_id): - while queue_id in self.monitored_queues: - _contexts, interval = self.monitored_queues[queue_id] + def _port_stats_generator(self, dp, port_name): + port_no = None + ports = self.dpset.get_ports(dp.id) + for port in ports: + if port.name == port_name: + port_no = port.port_no + break + if port_no is None: + return None + return dp.ofproto_parser.OFPPortStatsRequest(datapath=dp, + port_no=port_no) + + def _monitor_port(self, msgid, params): + return self._monitor('physical_port_no', + self.monitored_ports, + self._port_stats_generator, + msgid, params) + + def _monitor_thread(self, resource_id, resource_dict, generator): + while resource_id in resource_dict: + _contexts, interval = resource_dict[resource_id] for k, dp in self.dpset.get_all(): try: - ofmsg = dp.ofproto_parser.OFPQueueStatsRequest( - datapath=dp, queue_id=queue_id) - dp.send_msg(ofmsg) + ofmsg = generator(dp, resource_id) + if ofmsg: + dp.send_msg(ofmsg) except: # ignore the error due to dead datapath pass hub.sleep(interval) + + def _queue_stats_generator(self, dp, queue_id): + return dp.ofproto_parser.OFPQueueStatsRequest(datapath=dp, + queue_id=queue_id) + + def _monitor_queue(self, msgid, params): + return self._monitor('queue_id', + self.monitored_queues, + self._queue_stats_generator, + msgid, params) diff --git a/ryu/tests/unit/app/test_apgw_rpc.py b/ryu/tests/unit/app/test_apgw_rpc.py index a11a3281..f95d8661 100644 --- a/ryu/tests/unit/app/test_apgw_rpc.py +++ b/ryu/tests/unit/app/test_apgw_rpc.py @@ -86,17 +86,19 @@ class TestRpcOFPManager(unittest.TestCase): except api.RPCError as e: pass + port_name = 'OFP11' + interval = 10 try: - m._monitor_port(msgid, {'physical_port_no': 1}) + m._monitor_port(msgid, {'physical_port_no': port_name}) except api.RPCError as e: pass contents = {'hoge': 'jail'} - r = m._monitor_port(msgid, [{'physical_port_no': 1, + r = m._monitor_port(msgid, [{'physical_port_no': port_name, 'contexts': contents, - 'interval': 30}]) + 'interval': interval}]) eq_(r, {}) - eq_(m.monitored_ports[1], contents) + eq_(m.monitored_ports[port_name], (contents, interval)) def test_register_traceroute(self): m = api.RpcOFPManager(dpset=None) @@ -137,8 +139,8 @@ class TestRpcOFPManager(unittest.TestCase): if ports: class DummyPort(object): def __init__(self, port_no): - self.name = port_no - dps.ports = dict(map((lambda n: (n, DummyPort(n))), ports)) + self.port_no = port_no + dps.ports = map(lambda n: DummyPort(n), ports) dps.get_ports = lambda dpid: dps.ports dps._register(dp) @@ -423,23 +425,28 @@ class TestRpcOFPManager(unittest.TestCase): def _test_port_status_thread(self, ofp, ofpp): dpid = 10 - dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + port_no = 1 + port_name = 'OFP11' + dps = self._create_dpset(dpid, ports=(port_no,), ofp=ofp, ofpp=ofpp) dp = dps.get(dpid) m = api.RpcOFPManager(dpset=dps) - m.port_monitor_interval = 10 - + p = dps.get_ports(dpid) + p[0].name = port_name threads = [] + m.monitored_ports[port_name] = ({}, 1) with hub.Timeout(2): - threads.append(hub.spawn(m._port_status_thread)) - + threads.append(hub.spawn(m._monitor_thread, port_name, + m.monitored_ports, + m._port_stats_generator)) hub.sleep(0.5) for t in threads: hub.kill(t) hub.joinall(threads) + assert len(dp.sent) for m in dp.sent: eq_(m.__class__, ofpp.OFPPortStatsRequest) - eq_(m.port_no, ofp.OFPP_ANY) + eq_(m.port_no, port_no) def test_port_status_thread_12(self): self._test_port_status_thread(ofproto_v1_2, ofproto_v1_2_parser)