mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-05-09 06:16:10 +02:00
Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e804f3e99 | ||
|
|
0438bb3420 | ||
|
|
c30ec9db08 | ||
|
|
8e49e9eb33 | ||
|
|
151a80d7e7 | ||
|
|
1206d2ed0e | ||
|
|
dcbcb2b6d6 | ||
|
|
5646e3f963 | ||
|
|
b5174ce2ab | ||
|
|
2cfa9b6f2c | ||
|
|
9b8577ccaa | ||
|
|
9282101c43 | ||
|
|
6c10625f55 | ||
|
|
d35c2d82dc | ||
|
|
b34a0f2491 | ||
|
|
39edcabd3d | ||
|
|
824069e3d0 | ||
|
|
b337333afc | ||
|
|
978513e833 | ||
|
|
1b063ab24d | ||
|
|
2b58257d13 | ||
|
|
80b3d916fe | ||
|
|
456f149621 | ||
|
|
d6f41a3334 | ||
|
|
dc00ee2398 |
469
ryu/controller/api.py
Normal file
469
ryu/controller/api.py
Normal file
@ -0,0 +1,469 @@
|
||||
import eventlet
|
||||
import select
|
||||
import datetime
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
import platform
|
||||
import msgpack
|
||||
from oslo.config import cfg
|
||||
from ryu.base import app_manager
|
||||
from ryu.controller import handler
|
||||
from ryu.controller import dpset
|
||||
from ryu.controller import ofp_event
|
||||
from ryu.ofproto.ofproto_parser import MsgBase
|
||||
from ryu.lib.rpc import RpcSession, RpcMessage
|
||||
from ryu.ofproto import ofproto_parser
|
||||
from ryu.ofproto import ofproto_v1_0
|
||||
from ryu.ofproto import ofproto_v1_0_parser
|
||||
from ryu.ofproto import ofproto_v1_2
|
||||
from ryu.ofproto import ofproto_v1_2_parser
|
||||
from ryu.ofproto import ofproto_v1_3
|
||||
from ryu.ofproto import ofproto_v1_3_parser
|
||||
from ryu.lib.packet import packet
|
||||
from ryu.lib.packet import ethernet
|
||||
from ryu.lib.packet import arp
|
||||
from ryu.lib.packet import vlan
|
||||
from ryu.lib.packet import ipv4
|
||||
from ryu.lib.packet import ipv6
|
||||
from ryu.lib.packet import udp
|
||||
from ryu.lib.packet import tcp
|
||||
from ryu.lib.packet import icmp
|
||||
from ryu.lib import mac
|
||||
|
||||
traceroute_source = {}
|
||||
flow_sem = eventlet.semaphore.Semaphore()
|
||||
monitored_flows = {}
|
||||
monitored_ports = {'interval': 15}
|
||||
|
||||
CONF = cfg.CONF
|
||||
STATS = logging.getLogger('apgw')
|
||||
STATS.addHandler(logging.FileHandler(CONF.stats_file, mode='w'))
|
||||
STATS.setLevel(logging.INFO)
|
||||
|
||||
|
||||
log = logging.getLogger('sys')
|
||||
log.setLevel(logging.DEBUG)
|
||||
if platform.system() == 'Darwin':
|
||||
address = '/var/run/syslog'
|
||||
else:
|
||||
address = '/dev/log'
|
||||
syslog = logging.handlers.SysLogHandler(address=address)
|
||||
log.addHandler(syslog)
|
||||
|
||||
def format_key(match_json):
|
||||
del match_json['OFPMatch']['length']
|
||||
for t in match_json['OFPMatch']['oxm_fields']:
|
||||
tlv = t['OXMTlv']
|
||||
if tlv['field'] in ['ipv4_dst', 'ipv4_src']:
|
||||
if tlv['mask'] == '255.255.255.255':
|
||||
tlv['mask'] = None
|
||||
return str(match_json)
|
||||
|
||||
|
||||
class StructuredMessage(object):
|
||||
def __init__(self, message):
|
||||
self.message = message
|
||||
cur_time = datetime.datetime.utcfromtimestamp(
|
||||
time.time()).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
|
||||
self.message['timestamp'] = cur_time
|
||||
self.message['component'] = 'ofwire'
|
||||
|
||||
def __str__(self):
|
||||
return json.dumps(self.message)
|
||||
|
||||
_ = StructuredMessage
|
||||
|
||||
|
||||
class OFWireRpcSession(object):
|
||||
def __init__(self, socket, dpset):
|
||||
self.socket = socket
|
||||
self.dpset = dpset
|
||||
self.session = RpcSession()
|
||||
self.pending = []
|
||||
self.pool = eventlet.GreenPool()
|
||||
self.send_queue = eventlet.queue.Queue()
|
||||
self.pool.spawn_n(self._send)
|
||||
|
||||
def _send(self):
|
||||
while True:
|
||||
msg = self.send_queue.get()
|
||||
try:
|
||||
m = msgpack.loads(msg)
|
||||
log.info(_({"event": "send rpc response",
|
||||
"msgid": m[1],
|
||||
"error": m[2],
|
||||
"result": m[3]}))
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
self.socket.sendall(msg)
|
||||
except:
|
||||
pass
|
||||
|
||||
def _flow_stats_loop(self, dp, table_id, match, interval, key):
|
||||
while True:
|
||||
if not key in monitored_flows:
|
||||
break
|
||||
msg = dp.ofproto_parser.OFPFlowStatsRequest(dp,
|
||||
table_id,
|
||||
dp.ofproto.OFPP_ANY,
|
||||
dp.ofproto.OFPG_ANY,
|
||||
0, 0,
|
||||
match)
|
||||
dp.send_msg(msg)
|
||||
eventlet.sleep(interval)
|
||||
|
||||
def ofp_handle_request(self, msg):
|
||||
send_response = True
|
||||
|
||||
param_dict = msg[3][0]
|
||||
if 'dpid' in param_dict:
|
||||
dp = self.dpset.get(int(param_dict['dpid']))
|
||||
else:
|
||||
dp = None
|
||||
if dp is None:
|
||||
for k, v in self.dpset.get_all():
|
||||
dp = v
|
||||
break
|
||||
|
||||
if dp is None:
|
||||
log.info(_({"event": "no datapath, queued",
|
||||
"msg": str(msg)}))
|
||||
self.pending.append(msg)
|
||||
return
|
||||
|
||||
ofmsg = None
|
||||
# default interval
|
||||
interval = 60
|
||||
contexts = None
|
||||
for k, v in param_dict.items():
|
||||
if k == 'dpid':
|
||||
continue
|
||||
elif k == 'ofmsg':
|
||||
try:
|
||||
ofmsg = ofproto_parser.ofp_msg_from_jsondict(dp, v)
|
||||
except:
|
||||
log.warning(_({"event": "invalid ofmsg format",
|
||||
"ofmsg": v}))
|
||||
elif k == 'interval':
|
||||
interval = int(v)
|
||||
elif k == 'contexts':
|
||||
contexts = v
|
||||
if ofmsg is None or (contexts and interval == 0):
|
||||
m = self.session.create_response(msg[1], None,
|
||||
[{'error': 'invalid'}])
|
||||
self.send_queue.put(m)
|
||||
return
|
||||
|
||||
dp.set_xid(ofmsg)
|
||||
ofmsg.serialize()
|
||||
if ofmsg.msg_type in (dp.ofproto.OFPT_STATS_REQUEST,
|
||||
dp.ofproto.OFPT_BARRIER_REQUEST):
|
||||
self.waiters[dp.id] = (ofmsg.xid, msg[1])
|
||||
else:
|
||||
error = 0
|
||||
result = {'xid': ofmsg.xid}
|
||||
if contexts:
|
||||
flow_sem.acquire()
|
||||
key = format_key(ofmsg.match.to_jsondict())
|
||||
if ofmsg.command is dp.ofproto.OFPFC_ADD:
|
||||
if key in monitored_flows:
|
||||
error = None
|
||||
result = [{'error': 'the existing flow'}]
|
||||
else:
|
||||
monitored_flows[key] = contexts
|
||||
self.pool.spawn_n(self._flow_stats_loop,
|
||||
dp, ofmsg.table_id, ofmsg.match,
|
||||
interval, key)
|
||||
|
||||
elif ofmsg.command in (dp.ofproto.OFPFC_DELETE,
|
||||
dp.ofproto.OFPFC_DELETE_STRICT):
|
||||
if key in monitored_flows:
|
||||
del monitored_flows[key]
|
||||
flow_sem.release()
|
||||
|
||||
r = self.session.create_response(msg[1], error, result)
|
||||
self.send_queue.put(r)
|
||||
dp.send_msg(ofmsg)
|
||||
|
||||
def _tr_handle_notify(self, msg):
|
||||
params = msg[2][0]
|
||||
traceroute_source[params['vlan']] = {
|
||||
'ip': params['ip'],
|
||||
'port': params['port']
|
||||
}
|
||||
log.info(_({"event": "register traceroute source",
|
||||
"ip address": params['ip'], "port": params['port']}))
|
||||
|
||||
def monitor_port(self, msg):
|
||||
param_dict = msg[3][0]
|
||||
name = None
|
||||
contexts = None
|
||||
for k, v in param_dict.items():
|
||||
if k == 'physical_port_no':
|
||||
name = v
|
||||
elif k == 'contexts':
|
||||
contexts = v
|
||||
elif k == 'interval':
|
||||
monitored_ports['interval'] = v
|
||||
|
||||
if not contexts or not name:
|
||||
m = self.session.create_response(msg[1], None,
|
||||
[{'error': 'invalid'}])
|
||||
self.send_queue.put(m)
|
||||
return
|
||||
|
||||
monitored_ports[name] = contexts
|
||||
r = self.session.create_response(msg[1], 0, [])
|
||||
self.send_queue.put(r)
|
||||
|
||||
def _handle_rpc_message(self, m):
|
||||
if m[0] == RpcMessage.REQUEST:
|
||||
try:
|
||||
log.info(_({"event": "recv RPC request",
|
||||
"method": m[2], "msgid": m[1],
|
||||
"params": m[3]}))
|
||||
except:
|
||||
pass
|
||||
|
||||
if m[2] == 'ofp':
|
||||
self.ofp_handle_request(m)
|
||||
elif m[2] == 'monitor_port':
|
||||
self.monitor_port(m)
|
||||
elif m[0] == RpcMessage.RESPONSE:
|
||||
pass
|
||||
elif m[0] == RpcMessage.NOTIFY:
|
||||
try:
|
||||
log.info(_({"event": "recv RPC notification",
|
||||
"method": m[1],
|
||||
"params": m[2]}))
|
||||
except:
|
||||
pass
|
||||
|
||||
if m[1] == 'traceroute':
|
||||
self._tr_handle_notify(m)
|
||||
else:
|
||||
log.info(_({"event": "invalid RPC message",
|
||||
"type": m[0]}))
|
||||
|
||||
def serve(self):
|
||||
while True:
|
||||
rready, _, _ = select.select([self.socket], [], [], 5)
|
||||
|
||||
for idx in range(len(self.pending)):
|
||||
msg = self.pending.pop(0)
|
||||
self._handle_rpc_message(msg)
|
||||
|
||||
if len(rready) > 0:
|
||||
ret = self.socket.recv(4096)
|
||||
if len(ret) == 0:
|
||||
break
|
||||
for m in self.session.get_messages(ret):
|
||||
self._handle_rpc_message(m)
|
||||
|
||||
|
||||
class RPCApi(app_manager.RyuApp):
|
||||
_CONTEXTS = {
|
||||
'dpset': dpset.DPSet,
|
||||
}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.dpset = kwargs['dpset']
|
||||
super(RPCApi, self).__init__(*args, **kwargs)
|
||||
self.server = eventlet.listen(('', 50001))
|
||||
self.pool = eventlet.GreenPool()
|
||||
self.sessions = []
|
||||
self.dp_joined = False
|
||||
self.pool.spawn_n(self.serve)
|
||||
self.pool.spawn_n(self._port_status_loop)
|
||||
|
||||
def _port_status_loop(self):
|
||||
while True:
|
||||
for k, dp in self.dpset.get_all():
|
||||
try:
|
||||
port = dp.ofproto.OFPP_ANY
|
||||
ofmsg = dp.ofproto_parser.OFPPortStatsRequest(dp, port)
|
||||
dp.send_msg(ofmsg)
|
||||
except:
|
||||
pass
|
||||
eventlet.sleep(monitored_ports['interval'])
|
||||
|
||||
def serve(self):
|
||||
while True:
|
||||
sock, address = self.server.accept()
|
||||
session = OFWireRpcSession(sock, self.dpset)
|
||||
session.waiters = {}
|
||||
self.sessions.append(session)
|
||||
self.pool.spawn_n(session.serve)
|
||||
|
||||
def handle_traceroute(self, msg):
|
||||
dp = msg.datapath
|
||||
in_port = None
|
||||
for f in msg.match.fields:
|
||||
if f.header == ofproto_v1_2.OXM_OF_IN_PORT:
|
||||
in_port = f.value
|
||||
|
||||
if in_port is None:
|
||||
return
|
||||
|
||||
pkt = packet.Packet(msg.data)
|
||||
if not ipv4.ipv4 in pkt:
|
||||
return
|
||||
|
||||
if vlan.vlan in pkt:
|
||||
o_vlan = pkt.get_protocols(vlan.vlan)[0]
|
||||
vlan_p = vlan.vlan(vid=o_vlan.vid)
|
||||
else:
|
||||
return
|
||||
|
||||
o_eth = pkt.get_protocols(ethernet.ethernet)[0]
|
||||
eth = ethernet.ethernet(o_eth.src, o_eth.dst, o_eth.ethertype)
|
||||
o_ip = pkt.get_protocols(ipv4.ipv4)[0]
|
||||
# needs to set src properly for either side (vlan or mpls)
|
||||
# ip = ipv4.ipv4(src=ip_lib.ipv4_to_bin(V1_GS_IP), dst=o_ip.src,
|
||||
# proto=1)
|
||||
|
||||
src_ip = traceroute_source[o_vlan.vid]['ip']
|
||||
in_port = traceroute_source[o_vlan.vid]['port']
|
||||
|
||||
ip = ipv4.ipv4(src=src_ip, dst=o_ip.src, proto=1)
|
||||
ip_offset = 14 + 4
|
||||
# ether + vlan headers
|
||||
data = msg.data[ip_offset:ip_offset +
|
||||
(o_ip.header_length * 4 + 8)]
|
||||
ic = icmp.icmp(icmp.ICMP_TIME_EXCEEDED, 0, 0,
|
||||
icmp.TimeExceeded(data_len=len(data), data=data))
|
||||
|
||||
proto_list = [eth, vlan_p, ip, ic]
|
||||
|
||||
p = packet.Packet(protocols=proto_list)
|
||||
p.serialize()
|
||||
self.send_openflow_packet(dp=dp, packet=p.data,
|
||||
port_no=ofproto_v1_2.OFPP_TABLE,
|
||||
inport=in_port)
|
||||
|
||||
def send_openflow_packet(self, dp, packet, port_no,
|
||||
inport=ofproto_v1_2.OFPP_CONTROLLER):
|
||||
|
||||
actions = [dp.ofproto_parser.OFPActionOutput(port_no, 0)]
|
||||
dp.send_packet_out(in_port=inport, actions=actions, data=packet)
|
||||
|
||||
def _ofp_reply(self, msg):
|
||||
for sess in self.sessions:
|
||||
if msg.datapath.id in sess.waiters:
|
||||
(xid, msgid) = sess.waiters[msg.datapath.id]
|
||||
results = msg.to_jsondict()
|
||||
r = sess.session.create_response(msgid, None, results)
|
||||
sess.send_queue.put(r)
|
||||
|
||||
def compare_key(self, k1, k2):
|
||||
k1 = eval(k1)
|
||||
k2 = eval(k2)
|
||||
l1 = k1['OFPMatch']['oxm_fields']
|
||||
l2 = k2['OFPMatch']['oxm_fields']
|
||||
|
||||
return sorted(l1) == sorted(l2)
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPBarrierReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def barrier_reply_handler(self, ev):
|
||||
msg = ev.msg
|
||||
self._ofp_reply(msg)
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPStatsReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def flow_reply_handler(self, ev):
|
||||
msg = ev.msg
|
||||
timestamp = time.time()
|
||||
cur_time = datetime.datetime.utcfromtimestamp(
|
||||
timestamp).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
|
||||
|
||||
self._ofp_reply(msg)
|
||||
dp = msg.datapath
|
||||
if msg.type == ofproto_v1_2.OFPST_FLOW:
|
||||
for body in msg.body:
|
||||
key = format_key(body.match.to_jsondict())
|
||||
contexts = None
|
||||
flow_sem.acquire()
|
||||
for k in monitored_flows.keys():
|
||||
if self.compare_key(k, key):
|
||||
contexts = monitored_flows[k]
|
||||
flow_sem.release()
|
||||
if contexts:
|
||||
stats = {'byte_count': body.byte_count,
|
||||
'packet_count': body.packet_count,
|
||||
'match': body.match.to_jsondict(),
|
||||
#'inst': body.instructions,
|
||||
'table_id': body.table_id}
|
||||
stats.update(contexts)
|
||||
stats['timestamp'] = cur_time
|
||||
STATS.info(json.dumps(stats))
|
||||
elif msg.type == ofproto_v1_2.OFPST_PORT:
|
||||
for body in msg.body:
|
||||
try:
|
||||
ports = self.dpset.get_ports(dp.id)
|
||||
try:
|
||||
port_name = ports[body.port_no].name
|
||||
except:
|
||||
continue
|
||||
if port_name in monitored_ports:
|
||||
stats = {'timestamp': cur_time,
|
||||
'physical_port_no': port_name}
|
||||
stats.update(body.to_jsondict()['OFPPortStats'])
|
||||
stats.update(monitored_ports[port_name])
|
||||
STATS.info(json.dumps(stats))
|
||||
except:
|
||||
pass
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPPacketIn)
|
||||
def packet_in_handler(self, ev):
|
||||
msg = ev.msg
|
||||
log.info(_({"event": "packet_in", "reason": msg.reason}))
|
||||
if ofproto_v1_2.OFPR_INVALID_TTL == msg.reason:
|
||||
self.handle_traceroute(msg)
|
||||
return
|
||||
|
||||
@handler.set_ev_cls(dpset.EventDP)
|
||||
def handler_datapath(self, ev):
|
||||
if ev.enter:
|
||||
log.info(_({"event": "dp connected",
|
||||
"dpid": ev.dp.id}))
|
||||
else:
|
||||
log.info(_({"event": "dp disconnected"}))
|
||||
|
||||
if ev.enter:
|
||||
dp = ev.dp
|
||||
m = dp.ofproto_parser.OFPSetConfig(dp, 1 << 2, miss_send_len=1600)
|
||||
dp.send_msg(m)
|
||||
self.dp_joined = True
|
||||
|
||||
if ev.enter:
|
||||
params = {'secure_channel_state': 'Up'}
|
||||
else:
|
||||
params = {'secure_channel_state': 'Down'}
|
||||
|
||||
for s in self.sessions:
|
||||
m = s.session.create_notification('state', [params])
|
||||
s.send_queue.put(m)
|
||||
|
||||
def handle_port_status(self, msg):
|
||||
reason = msg.reason
|
||||
datapath = msg.datapath
|
||||
port = msg.desc
|
||||
ofproto = datapath.ofproto
|
||||
log.info(_({"event": "port status change", "reason": reason,
|
||||
"port_no": port.port_no, "state": port.state}))
|
||||
# For now just port modifications are reported
|
||||
if reason == ofproto.OFPPR_MODIFY:
|
||||
params = {'port_no': port.port_no, 'port_state': port.state}
|
||||
for s in self.sessions:
|
||||
m = s.session.create_notification('port_status', params)
|
||||
s.send_queue.put(m)
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPPortStatus)
|
||||
def port_status_handler(self, ev):
|
||||
if hasattr(ev, 'msg'):
|
||||
msg = ev.msg
|
||||
self.handle_port_status(msg)
|
||||
@ -48,5 +48,6 @@ CONF.register_cli_opts([
|
||||
cfg.StrOpt('neutron-controller-addr', default=None,
|
||||
help='openflow method:address:port to set controller of'
|
||||
'ovs bridge',
|
||||
deprecated_name='quantum-controller-addr')
|
||||
deprecated_name='quantum-controller-addr'),
|
||||
cfg.StrOpt('stats-file', default='/tmp/ofwire-stats.log')
|
||||
])
|
||||
|
||||
37
ryu/lib/rpc.py
Normal file
37
ryu/lib/rpc.py
Normal file
@ -0,0 +1,37 @@
|
||||
import msgpack
|
||||
|
||||
|
||||
class RpcMessage(object):
|
||||
REQUEST = 0
|
||||
RESPONSE = 1
|
||||
NOTIFY = 2
|
||||
|
||||
|
||||
class RpcSession(object):
|
||||
def __init__(self):
|
||||
super(RpcSession, self).__init__()
|
||||
self._packer = msgpack.Packer()
|
||||
self._unpacker = msgpack.Unpacker()
|
||||
self._next_msgid = 0
|
||||
|
||||
def _create_msgid(self):
|
||||
this_id = self._next_msgid
|
||||
self._next_msgid += 1
|
||||
return this_id
|
||||
|
||||
def create_request(self, method, params):
|
||||
msgid = self._create_msgid()
|
||||
return self._packer.pack([RpcMessage.REQUEST, msgid, method, params])
|
||||
|
||||
def create_response(self, msgid, error, result):
|
||||
return self._packer.pack([RpcMessage.RESPONSE, msgid, error, result])
|
||||
|
||||
def create_notification(self, method, params):
|
||||
return self._packer.pack([RpcMessage.NOTIFY, method, params])
|
||||
|
||||
def get_messages(self, data):
|
||||
self._unpacker.feed(data)
|
||||
messages = []
|
||||
for msg in self._unpacker:
|
||||
messages.append(msg)
|
||||
return messages
|
||||
Loading…
x
Reference in New Issue
Block a user