apgw: specify port_no for queuestats

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
FUJITA Tomonori 2014-03-25 18:58:03 +09:00
parent fd81201a52
commit be8f4d656a
2 changed files with 39 additions and 9 deletions

View File

@ -667,7 +667,7 @@ class RpcOFPManager(app_manager.RyuApp):
self.ofconfig.put((peer, msgid, params))
raise NoRPCResponse()
def _monitor(self, resource_name, resource_dict, request_generator,
def _monitor(self, mandatory_params, resource_dict, request_generator,
msgid, params):
try:
param_dict = params[0]
@ -677,6 +677,7 @@ class RpcOFPManager(app_manager.RyuApp):
contexts = None
interval = 60
resource_name = mandatory_params.pop(0)
for k, v in param_dict.items():
if k == resource_name:
resource_id = v
@ -684,6 +685,8 @@ class RpcOFPManager(app_manager.RyuApp):
contexts = v
elif k == 'interval':
interval = v
elif k in mandatory_params:
pass
else:
raise RPCError('unknown parameters, %s' % k)
@ -708,10 +711,10 @@ class RpcOFPManager(app_manager.RyuApp):
if need_spawn:
pass
hub.spawn(self._monitor_thread, resource_id, resource_dict,
request_generator)
param_dict, request_generator)
return {}
def _port_stats_generator(self, dp, port_name):
def _port_stats_generator(self, dp, port_name, param_dict):
port_no = None
ports = self.dpset.get_ports(dp.id)
for port in ports:
@ -724,17 +727,18 @@ class RpcOFPManager(app_manager.RyuApp):
port_no=port_no)
def _monitor_port(self, msgid, params):
return self._monitor('physical_port_no',
return self._monitor(['physical_port_no'],
self.monitored_ports,
self._port_stats_generator,
msgid, params)
def _monitor_thread(self, resource_id, resource_dict, generator):
def _monitor_thread(self, resource_id, resource_dict, param_dict,
generator):
while resource_id in resource_dict:
_contexts, interval = resource_dict[resource_id]
for k, dp in self.dpset.get_all():
try:
ofmsg = generator(dp, resource_id)
ofmsg = generator(dp, resource_id, param_dict)
if ofmsg:
dp.send_msg(ofmsg)
except:
@ -742,12 +746,14 @@ class RpcOFPManager(app_manager.RyuApp):
pass
hub.sleep(interval)
def _queue_stats_generator(self, dp, queue_id):
def _queue_stats_generator(self, dp, queue_id, param_dict):
port_no = param_dict['port_no']
return dp.ofproto_parser.OFPQueueStatsRequest(datapath=dp,
port_no=port_no,
queue_id=queue_id)
def _monitor_queue(self, msgid, params):
return self._monitor('queue_id',
return self._monitor(['queue_id', 'port_no'],
self.monitored_queues,
self._queue_stats_generator,
msgid, params)

View File

@ -100,6 +100,30 @@ class TestRpcOFPManager(unittest.TestCase):
eq_(r, {})
eq_(m.monitored_ports[port_name], (contents, interval))
def test_monitor_queue(self):
m = api.RpcOFPManager(dpset=None)
msgid = 1
try:
m._monitor_queue(msgid, {})
except api.RPCError as e:
pass
queue_id = 10
port_no = 10
interval = 10
try:
m._monitor_queue(msgid, {'queue_id': queue_id, 'port_no': port_no})
except api.RPCError as e:
pass
contents = {'hoge': 'jail'}
r = m._monitor_queue(msgid, [{'queue_id': queue_id,
'port_no': port_no,
'contexts': contents,
'interval': interval}])
eq_(r, {})
eq_(m.monitored_queues[queue_id], (contents, interval))
def test_register_traceroute(self):
m = api.RpcOFPManager(dpset=None)
msgid = 1
@ -437,7 +461,7 @@ class TestRpcOFPManager(unittest.TestCase):
m.monitored_ports[port_name] = ({}, 1)
with hub.Timeout(2):
threads.append(hub.spawn(m._monitor_thread, port_name,
m.monitored_ports,
m.monitored_ports, {},
m._port_stats_generator))
hub.sleep(0.5)
for t in threads: