Compare commits

...

25 Commits

Author SHA1 Message Date
FUJITA Tomonori
6e804f3e99 log an invalid RPC format
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-23 02:37:48 +09:00
FUJITA Tomonori
0438bb3420 use syslogd
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-23 00:51:19 +09:00
FUJITA Tomonori
c30ec9db08 use json format for logging
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-23 00:00:49 +09:00
FUJITA Tomonori
8e49e9eb33 clean up log setup
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-22 21:39:22 +09:00
FUJITA Tomonori
151a80d7e7 fix match comparison
OVS (flowmod) returns a match that is different from a match that we
sent with FlowMod.

This is really hacky. It needs to be fixed properly later.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-18 09:12:23 +09:00
FUJITA Tomonori
1206d2ed0e fix barrier and flowstat from outside
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-17 08:01:44 +09:00
FUJITA Tomonori
dcbcb2b6d6 don't access to dp when dp has gone
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-17 00:27:20 +09:00
FUJITA Tomonori
5646e3f963 fix wrong usage of dp.ports
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-16 23:11:17 +09:00
FUJITA Tomonori
b5174ce2ab send secure_channel_state notification
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-16 18:44:51 +09:00
Karthik Ramasubramanian
2cfa9b6f2c dump in json format 2013-10-14 19:37:13 -07:00
FUJITA Tomonori
9b8577ccaa update the timestamp format
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-15 07:20:22 +09:00
FUJITA Tomonori
9282101c43 dump table_id and match in flowstats
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-14 20:10:57 +09:00
FUJITA Tomonori
6c10625f55 rename the name of default stats log file
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-14 13:46:23 +09:00
FUJITA Tomonori
d35c2d82dc fix icmp initialization
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-10 10:29:32 +09:00
FUJITA Tomonori
b34a0f2491 fix ether header len
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-10 09:35:49 +09:00
FUJITA Tomonori
39edcabd3d write stats info to file
add '--stats-file' option to specify a file that stats info is written
to. The default is '/tmp/stats.log'.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-10 07:47:39 +09:00
FUJITA Tomonori
824069e3d0 fix traceroute notification regression
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-10 06:18:25 +09:00
FUJITA Tomonori
b337333afc fix handling pending silly bug
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-04 11:36:08 +09:00
FUJITA Tomonori
978513e833 handle datapath join race and disconnection
If no datapath is available, requests are queued and will be executed later.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-04 11:09:08 +09:00
FUJITA Tomonori
1b063ab24d improve datapath connect/disconnect log message
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-04 09:02:28 +09:00
FUJITA Tomonori
2b58257d13 return 'no datapath' error instead of 'invalid'
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-10-04 08:41:48 +09:00
FUJITA Tomonori
80b3d916fe handle dp race
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-09-28 05:42:59 -07:00
FUJITA Tomonori
456f149621 add monitor_port API support
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-09-26 18:11:02 -07:00
FUJITA Tomonori
d6f41a3334 Add APGW RPC API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-09-26 18:11:02 -07:00
FUJITA Tomonori
dc00ee2398 add simple rpc library
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2013-09-26 18:11:02 -07:00
3 changed files with 508 additions and 1 deletions

469
ryu/controller/api.py Normal file
View File

@ -0,0 +1,469 @@
import ast
import datetime
import json
import logging
import logging.handlers
import platform
import select
import time

import eventlet
import msgpack
from oslo.config import cfg

