apgw: fix port stats

Support interval per port. refactor to use the same code for port and
queue stats.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
FUJITA Tomonori 2014-02-25 17:17:58 +09:00
parent 9ce8698b21
commit 278bc8fe00
2 changed files with 70 additions and 54 deletions

View File

@ -21,6 +21,7 @@ from ryu.lib.of_config import capable_switch as cs
from ryu.lib.of_config import constants as consts
import ryu.lib.of_config.classes as ofc
import eventlet
import sys
_ = type('', (apgw.StructuredMessage,), {})
@ -86,10 +87,8 @@ class RpcOFPManager(app_manager.RyuApp):
self.pending_rpc_requests = []
self._rpc_events = hub.Queue(128)
# per 30 secs by default
self.port_monitor_interval = 30
self.ofconfig = hub.Queue(128)
hub.spawn(self._peer_accept_thread)
hub.spawn(self._port_status_thread)
hub.spawn(self._rpc_message_thread)
hub.spawn(self._ofconfig_thread)
apgw.update_syslog_format()
@ -155,17 +154,6 @@ class RpcOFPManager(app_manager.RyuApp):
self.peer_accept_handler)
server.serve_forever()
def _port_status_thread(self):
while True:
for k, dp in self.dpset.get_all():
try:
ofmsg = dp.ofproto_parser.OFPPortStatsRequest(datapath=dp)
dp.send_msg(ofmsg)
except:
# ignore the error due to dead datapath
pass
hub.sleep(self.port_monitor_interval)
def _send_waited_rpc_response(self, msg):
for peer in self._peers:
if not msg.datapath.id in peer.wait_for_ofp_resepnse:
@ -270,7 +258,8 @@ class RpcOFPManager(app_manager.RyuApp):
if port.name in self.monitored_ports:
stats = {'physical_port_no': port.name}
stats.update(stat.to_jsondict()['OFPPortStats'])
stats.update(self.monitored_ports[port.name])
contexts, interval_ = self.monitored_ports[port.name]
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPMeterStatsReply,
@ -645,18 +634,19 @@ class RpcOFPManager(app_manager.RyuApp):
self.ofconfig.put((peer, msgid, params))
raise NoRPCResponse()
def _monitor(self, resource_name, msgid, params):
def _monitor(self, resource_name, resource_dict, request_generator,
msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
name = None
resource_id = None
contexts = None
interval = 60
for k, v in param_dict.items():
if k == resource_name:
name = v
resource_id = v
elif k == 'contexts':
contexts = v
elif k == 'interval':
@ -668,44 +658,63 @@ class RpcOFPManager(app_manager.RyuApp):
raise RPCError('"contexts" parameter is necessary')
if contexts is not None and not isinstance(contexts, dict):
raise RPCError('"contexts" parameter must be dictionary')
if name is None:
if resource_id is None:
raise RPCError('"%s" parameter is necessary' % resource_name)
return name, contexts, interval
def _monitor_port(self, msgid, params):
name, contexts, interval = self._monitor('physical_port_no',
msgid, params)
self.monitored_ports[name] = contexts
self.port_monitor_interval = interval
return {}
def _monitor_queue(self, msgid, params):
queue_id, contexts, interval = self._monitor('queue_id',
msgid, params)
if interval == 0:
if queue_id in self.monitored_queues:
del self.monitored_queues[queue_id]
if resource_id in resource_dict:
del resource_dict[resource_id]
else:
raise RPCError('queue id %d does not exist' % queue_id)
raise RPCError('%s %d does not exist' % (resource_name,
resource_id))
else:
need_spawn = False
if not queue_id in self.monitored_queues:
if not resource_id in resource_dict:
need_spawn = True
self.monitored_queues[queue_id] = (contexts, interval)
resource_dict[resource_id] = (contexts, interval)
if need_spawn:
hub.spawn(self._monitor_queue_thread, queue_id)
pass
hub.spawn(self._monitor_thread, resource_id, resource_dict,
request_generator)
return {}
def _monitor_queue_thread(self, queue_id):
while queue_id in self.monitored_queues:
_contexts, interval = self.monitored_queues[queue_id]
def _port_stats_generator(self, dp, port_name):
port_no = None
ports = self.dpset.get_ports(dp.id)
for port in ports:
if port.name == port_name:
port_no = port.port_no
break
if port_no is None:
return None
return dp.ofproto_parser.OFPPortStatsRequest(datapath=dp,
port_no=port_no)
def _monitor_port(self, msgid, params):
return self._monitor('physical_port_no',
self.monitored_ports,
self._port_stats_generator,
msgid, params)
def _monitor_thread(self, resource_id, resource_dict, generator):
while resource_id in resource_dict:
_contexts, interval = resource_dict[resource_id]
for k, dp in self.dpset.get_all():
try:
ofmsg = dp.ofproto_parser.OFPQueueStatsRequest(
datapath=dp, queue_id=queue_id)
dp.send_msg(ofmsg)
ofmsg = generator(dp, resource_id)
if ofmsg:
dp.send_msg(ofmsg)
except:
# ignore the error due to dead datapath
pass
hub.sleep(interval)
def _queue_stats_generator(self, dp, queue_id):
return dp.ofproto_parser.OFPQueueStatsRequest(datapath=dp,
queue_id=queue_id)
def _monitor_queue(self, msgid, params):
return self._monitor('queue_id',
self.monitored_queues,
self._queue_stats_generator,
msgid, params)

View File

@ -86,17 +86,19 @@ class TestRpcOFPManager(unittest.TestCase):
except api.RPCError as e:
pass
port_name = 'OFP11'
interval = 10
try:
m._monitor_port(msgid, {'physical_port_no': 1})
m._monitor_port(msgid, {'physical_port_no': port_name})
except api.RPCError as e:
pass
contents = {'hoge': 'jail'}
r = m._monitor_port(msgid, [{'physical_port_no': 1,
r = m._monitor_port(msgid, [{'physical_port_no': port_name,
'contexts': contents,
'interval': 30}])
'interval': interval}])
eq_(r, {})
eq_(m.monitored_ports[1], contents)
eq_(m.monitored_ports[port_name], (contents, interval))
def test_register_traceroute(self):
m = api.RpcOFPManager(dpset=None)
@ -137,8 +139,8 @@ class TestRpcOFPManager(unittest.TestCase):
if ports:
class DummyPort(object):
def __init__(self, port_no):
self.name = port_no
dps.ports = dict(map((lambda n: (n, DummyPort(n))), ports))
self.port_no = port_no
dps.ports = map(lambda n: DummyPort(n), ports)
dps.get_ports = lambda dpid: dps.ports
dps._register(dp)
@ -423,23 +425,28 @@ class TestRpcOFPManager(unittest.TestCase):
def _test_port_status_thread(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
port_no = 1
port_name = 'OFP11'
dps = self._create_dpset(dpid, ports=(port_no,), ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
m.port_monitor_interval = 10
p = dps.get_ports(dpid)
p[0].name = port_name
threads = []
m.monitored_ports[port_name] = ({}, 1)
with hub.Timeout(2):
threads.append(hub.spawn(m._port_status_thread))
threads.append(hub.spawn(m._monitor_thread, port_name,
m.monitored_ports,
m._port_stats_generator))
hub.sleep(0.5)
for t in threads:
hub.kill(t)
hub.joinall(threads)
assert len(dp.sent)
for m in dp.sent:
eq_(m.__class__, ofpp.OFPPortStatsRequest)
eq_(m.port_no, ofp.OFPP_ANY)
eq_(m.port_no, port_no)
def test_port_status_thread_12(self):
self._test_port_status_thread(ofproto_v1_2, ofproto_v1_2_parser)