From 9ce8698b21caa72bb1fd7c62eee297d677bbcce9 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Sat, 22 Feb 2014 10:47:20 +0900 Subject: [PATCH] apgw: add queue stats support RPC API: monitor_queue [{'queue_id': 1, 'interval': 5, 'contexts': {'hello':'world'}}] if 'interval' is zero, ofwire stops monitoring the specified queue. Signed-off-by: FUJITA Tomonori --- ryu/controller/api.py | 70 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/ryu/controller/api.py b/ryu/controller/api.py index 02917799..6b00d00d 100644 --- a/ryu/controller/api.py +++ b/ryu/controller/api.py @@ -82,6 +82,7 @@ class RpcOFPManager(app_manager.RyuApp): self.monitored_ports = {} self.monitored_flows = {} self.monitored_meters = {} + self.monitored_queues = {} self.pending_rpc_requests = [] self._rpc_events = hub.Queue(128) # per 30 secs by default @@ -105,6 +106,8 @@ class RpcOFPManager(app_manager.RyuApp): result = self._handle_ofprotocol(msgid, params) elif target_method == "monitor_port": result = self._monitor_port(msgid, params) + elif target_method == "monitor_queue": + result = self._monitor_queue(msgid, params) elif target_method == "ofconfig": self._ofconfig(peer, msgid, params) else: @@ -284,6 +287,22 @@ class RpcOFPManager(app_manager.RyuApp): stats.update(contexts) self.logger.info(_(msg=stats, log_type='stats')) + @handler.set_ev_cls(ofp_event.EventOFPQueueStatsReply, + handler.MAIN_DISPATCHER) + def _queue_stats_reply_handler(self, ev): + msg = ev.msg + dp = msg.datapath + for stat in msg.body: + if stat.queue_id in self.monitored_queues: + contexts, interval_ = self.monitored_queues[stat.queue_id] + stats = {'queue_id': stat.queue_id, + 'port_no': stat.port_no, + 'tx_bytes': stat.tx_bytes, + 'tx_packets': stat.tx_packets, + 'tx_errors': stat.tx_errors} + stats.update(contexts) + self.logger.info(_(msg=stats, log_type='stats')) + @handler.set_ev_cls(ofp_event.EventOFPStatsReply, handler.MAIN_DISPATCHER) def _stats_reply_handler(self, ev): @@ -626,28 +645,67 @@ class RpcOFPManager(app_manager.RyuApp): self.ofconfig.put((peer, msgid, params)) raise NoRPCResponse() - def _monitor_port(self, msgid, params): + def _monitor(self, resource_name, msgid, params): try: param_dict = params[0] except: raise RPCError('parameters are missing') name = None contexts = None + interval = 60 + for k, v in param_dict.items(): - if k == 'physical_port_no': + if k == resource_name: name = v elif k == 'contexts': contexts = v elif k == 'interval': - self.port_monitor_interval = v + interval = v else: raise RPCError('unknown parameters, %s' % k) - if contexts is None: + if contexts is None and interval > 0: raise RPCError('"contexts" parameter is necessary') - if not isinstance(contexts, dict): + if contexts is not None and not isinstance(contexts, dict): raise RPCError('"contexts" parameter must be dictionary') if name is None: - raise RPCError('"physical_port_no" parameter is necessary') + raise RPCError('"%s" parameter is necessary' % resource_name) + return name, contexts, interval + + def _monitor_port(self, msgid, params): + name, contexts, interval = self._monitor('physical_port_no', + msgid, params) self.monitored_ports[name] = contexts + self.port_monitor_interval = interval return {} + + def _monitor_queue(self, msgid, params): + queue_id, contexts, interval = self._monitor('queue_id', + msgid, params) + + if interval == 0: + if queue_id in self.monitored_queues: + del self.monitored_queues[queue_id] + else: + raise RPCError('queue id %d does not exist' % queue_id) + else: + need_spawn = False + if not queue_id in self.monitored_queues: + need_spawn = True + self.monitored_queues[queue_id] = (contexts, interval) + if need_spawn: + hub.spawn(self._monitor_queue_thread, queue_id) + return {} + + def _monitor_queue_thread(self, queue_id): + while queue_id in self.monitored_queues: + _contexts, interval = self.monitored_queues[queue_id] + for k, dp in self.dpset.get_all(): + try: + ofmsg = dp.ofproto_parser.OFPQueueStatsRequest( + datapath=dp, queue_id=queue_id) + dp.send_msg(ofmsg) + except: + # ignore the error due to dead datapath + pass + hub.sleep(interval)