from ryu.base import app_manager
from ryu.controller import dpset
from ryu.controller import handler
from ryu.controller import ofp_event
from ryu.lib import mac
from ryu.lib.packet import arp
from ryu.lib.packet import ethernet
from ryu.lib.packet import icmp
from ryu.lib.packet import ipv4
from ryu.lib.packet import ipv6
from ryu.lib.packet import packet
from ryu.lib.packet import tcp
from ryu.lib.packet import udp
from ryu.lib.packet import vlan
from ryu.lib.rpc import RpcSession, RpcMessage
from ryu.ofproto import ofproto_parser
from ryu.ofproto import ofproto_v1_0
from ryu.ofproto import ofproto_v1_0_parser
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_2_parser
from ryu.ofproto import ofproto_v1_3
from ryu.ofproto import ofproto_v1_3_parser
from ryu.ofproto.ofproto_parser import MsgBase
traceroute_source = {}
flow_sem = eventlet.semaphore.Semaphore()
monitored_flows = {}
monitored_ports = {'interval': 15}
CONF = cfg.CONF
STATS = logging.getLogger('apgw')
STATS.addHandler(logging.FileHandler(CONF.stats_file, mode='w'))
STATS.setLevel(logging.INFO)
log = logging.getLogger('sys')
log.setLevel(logging.DEBUG)
if platform.system() == 'Darwin':
address = '/var/run/syslog'
else:
address = '/dev/log'
syslog = logging.handlers.SysLogHandler(address=address)
log.addHandler(syslog)
def format_key(match_json):
del match_json['OFPMatch']['length']
for t in match_json['OFPMatch']['oxm_fields']:
tlv = t['OXMTlv']
if tlv['field'] in ['ipv4_dst', 'ipv4_src']:
if tlv['mask'] == '255.255.255.255':
tlv['mask'] = None
return str(match_json)
class StructuredMessage(object):
def __init__(self, message):
self.message = message
cur_time = datetime.datetime.utcfromtimestamp(
time.time()).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
self.message['timestamp'] = cur_time
self.message['component'] = 'ofwire'
def __str__(self):
return json.dumps(self.message)
_ = StructuredMessage
class OFWireRpcSession(object):
def __init__(self, socket, dpset):
self.socket = socket
self.dpset = dpset
self.session = RpcSession()
self.pending = []
self.pool = eventlet.GreenPool()
self.send_queue = eventlet.queue.Queue()
self.pool.spawn_n(self._send)
def _send(self):
while True:
msg = self.send_queue.get()
try:
m = msgpack.loads(msg)
log.info(_({"event": "send rpc response",
"msgid": m[1],
"error": m[2],
"result": m[3]}))
except:
pass
try:
self.socket.sendall(msg)
except:
pass
def _flow_stats_loop(self, dp, table_id, match, interval, key):
while True:
if not key in monitored_flows:
break
msg = dp.ofproto_parser.OFPFlowStatsRequest(dp,
table_id,
dp.ofproto.OFPP_ANY,
dp.ofproto.OFPG_ANY,
0, 0,
match)
dp.send_msg(msg)
eventlet.sleep(interval)
def ofp_handle_request(self, msg):
send_response = True
param_dict = msg[3][0]
if 'dpid' in param_dict:
dp = self.dpset.get(int(param_dict['dpid']))
else:
dp = None
if dp is None:
for k, v in self.dpset.get_all():
dp = v
break
if dp is None:
log.info(_({"event": "no datapath, queued",
"msg": str(msg)}))
self.pending.append(msg)
return
ofmsg = None
# default interval
interval = 60
contexts = None
for k, v in param_dict.items():
if k == 'dpid':
continue
elif k == 'ofmsg':
try:
ofmsg = ofproto_parser.ofp_msg_from_jsondict(dp, v)
except:
log.warning(_({"event": "invalid ofmsg format",
"ofmsg": v}))
elif k == 'interval':
interval = int(v)
elif k == 'contexts':
contexts = v
if ofmsg is None or (contexts and interval == 0):
m = self.session.create_response(msg[1], None,
[{'error': 'invalid'}])
self.send_queue.put(m)
return
dp.set_xid(ofmsg)
ofmsg.serialize()
if ofmsg.msg_type in (dp.ofproto.OFPT_STATS_REQUEST,
dp.ofproto.OFPT_BARRIER_REQUEST):
self.waiters[dp.id] = (ofmsg.xid, msg[1])
else:
error = 0
result = {'xid': ofmsg.xid}
if contexts:
flow_sem.acquire()
key = format_key(ofmsg.match.to_jsondict())
if ofmsg.command is dp.ofproto.OFPFC_ADD:
if key in monitored_flows:
error = None
result = [{'error': 'the existing flow'}]
else:
monitored_flows[key] = contexts
self.pool.spawn_n(self._flow_stats_loop,
dp, ofmsg.table_id, ofmsg.match,
interval, key)
elif ofmsg.command in (dp.ofproto.OFPFC_DELETE,
dp.ofproto.OFPFC_DELETE_STRICT):
if key in monitored_flows:
del monitored_flows[key]
flow_sem.release()
r = self.session.create_response(msg[1], error, result)
self.send_queue.put(r)
dp.send_msg(ofmsg)
def _tr_handle_notify(self, msg):
params = msg[2][0]
traceroute_source[params['vlan']] = {
'ip': params['ip'],
'port': params['port']
}
log.info(_({"event": "register traceroute source",
"ip address": params['ip'], "port": params['port']}))
def monitor_port(self, msg):
param_dict = msg[3][0]
name = None
contexts = None
for k, v in param_dict.items():
if k == 'physical_port_no':
name = v
elif k == 'contexts':
contexts = v
elif k == 'interval':
monitored_ports['interval'] = v
if not contexts or not name:
m = self.session.create_response(msg[1], None,
[{'error': 'invalid'}])
self.send_queue.put(m)
return
monitored_ports[name] = contexts
r = self.session.create_response(msg[1], 0, [])
self.send_queue.put(r)
def _handle_rpc_message(self, m):
if m[0] == RpcMessage.REQUEST:
try:
log.info(_({"event": "recv RPC request",
"method": m[2], "msgid": m[1],
"params": m[3]}))
except:
pass
if m[2] == 'ofp':
self.ofp_handle_request(m)
elif m[2] == 'monitor_port':
self.monitor_port(m)
elif m[0] == RpcMessage.RESPONSE:
pass
elif m[0] == RpcMessage.NOTIFY:
try:
log.info(_({"event": "recv RPC notification",
"method": m[1],
"params": m[2]}))
except:
pass
if m[1] == 'traceroute':
self._tr_handle_notify(m)
else:
log.info(_({"event": "invalid RPC message",
"type": m[0]}))
def serve(self):
while True:
rready, _, _ = select.select([self.socket], [], [], 5)
for idx in range(len(self.pending)):
msg = self.pending.pop(0)
self._handle_rpc_message(msg)
if len(rready) > 0:
ret = self.socket.recv(4096)
if len(ret) == 0:
break
for m in self.session.get_messages(ret):
self._handle_rpc_message(m)
class RPCApi(app_manager.RyuApp):
_CONTEXTS = {
'dpset': dpset.DPSet,
}
def __init__(self, *args, **kwargs):
self.dpset = kwargs['dpset']
super(RPCApi, self).__init__(*args, **kwargs)
self.server = eventlet.listen(('', 50001))
self.pool = eventlet.GreenPool()
self.sessions = []
self.dp_joined = False
self.pool.spawn_n(self.serve)
self.pool.spawn_n(self._port_status_loop)
def _port_status_loop(self):
while True:
for k, dp in self.dpset.get_all():
try:
port = dp.ofproto.OFPP_ANY
ofmsg = dp.ofproto_parser.OFPPortStatsRequest(dp, port)
dp.send_msg(ofmsg)
except:
pass
eventlet.sleep(monitored_ports['interval'])
def serve(self):
while True:
sock, address = self.server.accept()
session = OFWireRpcSession(sock, self.dpset)
session.waiters = {}
self.sessions.append(session)
self.pool.spawn_n(session.serve)
def handle_traceroute(self, msg):
dp = msg.datapath
in_port = None
for f in msg.match.fields:
if f.header == ofproto_v1_2.OXM_OF_IN_PORT:
in_port = f.value
if in_port is None:
return
pkt = packet.Packet(msg.data)
if not ipv4.ipv4 in pkt:
return
if vlan.vlan in pkt:
o_vlan = pkt.get_protocols(vlan.vlan)[0]
vlan_p = vlan.vlan(vid=o_vlan.vid)
else:
return
o_eth = pkt.get_protocols(ethernet.ethernet)[0]
eth = ethernet.ethernet(o_eth.src, o_eth.dst, o_eth.ethertype)
o_ip = pkt.get_protocols(ipv4.ipv4)[0]
# needs to set src properly for either side (vlan or mpls)
# ip = ipv4.ipv4(src=ip_lib.ipv4_to_bin(V1_GS_IP), dst=o_ip.src,
# proto=1)
src_ip = traceroute_source[o_vlan.vid]['ip']
in_port = traceroute_source[o_vlan.vid]['port']
ip = ipv4.ipv4(src=src_ip, dst=o_ip.src, proto=1)
ip_offset = 14 + 4
# ether + vlan headers
data = msg.data[ip_offset:ip_offset +
(o_ip.header_length * 4 + 8)]
ic = icmp.icmp(icmp.ICMP_TIME_EXCEEDED, 0, 0,
icmp.TimeExceeded(data_len=len(data), data=data))
proto_list = [eth, vlan_p, ip, ic]
p = packet.Packet(protocols=proto_list)
p.serialize()
self.send_openflow_packet(dp=dp, packet=p.data,
port_no=ofproto_v1_2.OFPP_TABLE,
inport=in_port)
def send_openflow_packet(self, dp, packet, port_no,
inport=ofproto_v1_2.OFPP_CONTROLLER):
actions = [dp.ofproto_parser.OFPActionOutput(port_no, 0)]
dp.send_packet_out(in_port=inport, actions=actions, data=packet)
def _ofp_reply(self, msg):
for sess in self.sessions:
if msg.datapath.id in sess.waiters:
(xid, msgid) = sess.waiters[msg.datapath.id]
results = msg.to_jsondict()
r = sess.session.create_response(msgid, None, results)
sess.send_queue.put(r)
def compare_key(self, k1, k2):
k1 = eval(k1)
k2 = eval(k2)
l1 = k1['OFPMatch']['oxm_fields']
l2 = k2['OFPMatch']['oxm_fields']
return sorted(l1) == sorted(l2)
@handler.set_ev_cls(ofp_event.EventOFPBarrierReply,
handler.MAIN_DISPATCHER)
def barrier_reply_handler(self, ev):
msg = ev.msg
self._ofp_reply(msg)
@handler.set_ev_cls(ofp_event.EventOFPStatsReply,
handler.MAIN_DISPATCHER)
def flow_reply_handler(self, ev):
msg = ev.msg
timestamp = time.time()
cur_time = datetime.datetime.utcfromtimestamp(
timestamp).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
self._ofp_reply(msg)
dp = msg.datapath
if msg.type == ofproto_v1_2.OFPST_FLOW:
for body in msg.body:
key = format_key(body.match.to_jsondict())
contexts = None
flow_sem.acquire()
for k in monitored_flows.keys():
if self.compare_key(k, key):
contexts = monitored_flows[k]
flow_sem.release()
if contexts:
stats = {'byte_count': body.byte_count,
'packet_count': body.packet_count,
'match': body.match.to_jsondict(),
#'inst': body.instructions,
'table_id': body.table_id}
stats.update(contexts)
stats['timestamp'] = cur_time
STATS.info(json.dumps(stats))
elif msg.type == ofproto_v1_2.OFPST_PORT:
for body in msg.body:
try:
ports = self.dpset.get_ports(dp.id)
try:
port_name = ports[body.port_no].name
except:
continue
if port_name in monitored_ports:
stats = {'timestamp': cur_time,
'physical_port_no': port_name}
stats.update(body.to_jsondict()['OFPPortStats'])
stats.update(monitored_ports[port_name])
STATS.info(json.dumps(stats))
except:
pass
@handler.set_ev_cls(ofp_event.EventOFPPacketIn)
def packet_in_handler(self, ev):
msg = ev.msg
log.info(_({"event": "packet_in", "reason": msg.reason}))
if ofproto_v1_2.OFPR_INVALID_TTL == msg.reason:
self.handle_traceroute(msg)
return
@handler.set_ev_cls(dpset.EventDP)
def handler_datapath(self, ev):
if ev.enter:
log.info(_({"event": "dp connected",
"dpid": ev.dp.id}))
else:
log.info(_({"event": "dp disconnected"}))
if ev.enter:
dp = ev.dp
m = dp.ofproto_parser.OFPSetConfig(dp, 1 << 2, miss_send_len=1600)
dp.send_msg(m)
self.dp_joined = True
if ev.enter:
params = {'secure_channel_state': 'Up'}
else:
params = {'secure_channel_state': 'Down'}
for s in self.sessions:
m = s.session.create_notification('state', [params])
s.send_queue.put(m)
def handle_port_status(self, msg):
reason = msg.reason
datapath = msg.datapath
port = msg.desc
ofproto = datapath.ofproto
log.info(_({"event": "port status change", "reason": reason,
"port_no": port.port_no, "state": port.state}))
# For now just port modifications are reported
if reason == ofproto.OFPPR_MODIFY:
params = {'port_no': port.port_no, 'port_state': port.state}
for s in self.sessions:
m = s.session.create_notification('port_status', params)
s.send_queue.put(m)
@handler.set_ev_cls(ofp_event.EventOFPPortStatus)
def port_status_handler(self, ev):
if hasattr(ev, 'msg'):
msg = ev.msg
self.handle_port_status(msg)

