apgw: add queue stats support

RPC API:

monitor_queue [{'queue_id': 1, 'interval': 5, 'contexts': {'hello':'world'}}]

if 'interval' is zero, ofwire stops monitoring the specified queue.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
FUJITA Tomonori 2014-02-22 10:47:20 +09:00
parent a7b6f0a036
commit 9ce8698b21

View File

@ -82,6 +82,7 @@ class RpcOFPManager(app_manager.RyuApp):
self.monitored_ports = {}
self.monitored_flows = {}
self.monitored_meters = {}
self.monitored_queues = {}
self.pending_rpc_requests = []
self._rpc_events = hub.Queue(128)
# per 30 secs by default
@ -105,6 +106,8 @@ class RpcOFPManager(app_manager.RyuApp):
result = self._handle_ofprotocol(msgid, params)
elif target_method == "monitor_port":
result = self._monitor_port(msgid, params)
elif target_method == "monitor_queue":
result = self._monitor_queue(msgid, params)
elif target_method == "ofconfig":
self._ofconfig(peer, msgid, params)
else:
@ -284,6 +287,22 @@ class RpcOFPManager(app_manager.RyuApp):
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPQueueStatsReply,
handler.MAIN_DISPATCHER)
def _queue_stats_reply_handler(self, ev):
msg = ev.msg
dp = msg.datapath
for stat in msg.body:
if stat.queue_id in self.monitored_queues:
contexts, interval_ = self.monitored_queues[stat.queue_id]
stats = {'queue_id': stat.queue_id,
'port_no': stat.port_no,
'tx_bytes': stat.tx_bytes,
'tx_packets': stat.tx_packets,
'tx_errors': stat.tx_errors}
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPStatsReply,
handler.MAIN_DISPATCHER)
def _stats_reply_handler(self, ev):
@ -626,28 +645,67 @@ class RpcOFPManager(app_manager.RyuApp):
self.ofconfig.put((peer, msgid, params))
raise NoRPCResponse()
def _monitor_port(self, msgid, params):
def _monitor(self, resource_name, msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
name = None
contexts = None
interval = 60
for k, v in param_dict.items():
if k == 'physical_port_no':
if k == resource_name:
name = v
elif k == 'contexts':
contexts = v
elif k == 'interval':
self.port_monitor_interval = v
interval = v
else:
raise RPCError('unknown parameters, %s' % k)
if contexts is None:
if contexts is None and interval > 0:
raise RPCError('"contexts" parameter is necessary')
if not isinstance(contexts, dict):
if contexts is not None and not isinstance(contexts, dict):
raise RPCError('"contexts" parameter must be dictionary')
if name is None:
raise RPCError('"physical_port_no" parameter is necessary')
raise RPCError('"%s" parameter is necessary' % resource_name)
return name, contexts, interval
def _monitor_port(self, msgid, params):
name, contexts, interval = self._monitor('physical_port_no',
msgid, params)
self.monitored_ports[name] = contexts
self.port_monitor_interval = interval
return {}
def _monitor_queue(self, msgid, params):
queue_id, contexts, interval = self._monitor('queue_id',
msgid, params)
if interval == 0:
if queue_id in self.monitored_queues:
del self.monitored_queues[queue_id]
else:
raise RPCError('queue id %d does not exist' % queue_id)
else:
need_spawn = False
if not queue_id in self.monitored_queues:
need_spawn = True
self.monitored_queues[queue_id] = (contexts, interval)
if need_spawn:
hub.spawn(self._monitor_queue_thread, queue_id)
return {}
def _monitor_queue_thread(self, queue_id):
while queue_id in self.monitored_queues:
_contexts, interval = self.monitored_queues[queue_id]
for k, dp in self.dpset.get_all():
try:
ofmsg = dp.ofproto_parser.OFPQueueStatsRequest(
datapath=dp, queue_id=queue_id)
dp.send_msg(ofmsg)
except:
# ignore the error due to dead datapath
pass
hub.sleep(interval)