From 692290fd67c04d1dbdd0502bc2bb56b7b0544104 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Fri, 7 Feb 2014 23:10:51 +0900 Subject: [PATCH] apgw: add Meter suport - handle MeterMod message - log meter stats periodically Signed-off-by: FUJITA Tomonori --- ryu/controller/api.py | 43 +++++++++++ ryu/tests/unit/app/test_apgw_rpc.py | 107 ++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/ryu/controller/api.py b/ryu/controller/api.py index 6473cd0c..30398f45 100644 --- a/ryu/controller/api.py +++ b/ryu/controller/api.py @@ -65,6 +65,7 @@ class RpcOFPManager(app_manager.RyuApp): self.traceroute_source = {} self.monitored_ports = {} self.monitored_flows = {} + self.monitored_meters = {} self.pending_rpc_requests = [] self._rpc_events = hub.Queue(128) # per 30 secs by default @@ -248,6 +249,20 @@ class RpcOFPManager(app_manager.RyuApp): stats.update(self.monitored_ports[port.name]) self.logger.info(_(msg=stats, log_type='stats')) + @handler.set_ev_cls(ofp_event.EventOFPMeterStatsReply, + handler.MAIN_DISPATCHER) + def _meter_stats_reply_handler(self, ev): + msg = ev.msg + dp = msg.datapath + for stat in msg.body: + if stat.meter_id in self.monitored_meters: + contexts = self.monitored_meters[stat.meter_id] + stats = {'meter_id': stat.meter_id, + 'flow_count': stat.flow_count, + 'byte_in_count': stat.byte_in_count} + stats.update(contexts) + self.logger.info(_(msg=stats, log_type='stats')) + @handler.set_ev_cls(ofp_event.EventOFPStatsReply, handler.MAIN_DISPATCHER) def _stats_reply_handler(self, ev): @@ -335,6 +350,15 @@ class RpcOFPManager(app_manager.RyuApp): dp.send_msg(msg) hub.sleep(interval) + def _meter_stats_loop(self, dp, interval, meter_id): + while True: + if not meter_id in self.monitored_meters: + break + msg = dp.ofproto_parser.OFPMeterStatsRequest(datapath=dp, + meter_id=meter_id) + dp.send_msg(msg) + hub.sleep(interval) + def _handle_ofprotocol(self, msgid, params): try: param_dict = params[0] @@ -415,6 +439,25 @@ class RpcOFPManager(app_manager.RyuApp): del self.monitored_flows[key] except: raise RPCError('unknown key, %s' % (str(key))) + elif (dp.ofproto.OFP_VERSION == ofproto_v1_3.OFP_VERSION and + ofmsg.msg_type is dp.ofproto.OFPT_METER_MOD): + if contexts is not None: + if ofmsg.command is dp.ofproto.OFPMC_ADD: + if ofmsg.meter_id in self.monitored_meters: + raise RPCError('meter already exitsts %d' % + (ofmsg.meter_id)) + self.monitored_meters[ofmsg.meter_id] = contexts + hub.spawn(self._meter_stats_loop, + dp, interval, ofmsg.meter_id) + elif ofmsg.command is dp.ofproto.OFPMC_DELETE: + try: + del self.monitored_meters[ofmsg.meter_id] + except: + raise RPCError('unknown meter %d' % (ofmsg.meter_id)) + elif ofmsg.command is dp.ofproto.OFPMC_MODIFY: + raise RPCError('METER_MOD with contexts is not supported') + else: + raise RPCError('unknown meter_mod command') else: raise RPCError('unknown of message, %s' % (str(param_dict))) diff --git a/ryu/tests/unit/app/test_apgw_rpc.py b/ryu/tests/unit/app/test_apgw_rpc.py index d8cc2b7b..e1042804 100644 --- a/ryu/tests/unit/app/test_apgw_rpc.py +++ b/ryu/tests/unit/app/test_apgw_rpc.py @@ -211,6 +211,49 @@ class TestRpcOFPManager(unittest.TestCase): def test_handle_ofprotocol_flowmod_13(self): self._test_handle_ofprotocol_flowmod(ofproto_v1_3, ofproto_v1_3_parser) + def _test_handle_ofprotocol_meter_mod(self, ofp, ofpp): + dpid = 10 + msgid = 1 + nr_sent = 0 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + m = api.RpcOFPManager(dpset=dps) + + dp = dps.get(dpid) + bands = [ofpp.OFPMeterBandDrop(10, 100)] + meter_id = 10 + ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD, + ofp.OFPMF_KBPS, meter_id, bands) + + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict()}]) + nr_sent += 1 + eq_(r, {'xid': 1}) + eq_(len(dp.sent), nr_sent) + + contexts = {'hello': 'world'} + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + nr_sent += 1 + eq_(r, {'xid': 1}) + eq_(len(dp.sent), nr_sent) + eq_(len(m.monitored_meters), 1) + eq_(m.monitored_meters[meter_id], contexts) + + ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS, + meter_id) + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + nr_sent += 1 + eq_(r, {'xid': 1}) + eq_(len(dp.sent), nr_sent) + eq_(len(m.monitored_meters), 0) + + def test_handle_ofprotocol_meter_mod_13(self): + self._test_handle_ofprotocol_meter_mod(ofproto_v1_3, + ofproto_v1_3_parser) + def _test_handle_ofprotocol(self, ofp, ofpp): dpid = 10 dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) @@ -448,6 +491,42 @@ class TestRpcOFPManager(unittest.TestCase): def test_flow_stats_loop_13(self): self._test_flow_stats_loop(ofproto_v1_3, ofproto_v1_3_parser) + def _test_meter_stats_loop(self, ofp, ofpp): + dpid = 10 + msgid = 1 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + dp = dps.get(dpid) + m = api.RpcOFPManager(dpset=dps) + bands = [ofpp.OFPMeterBandDrop(10, 100)] + meter_id = 10 + ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD, + ofp.OFPMF_KBPS, meter_id, bands) + contexts = {'hello': 'world'} + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + threads = [] + with hub.Timeout(5): + threads.append(hub.spawn(m._meter_stats_loop, + dp, 0.1, meter_id)) + hub.sleep(0.5) + ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS, + meter_id) + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + eq_(len(m.monitored_meters), 0) + hub.joinall(threads) + + for m in dp.sent: + if m.__class__ in (ofpp.OFPMeterMod, ofpp.OFPPortStatsRequest): + continue + eq_(m.__class__, ofpp.OFPMeterStatsRequest) + eq_(m.meter_id, ofmsg.meter_id) + + def test_meter_stats_loop_13(self): + self._test_meter_stats_loop(ofproto_v1_3, ofproto_v1_3_parser) + def _test_rpc_message_thread(self, ofp, ofpp): dpid = 10 dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) @@ -598,3 +677,31 @@ class TestRpcOFPManager(unittest.TestCase): match=ofpp.OFPMatch(in_port=2)) msg.body = [s1, s2] manager._flow_stats_reply_handler(ev) + + def test_stats_reply_meter_handler_13(self): + ofp = ofproto_v1_3 + ofpp = ofproto_v1_3_parser + dpid = 10 + msgid = 9 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + m = api.RpcOFPManager(dpset=dps) + + dp = dps.get(dpid) + bands = [ofpp.OFPMeterBandDrop(10, 100)] + meter_id = 10 + ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD, + ofp.OFPMF_KBPS, meter_id, bands) + contexts = {'hello': 'world'} + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + + msg = ofpp.OFPMeterStatsReply(datapath=dp) + ev = ofp_event.EventOFPStatsReply(msg) + s = ofpp.OFPMeterStats(meter_id=meter_id, flow_count=10, + packet_in_count=10, byte_in_count=10, + duration_sec=10, duration_nsec=10, + band_stats=[]) + + msg.body = [s] + m._meter_stats_reply_handler(ev)