View File

@ -48,5 +48,6 @@ CONF.register_cli_opts([
cfg.StrOpt('neutron-controller-addr', default=None,
help='openflow method:address:port to set controller of'
'ovs bridge',
deprecated_name='quantum-controller-addr')
deprecated_name='quantum-controller-addr'),
cfg.StrOpt('stats-file', default='/tmp/ofwire-stats.log')
])

37
ryu/lib/rpc.py Normal file
View File

@ -0,0 +1,37 @@
import msgpack
class RpcMessage(object):
REQUEST = 0
RESPONSE = 1
NOTIFY = 2
class RpcSession(object):
def __init__(self):
super(RpcSession, self).__init__()
self._packer = msgpack.Packer()
self._unpacker = msgpack.Unpacker()
self._next_msgid = 0
def _create_msgid(self):
this_id = self._next_msgid
self._next_msgid += 1
return this_id
def create_request(self, method, params):
msgid = self._create_msgid()
return self._packer.pack([RpcMessage.REQUEST, msgid, method, params])
def create_response(self, msgid, error, result):
return self._packer.pack([RpcMessage.RESPONSE, msgid, error, result])
def create_notification(self, method, params):
return self._packer.pack([RpcMessage.NOTIFY, method, params])
def get_messages(self, data):
self._unpacker.feed(data)
messages = []
for msg in self._unpacker:
messages.append(msg)
return messages