From c30ec9db08903da23bad834aea2b8e4f8f22a2c8 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Wed, 23 Oct 2013 00:00:49 +0900 Subject: [PATCH] use json format for logging Signed-off-by: FUJITA Tomonori --- ryu/controller/api.py | 75 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 16 deletions(-) diff --git a/ryu/controller/api.py b/ryu/controller/api.py index ab3f2088..778c7f87 100644 --- a/ryu/controller/api.py +++ b/ryu/controller/api.py @@ -4,6 +4,7 @@ import datetime import time import logging import json +import msgpack from oslo.config import cfg from ryu.base import app_manager from ryu.controller import handler @@ -39,6 +40,9 @@ STATS = logging.getLogger('apgw') STATS.addHandler(logging.FileHandler(CONF.stats_file, mode='w')) STATS.setLevel(logging.INFO) + +log = logging.getLogger() + def format_key(match_json): del match_json['OFPMatch']['length'] for t in match_json['OFPMatch']['oxm_fields']: @@ -49,6 +53,20 @@ def format_key(match_json): return str(match_json) +class StructuredMessage(object): + def __init__(self, message): + self.message = message + cur_time = datetime.datetime.utcfromtimestamp( + time.time()).strftime('%Y-%m-%dT%H:%M:%S.%fZ') + self.message['timestamp'] = cur_time + self.message['component'] = 'ofwire' + + def __str__(self): + return json.dumps(self.message) + +_ = StructuredMessage + + class OFWireRpcSession(object): def __init__(self, socket, dpset): self.socket = socket @@ -61,8 +79,20 @@ class OFWireRpcSession(object): def _send(self): while True: - m = self.send_queue.get() - self.socket.sendall(m) + msg = self.send_queue.get() + try: + m = msgpack.loads(msg) + log.info(_({"event": "send rpc response", + "msgid": m[1], + "error": m[2], + "result": m[3]})) + except: + pass + + try: + self.socket.sendall(msg) + except: + pass def _flow_stats_loop(self, dp, table_id, match, interval, key): while True: @@ -91,7 +121,8 @@ class OFWireRpcSession(object): break if dp is None: - print 'no datapath, queued', msg + log.info(_({"event": "no datapath, queued", + "msg": str(msg)})) self.pending.append(msg) return @@ -154,7 +185,8 @@ class OFWireRpcSession(object): 'ip': params['ip'], 'port': params['port'] } - print traceroute_source + log.info(_({"event": "register traceroute source", + "ip address": params['ip'], "port": params['port']})) def monitor_port(self, msg): param_dict = msg[3][0] @@ -180,6 +212,13 @@ class OFWireRpcSession(object): def _handle_rpc_message(self, m): if m[0] == RpcMessage.REQUEST: + try: + log.info(_({"event": "recv RPC request", + "method": m[2], "msgid": m[1], + "params": m[3]})) + except: + pass + if m[2] == 'ofp': self.ofp_handle_request(m) elif m[2] == 'monitor_port': @@ -187,10 +226,18 @@ class OFWireRpcSession(object): elif m[0] == RpcMessage.RESPONSE: pass elif m[0] == RpcMessage.NOTIFY: + try: + log.info(_({"event": "recv RPC notification", + "method": m[1], + "params": m[2]})) + except: + pass + if m[1] == 'traceroute': self._tr_handle_notify(m) else: - print "invalid type", m[0] + log.info(_({"event": "invalid RPC message", + "type": m[0]})) def serve(self): while True: @@ -198,7 +245,6 @@ class OFWireRpcSession(object): for idx in range(len(self.pending)): msg = self.pending.pop(0) - print "found pending", msg self._handle_rpc_message(msg) if len(rready) > 0: @@ -251,19 +297,16 @@ class RPCApi(app_manager.RyuApp): in_port = f.value if in_port is None: - print "in_port is missing" return pkt = packet.Packet(msg.data) if not ipv4.ipv4 in pkt: - print "ip header doesn't exit" return if vlan.vlan in pkt: o_vlan = pkt.get_protocols(vlan.vlan)[0] vlan_p = vlan.vlan(vid=o_vlan.vid) else: - print "vlan header doesn't exit" return o_eth = pkt.get_protocols(ethernet.ethernet)[0] @@ -368,19 +411,19 @@ class RPCApi(app_manager.RyuApp): @handler.set_ev_cls(ofp_event.EventOFPPacketIn) def packet_in_handler(self, ev): msg = ev.msg - print "trace" + log.info(_({"event": "packet_in", "reason": msg.reason})) if ofproto_v1_2.OFPR_INVALID_TTL == msg.reason: - print "zero ttl packet" self.handle_traceroute(msg) return @handler.set_ev_cls(dpset.EventDP) def handler_datapath(self, ev): if ev.enter: - print 'dp connected (id: %x)' % ev.dp.id + log.info(_({"event": "dp connected", + "dpid": ev.dp.id})) else: - print 'dp disconnected' - + log.info(_({"event": "dp disconnected"})) + if ev.enter: dp = ev.dp m = dp.ofproto_parser.OFPSetConfig(dp, 1 << 2, miss_send_len=1600) @@ -401,9 +444,10 @@ class RPCApi(app_manager.RyuApp): datapath = msg.datapath port = msg.desc ofproto = datapath.ofproto + log.info(_({"event": "port status change", "reason": reason, + "port_no": port.port_no, "state": port.state})) # For now just port modifications are reported if reason == ofproto.OFPPR_MODIFY: - print port.port_no, port.state params = {'port_no': port.port_no, 'port_state': port.state} for s in self.sessions: m = s.session.create_notification('port_status', params) @@ -411,7 +455,6 @@ class RPCApi(app_manager.RyuApp): @handler.set_ev_cls(ofp_event.EventOFPPortStatus) def port_status_handler(self, ev): - print "port status called" if hasattr(ev, 'msg'): msg = ev.msg self.handle_port_status(msg)