apgw: add Meter suport

- handle MeterMod message
- log meter stats periodically

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
FUJITA Tomonori 2014-02-07 23:10:51 +09:00
parent 87fba4e199
commit 692290fd67
2 changed files with 150 additions and 0 deletions

View File

@ -65,6 +65,7 @@ class RpcOFPManager(app_manager.RyuApp):
self.traceroute_source = {}
self.monitored_ports = {}
self.monitored_flows = {}
self.monitored_meters = {}
self.pending_rpc_requests = []
self._rpc_events = hub.Queue(128)
# per 30 secs by default
@ -248,6 +249,20 @@ class RpcOFPManager(app_manager.RyuApp):
stats.update(self.monitored_ports[port.name])
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPMeterStatsReply,
handler.MAIN_DISPATCHER)
def _meter_stats_reply_handler(self, ev):
msg = ev.msg
dp = msg.datapath
for stat in msg.body:
if stat.meter_id in self.monitored_meters:
contexts = self.monitored_meters[stat.meter_id]
stats = {'meter_id': stat.meter_id,
'flow_count': stat.flow_count,
'byte_in_count': stat.byte_in_count}
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPStatsReply,
handler.MAIN_DISPATCHER)
def _stats_reply_handler(self, ev):
@ -335,6 +350,15 @@ class RpcOFPManager(app_manager.RyuApp):
dp.send_msg(msg)
hub.sleep(interval)
def _meter_stats_loop(self, dp, interval, meter_id):
while True:
if not meter_id in self.monitored_meters:
break
msg = dp.ofproto_parser.OFPMeterStatsRequest(datapath=dp,
meter_id=meter_id)
dp.send_msg(msg)
hub.sleep(interval)
def _handle_ofprotocol(self, msgid, params):
try:
param_dict = params[0]
@ -415,6 +439,25 @@ class RpcOFPManager(app_manager.RyuApp):
del self.monitored_flows[key]
except:
raise RPCError('unknown key, %s' % (str(key)))
elif (dp.ofproto.OFP_VERSION == ofproto_v1_3.OFP_VERSION and
ofmsg.msg_type is dp.ofproto.OFPT_METER_MOD):
if contexts is not None:
if ofmsg.command is dp.ofproto.OFPMC_ADD:
if ofmsg.meter_id in self.monitored_meters:
raise RPCError('meter already exitsts %d' %
(ofmsg.meter_id))
self.monitored_meters[ofmsg.meter_id] = contexts
hub.spawn(self._meter_stats_loop,
dp, interval, ofmsg.meter_id)
elif ofmsg.command is dp.ofproto.OFPMC_DELETE:
try:
del self.monitored_meters[ofmsg.meter_id]
except:
raise RPCError('unknown meter %d' % (ofmsg.meter_id))
elif ofmsg.command is dp.ofproto.OFPMC_MODIFY:
raise RPCError('METER_MOD with contexts is not supported')
else:
raise RPCError('unknown meter_mod command')
else:
raise RPCError('unknown of message, %s' % (str(param_dict)))

View File

@ -211,6 +211,49 @@ class TestRpcOFPManager(unittest.TestCase):
def test_handle_ofprotocol_flowmod_13(self):
self._test_handle_ofprotocol_flowmod(ofproto_v1_3, ofproto_v1_3_parser)
def _test_handle_ofprotocol_meter_mod(self, ofp, ofpp):
dpid = 10
msgid = 1
nr_sent = 0
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
dp = dps.get(dpid)
bands = [ofpp.OFPMeterBandDrop(10, 100)]
meter_id = 10
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
ofp.OFPMF_KBPS, meter_id, bands)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
eq_(len(m.monitored_meters), 1)
eq_(m.monitored_meters[meter_id], contexts)
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS,
meter_id)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
eq_(len(m.monitored_meters), 0)
def test_handle_ofprotocol_meter_mod_13(self):
self._test_handle_ofprotocol_meter_mod(ofproto_v1_3,
ofproto_v1_3_parser)
def _test_handle_ofprotocol(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
@ -448,6 +491,42 @@ class TestRpcOFPManager(unittest.TestCase):
def test_flow_stats_loop_13(self):
self._test_flow_stats_loop(ofproto_v1_3, ofproto_v1_3_parser)
def _test_meter_stats_loop(self, ofp, ofpp):
dpid = 10
msgid = 1
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
bands = [ofpp.OFPMeterBandDrop(10, 100)]
meter_id = 10
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
ofp.OFPMF_KBPS, meter_id, bands)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
threads = []
with hub.Timeout(5):
threads.append(hub.spawn(m._meter_stats_loop,
dp, 0.1, meter_id))
hub.sleep(0.5)
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS,
meter_id)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
eq_(len(m.monitored_meters), 0)
hub.joinall(threads)
for m in dp.sent:
if m.__class__ in (ofpp.OFPMeterMod, ofpp.OFPPortStatsRequest):
continue
eq_(m.__class__, ofpp.OFPMeterStatsRequest)
eq_(m.meter_id, ofmsg.meter_id)
def test_meter_stats_loop_13(self):
self._test_meter_stats_loop(ofproto_v1_3, ofproto_v1_3_parser)
def _test_rpc_message_thread(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
@ -598,3 +677,31 @@ class TestRpcOFPManager(unittest.TestCase):
match=ofpp.OFPMatch(in_port=2))
msg.body = [s1, s2]
manager._flow_stats_reply_handler(ev)
def test_stats_reply_meter_handler_13(self):
ofp = ofproto_v1_3
ofpp = ofproto_v1_3_parser
dpid = 10
msgid = 9
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
dp = dps.get(dpid)
bands = [ofpp.OFPMeterBandDrop(10, 100)]
meter_id = 10
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
ofp.OFPMF_KBPS, meter_id, bands)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
msg = ofpp.OFPMeterStatsReply(datapath=dp)
ev = ofp_event.EventOFPStatsReply(msg)
s = ofpp.OFPMeterStats(meter_id=meter_id, flow_count=10,
packet_in_count=10, byte_in_count=10,
duration_sec=10, duration_nsec=10,
band_stats=[])
msg.body = [s]
m._meter_stats_reply_handler(ev)