apgw: add RPC API

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
FUJITA Tomonori 2014-02-03 20:28:55 +09:00
parent 47ef512494
commit a0501a12b1
2 changed files with 1057 additions and 0 deletions

465
ryu/controller/api.py Normal file
View File

@ -0,0 +1,465 @@
from operator import attrgetter
from oslo.config import cfg
from ryu.base import app_manager
from ryu.controller import handler
from ryu.controller import dpset
from ryu.controller import ofp_event
from ryu.ofproto import ofproto_parser
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_2_parser
from ryu.ofproto import ofproto_v1_3
from ryu.ofproto import ofproto_v1_3_parser
from ryu.lib import hub
from ryu.lib import apgw
from ryu.lib import rpc
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import icmp
from ryu.lib.packet import ipv4
from ryu.lib.packet import vlan
_ = type('', (apgw.StructuredMessage,), {})
_.COMPONENT_NAME = 'ofwire'
class RPCError(Exception):
pass
class NoRPCResponse(Exception):
def __init__(self, dpid, xid, msgid):
self.dpid = dpid
self.xid = xid
self.msgid = msgid
class PendingRPC(Exception):
pass
class Peer(object):
def __init__(self, queue):
super(Peer, self).__init__()
self._queue = queue
self.wait_for_ofp_resepnse = {}
def _handle_rpc_request(self, data):
self._queue.put((self, rpc.MessageType.REQUEST, data))
def _handle_rpc_notify(self, data):
self._queue.put((self, rpc.MessageType.NOTIFY, data))
class RpcOFPManager(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_2.OFP_VERSION, ofproto_v1_3.OFP_VERSION]
LOGGER_NAME = 'ofwire'
_CONTEXTS = {
'dpset': dpset.DPSet,
}
def __init__(self, *args, **kwargs):
super(RpcOFPManager, self).__init__(*args, **kwargs)
self.dpset = kwargs['dpset']
self._peers = []
self.traceroute_source = {}
self.monitored_ports = {}
self.monitored_flows = {}
self.pending_rpc_requests = []
self._rpc_events = hub.Queue(128)
# per 30 secs by default
self.port_monitor_interval = 30
hub.spawn(self._peer_accept_thread)
hub.spawn(self._port_status_thread)
hub.spawn(self._rpc_message_thread)
apgw.update_syslog_format()
def _rpc_message_thread(self):
while True:
(peer, _type, data) = self._rpc_events.get()
error = None
result = None
try:
msgid, target_method, params = data
if _type == rpc.MessageType.REQUEST:
if target_method == "ofp":
result = self._handle_ofprotocol(msgid, params)
elif target_method == "monitor_port":
result = self._monitor_port(msgid, params)
else:
error = 'Unknown method %s' % (target_method)
elif _type == rpc.MessageType.NOTIFY:
if target_method == 'traceroute':
result = self._register_traceroute(msgid, params)
else:
error = 'Unknown method %s' % (target_method)
except RPCError as e:
error = str(e)
except PendingRPC as e:
# we handle the RPC request after a datapath joins.
self.pending_rpc_requests.append((peer, data))
continue
except NoRPCResponse as e:
# we'll send RPC sesponse after we get a response from
# datapath.
d = peer.wait_for_ofp_resepnse.setdefault(e.dpid, {})
d[e.xid] = e.msgid
continue
peer._endpoint.send_response(msgid, error=error, result=result)
def _peer_loop_thread(self, peer):
peer._endpoint.serve()
# the peer connection is closed
self._peers.remove(peer)
def peer_accept_handler(self, new_sock, addr):
peer = Peer(self._rpc_events)
table = {
rpc.MessageType.REQUEST: peer._handle_rpc_request,
rpc.MessageType.NOTIFY: peer._handle_rpc_notify,
}
peer._endpoint = rpc.EndPoint(new_sock, disp_table=table)
self._peers.append(peer)
hub.spawn(self._peer_loop_thread, peer)
def _peer_accept_thread(self):
server = hub.StreamServer(('', 50001),
self.peer_accept_handler)
server.serve_forever()
def _port_status_thread(self):
while True:
for k, dp in self.dpset.get_all():
try:
ofmsg = dp.ofproto_parser.OFPPortStatsRequest(datapath=dp)
dp.send_msg(ofmsg)
except:
# ignore the error due to dead datapath
pass
hub.sleep(self.port_monitor_interval)
def _send_waited_rpc_response(self, msg):
for peer in self._peers:
if not msg.datapath.id in peer.wait_for_ofp_resepnse:
continue
if msg.xid in peer.wait_for_ofp_resepnse[msg.datapath.id]:
msgid = peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid]
peer._endpoint.send_response(msgid, error=None,
result=msg.to_jsondict())
del peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid]
return
def compare_key(self, k1, k2):
k1 = eval(k1)
k2 = eval(k2)
l1 = k1['OFPMatch']['oxm_fields']
l2 = k2['OFPMatch']['oxm_fields']
return sorted(l1) == sorted(l2)
def format_key(self, match_json):
del match_json['OFPMatch']['length']
for t in match_json['OFPMatch']['oxm_fields']:
tlv = t['OXMTlv']
if tlv['field'] in ['ipv4_dst', 'ipv4_src']:
if tlv['mask'] == '255.255.255.255':
tlv['mask'] = None
return str(match_json)
@handler.set_ev_cls(dpset.EventDP)
def _handler_datapath(self, ev):
if ev.enter:
dp = ev.dp
parser = dp.ofproto_parser
ofp = dp.ofproto
if ofp.OFP_VERSION == ofproto_v1_2.OFP_VERSION:
m = parser.OFPSetConfig(dp,
ofp.OFPC_INVALID_TTL_TO_CONTROLLER,
ofp.OFPCML_MAX)
elif ofp.OFP_VERSION == ofproto_v1_3.OFP_VERSION:
packet_in_mask = ofp.OFPR_ACTION | ofp.OFPR_INVALID_TTL
port_status_mask = (ofp.OFPPR_ADD | ofp.OFPPR_DELETE |
ofp.OFPPR_MODIFY)
m = parser.OFPSetAsync(dp, [packet_in_mask, 0],
[port_status_mask, 0],
[0, 0])
dp.send_msg(m)
log_msg = {"event": "dp connected", "dpid": ev.dp.id}
notify_param = {'secure_channel_state': 'Up'}
for p in self.pending_rpc_requests:
(peer, data) = p
self._rpc_events.put((peer, rpc.MessageType.REQUEST, data))
else:
log_msg = {"event": "dp disconnected"}
notify_param = {'secure_channel_state': 'Down'}
for peer in self._peers:
if ev.dp.id in peer.wait_for_ofp_resepnse:
del peer.wait_for_ofp_resepnse[ev.dp.id]
self.logger.info(_(log_msg))
for peer in self._peers:
peer._endpoint.send_notification("state", [notify_param])
@handler.set_ev_cls(ofp_event.EventOFPErrorMsg,
handler.MAIN_DISPATCHER)
def _error_msg_handler(self, ev):
self.logger.info(_(ev.msg.to_jsondict()))
@handler.set_ev_cls(ofp_event.EventOFPBarrierReply,
handler.MAIN_DISPATCHER)
def _barrier_reply_handler(self, ev):
self._send_waited_rpc_response(ev.msg)
@handler.set_ev_cls(ofp_event.EventOFPFlowStatsReply,
handler.MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
msg = ev.msg
for body in msg.body:
key = self.format_key(body.match.to_jsondict())
contexts = None
for k in self.monitored_flows.keys():
if self.compare_key(k, key):
contexts = self.monitored_flows[k]
break
if contexts is not None:
stats = {'byte_count': body.byte_count,
'packet_count': body.packet_count,
'match': body.match.to_jsondict(),
'table_id': body.table_id}
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPPortStatsReply,
handler.MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
msg = ev.msg
dp = msg.datapath
for stat in sorted(msg.body, key=attrgetter('port_no')):
try:
port = self.dpset.get_port(dp.id, stat.port_no)
except:
continue
if port.name in self.monitored_ports:
stats = {'physical_port_no': port.name}
stats.update(stat.to_jsondict()['OFPPortStats'])
stats.update(self.monitored_ports[port.name])
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPStatsReply,
handler.MAIN_DISPATCHER)
def _stats_reply_handler(self, ev):
msg = ev.msg
self._send_waited_rpc_response(msg)
if msg.type == ofproto_v1_2.OFPST_FLOW:
self._flow_stats_reply_handler(ev)
elif msg.type == ofproto_v1_2.OFPST_PORT:
self._port_stats_reply_handler(ev)
@handler.set_ev_cls(ofp_event.EventOFPPacketIn)
def _packet_in_handler(self, ev):
msg = ev.msg
dp = msg.datapath
self.logger.info(_({"event": "packet_in", "reason": msg.reason}))
if dp.ofproto.OFPR_INVALID_TTL != msg.reason:
return
if not 'in_port' in msg.match:
return
in_port = msg.match['in_port']
pkt = packet.Packet(msg.data)
if not pkt.get_protocol(ipv4.ipv4):
return
o_vlan = pkt.get_protocol(vlan.vlan)
if o_vlan is None:
return
vlan_p = vlan.vlan(vid=o_vlan.vid)
o_eth = pkt.get_protocol(ethernet.ethernet)
eth = ethernet.ethernet(o_eth.src, o_eth.dst, o_eth.ethertype)
o_ip = pkt.get_protocol(ipv4.ipv4)
# needs to set src properly for either side (vlan or mpls)
# ip = ipv4.ipv4(src=ip_lib.ipv4_to_bin(V1_GS_IP), dst=o_ip.src,
# proto=1)
try:
src_ip = self.traceroute_source[o_vlan.vid]['ip']
in_port = self.traceroute_source[o_vlan.vid]['port']
except:
self.logger.info(_({"event": "traceroute error",
"reason": "can't find ip", "vid": o_vlan.vid}))
return
ip = ipv4.ipv4(src=src_ip, dst=o_ip.src, proto=1)
ip_offset = 14 + 4
# ether + vlan headers
data = msg.data[ip_offset:ip_offset +
(o_ip.header_length * 4 + 8)]
ic = icmp.icmp(icmp.ICMP_TIME_EXCEEDED, 0, 0,
icmp.TimeExceeded(data_len=len(data), data=data))
p = packet.Packet(protocols=[eth, vlan_p, ip, ic])
p.serialize()
actions = [dp.ofproto_parser.OFPActionOutput(dp.ofproto.OFPP_TABLE, 0)]
dp.send_packet_out(in_port=in_port, actions=actions, data=p.data)
@handler.set_ev_cls(ofp_event.EventOFPPortStatus)
def _port_status_handler(self, ev):
if hasattr(ev, 'msg'):
msg = ev.msg
reason = msg.reason
datapath = msg.datapath
port = msg.desc
ofproto = datapath.ofproto
self.logger.info(_({"event": "port status change",
"reason": reason,
"port_no": port.port_no, "state": port.state},
log_type='states'))
# For now just port modifications are reported
if reason == ofproto.OFPPR_MODIFY:
params = {'port_no': port.port_no, 'port_state': port.state}
for peer in self._peers:
peer._endpoint.send_notification("port_status", [params])
def _flow_stats_loop(self, dp, table_id, match, interval, key):
while True:
if not key in self.monitored_flows:
break
msg = dp.ofproto_parser.OFPFlowStatsRequest(datapath=dp,
table_id=table_id,
match=match)
dp.send_msg(msg)
hub.sleep(interval)
def _handle_ofprotocol(self, msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
send_response = True
dp = None
if 'dpid' in param_dict:
dp = self.dpset.get(int(param_dict['dpid']))
param_dict.pop('dpid')
else:
# use the first datapath
for k, v in self.dpset.get_all():
dp = v
break
if dp is None:
self.logger.info(_({"event": "no datapath, queued",
"msg": str(param_dict)}))
raise PendingRPC()
contexts = None
ofmsg = None
# default interval
interval = 60
for k, v in param_dict.items():
if k == 'ofmsg':
try:
ofmsg = ofproto_parser.ofp_msg_from_jsondict(dp, v)
except:
raise RPCError('parameters are invalid, %s' %
(str(param_dict)))
elif k == 'interval':
interval = int(v)
elif k == 'contexts':
contexts = v
if ofmsg is None:
raise RPCError('"ofmsg" parameter is invalid, %s' %
(str(param_dict)))
if contexts is not None and not isinstance(contexts, dict):
raise RPCError('"contexts" must be dictionary, %s' %
(str(param_dict)))
if contexts is not None and interval == 0:
raise RPCError('"interval" must be non zero with "contexts", %s' %
(str(param_dict)))
dp.set_xid(ofmsg)
ofmsg.serialize()
if dp.ofproto.OFP_VERSION == ofproto_v1_2.OFP_VERSION:
msg_types = (dp.ofproto.OFPT_STATS_REQUEST,
dp.ofproto.OFPT_BARRIER_REQUEST)
else:
msg_types = (dp.ofproto.OFPT_MULTIPART_REQUEST,
dp.ofproto.OFPT_BARRIER_REQUEST)
if ofmsg.msg_type in msg_types:
dp.send_msg(ofmsg)
raise NoRPCResponse(dpid=dp.id, xid=ofmsg.xid, msgid=msgid)
result = {'xid': ofmsg.xid}
if ofmsg.msg_type is dp.ofproto.OFPT_FLOW_MOD:
if contexts is not None:
key = self.format_key(ofmsg.match.to_jsondict())
if ofmsg.command is dp.ofproto.OFPFC_ADD:
if key in self.monitored_flows:
raise RPCError('the existing flow, %s' % (str(key)))
self.monitored_flows[key] = contexts
hub.spawn(self._flow_stats_loop,
dp, ofmsg.table_id, ofmsg.match,
interval, key)
elif ofmsg.command in (dp.ofproto.OFPFC_DELETE,
dp.ofproto.OFPFC_DELETE_STRICT):
try:
del self.monitored_flows[key]
except:
raise RPCError('unknown key, %s' % (str(key)))
else:
raise RPCError('unknown of message, %s' % (str(param_dict)))
dp.send_msg(ofmsg)
return result
def _register_traceroute(self, msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
try:
self.traceroute_source[param_dict['vlan']] = {
'ip': param_dict['ip'],
'port': param_dict['port']
}
except Exception as e:
raise RPCError('parameters are invalid, %s' % (str(param_dict)))
self.logger.info(_({'event': 'register traceroute source',
'vlan': param_dict['vlan'],
'ip': param_dict['ip'],
'port': param_dict['port']}))
return {}
def _monitor_port(self, msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
name = None
contexts = None
for k, v in param_dict.items():
if k == 'physical_port_no':
name = v
elif k == 'contexts':
contexts = v
elif k == 'interval':
self.port_monitor_interval = v
else:
raise RPCError('unknown parameters, %s' % k)
if contexts is None:
raise RPCError('"contexts" parameter is necessary')
if not isinstance(contexts, dict):
raise RPCError('"contexts" parameter must be dictionary')
if name is None:
raise RPCError('"physical_port_no" parameter is necessary')
self.monitored_ports[name] = contexts
return {}

View File

@ -0,0 +1,592 @@
import unittest
import logging
import socket
from nose.tools import eq_
from nose.tools import raises
from ryu.controller import api
from ryu.controller import dpset
from ryu.ofproto.ofproto_parser import MsgBase
from ryu.ofproto import ether
from ryu.ofproto import inet
from ryu.ofproto import ofproto_parser
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_2_parser
from ryu.ofproto import ofproto_v1_3
from ryu.ofproto import ofproto_v1_3_parser
from ryu.controller import ofp_event
from ryu.lib import hub
from ryu.lib import rpc
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import vlan
from ryu.lib.packet import ipv4
from ryu.lib.packet import icmp
class DummyEndpoint(object):
def __init__(self):
self.response = []
self.notification = []
def send_response(self, msgid, error, result):
self.response.append((msgid, error, result))
def send_notification(self, method, params):
self.notification.append((method, params))
class DummyDatapath(object):
def __init__(self, ofp=None, ofpp=None):
if ofp is None:
ofp = ofproto_v1_2
self.ofproto = ofp
if ofpp is None:
ofpp = ofproto_v1_2_parser
self.ofproto_parser = ofpp
self.port_state = {}
self.ports = {}
self.sent = []
def set_xid(self, msg):
msg.set_xid(1)
def send_msg(self, msg):
assert isinstance(msg, MsgBase)
msg.serialize()
self.sent.append(msg)
def send_packet_out(self, in_port, actions, data):
self.sent.append((in_port, actions, data))
class TestRpcOFPManager(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_logger(self):
_ = api._
m = {'hello': 'world'}
r = eval(str(_(m)))
eq_(r['msg'], m)
eq_(r['component_name'], _.COMPONENT_NAME)
eq_(r['log_type'], 'log')
def test_monitor_port(self):
m = api.RpcOFPManager(dpset=None)
msgid = 1
try:
m._monitor_port(msgid, {})
except api.RPCError as e:
pass
try:
m._monitor_port(msgid, {'physical_port_no': 1})
except api.RPCError as e:
pass
contents = {'hoge': 'jail'}
r = m._monitor_port(msgid, [{'physical_port_no': 1,
'contexts': contents,
'interval': 30}])
eq_(r, {})
eq_(m.monitored_ports[1], contents)
def test_register_traceroute(self):
m = api.RpcOFPManager(dpset=None)
msgid = 1
try:
m._register_traceroute(msgid, [{}])
except api.RPCError as e:
pass
try:
m._register_traceroute(msgid, [{'vlan': 1}])
except api.RPCError as e:
pass
vlan_id = 1
port_no = 10
m._register_traceroute(msgid, [{'vlan': vlan_id,
'ip': '192.168.1.1',
'port': port_no}])
def test_handle_ofprotocol_without_dp(self):
m = api.RpcOFPManager(dpset=dpset.DPSet())
msgid = 1
try:
m._handle_ofprotocol(msgid, [{}])
except api.PendingRPC:
pass
try:
m._handle_ofprotocol(msgid, [{'dpid': 1}])
except api.PendingRPC:
pass
def _create_dpset(self, dpid=0, ports=None, ofp=None, ofpp=None):
dps = dpset.DPSet()
dp = DummyDatapath(ofp=ofp, ofpp=ofpp)
dp.id = dpid
if ports:
class DummyPort(object):
def __init__(self, port_no):
self.name = port_no
dps.ports = dict(map((lambda n: (n, DummyPort(n))), ports))
dps.get_ports = lambda dpid: dps.ports
dps._register(dp)
return dps
def _test_handle_ofprotocol_flowmod(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
msgid = 1
nr_sent = 0
try:
m._handle_ofprotocol(msgid, [{'dpid': dpid}])
except api.RPCError:
pass
dp = dps.get(dpid)
match = ofpp.OFPMatch(in_port=1)
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
'internal': 30,
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
eq_(len(m.monitored_flows), 1)
eq_(m.monitored_flows[m.format_key(match.to_jsondict())],
contexts)
match = ofpp.OFPMatch(in_port=1)
ofmsg = ofpp.OFPFlowMod(datapath=dp,
command=ofproto_v1_2.OFPFC_DELETE,
match=match)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
except api.RPCError as e:
assert 'unknown key' in str(e)
eq_(len(dp.sent), nr_sent)
def test_handle_ofprotocol_flowmod_12(self):
self._test_handle_ofprotocol_flowmod(ofproto_v1_2, ofproto_v1_2_parser)
def test_handle_ofprotocol_flowmod_13(self):
self._test_handle_ofprotocol_flowmod(ofproto_v1_3, ofproto_v1_3_parser)
def _test_handle_ofprotocol(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
msgid = 1
nr_sent = 0
dp = dps.get(dpid)
ofmsg = ofpp.OFPBarrierRequest(datapath=dp)
nr_sent += 1
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
except api.NoRPCResponse as e:
eq_(e.dpid, dpid)
eq_(e.xid, 1)
eq_(e.msgid, msgid)
eq_(len(dp.sent), nr_sent)
ofmsg = ofpp.OFPFlowStatsRequest(datapath=dp)
nr_sent += 1
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
except api.NoRPCResponse as e:
eq_(e.dpid, dpid)
eq_(e.xid, 1)
eq_(e.msgid, msgid)
eq_(len(dp.sent), nr_sent)
ofmsg = ofpp.OFPHello(datapath=dp)
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
except api.RPCError as e:
assert 'unknown of message' in str(e)
eq_(len(dp.sent), nr_sent)
def test_handle_ofprotocol_12(self):
self._test_handle_ofprotocol(ofproto_v1_2, ofproto_v1_2_parser)
def test_handle_ofprotocol_13(self):
self._test_handle_ofprotocol(ofproto_v1_3, ofproto_v1_3_parser)
def _test_port_status_handler(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
port = ofpp.OFPPort(1, 'aa:aa:aa:aa:aa:aa', 'hoge', 0, 0, 0,
0, 0, 0, 0, 0)
msg = ofpp.OFPPortStatus(datapath=dp, reason=ofp.OFPPR_MODIFY,
desc=port)
ev = ofp_event.EventOFPPortStatus(msg)
m = api.RpcOFPManager(dpset=dps)
peer = api.Peer(None)
peer._endpoint = DummyEndpoint()
m._peers.append(peer)
m._port_status_handler(ev)
eq_(len(peer._endpoint.notification), 1)
(method, params) = peer._endpoint.notification[0]
eq_(method, 'port_status')
eq_(params[0], {'port_no': port.port_no, 'port_state': port.state})
def test_port_status_handler_12(self):
self._test_port_status_handler(ofproto_v1_2, ofproto_v1_2_parser)
def test_port_status_handler_13(self):
self._test_port_status_handler(ofproto_v1_3, ofproto_v1_3_parser)
def _test_packet_in_handler(self, ofp, ofpp):
dpid = 10
src_ip = '10.0.0.1'
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
msg = ofpp.OFPPacketIn(datapath=dp)
ev = ofp_event.EventOFPPacketIn(msg)
in_port = 10
vlan_id = 100
target_ip = '192.168.1.1'
msg.match = ofpp.OFPMatch(in_port=in_port)
protocols = [ethernet.ethernet(ethertype=ether.ETH_TYPE_8021Q),
vlan.vlan(vid=vlan_id), ipv4.ipv4(src=target_ip)]
p = packet.Packet(protocols=protocols)
p.serialize()
msg.data = p.data
msg.reason = ofp.OFPR_INVALID_TTL
m = api.RpcOFPManager(dpset=dps)
port_no = 5
msgid = 7
# test the failure case
m._packet_in_handler(ev)
m._register_traceroute(msgid, [{'vlan': vlan_id,
'ip': src_ip,
'port': port_no}])
m._packet_in_handler(ev)
(in_port, actions, data) = dp.sent[0]
eq_(in_port, port_no)
pkt = packet.Packet(data)
ip = pkt.get_protocol(ipv4.ipv4)
v = pkt.get_protocol(vlan.vlan)
ic = pkt.get_protocol(icmp.icmp)
eq_(ip.src, src_ip)
eq_(ip.dst, target_ip)
eq_(ip.proto, inet.IPPROTO_ICMP)
eq_(v.vid, vlan_id)
eq_(ic.type, icmp.ICMP_TIME_EXCEEDED)
eq_(len(actions), 1)
eq_(actions[0].port, ofp.OFPP_TABLE)
def test_packet_in_handler_12(self):
self._test_packet_in_handler(ofproto_v1_2, ofproto_v1_2_parser)
def test_packet_in_handler_13(self):
self._test_packet_in_handler(ofproto_v1_3, ofproto_v1_3_parser)
def _test_handler_datapath(self, ofp, parser):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=parser)
dp = dps.get(dpid)
ev = dpset.EventDP(dp=dp, enter_leave=True)
m = api.RpcOFPManager(dpset=dps)
peer = api.Peer(None)
peer._endpoint = DummyEndpoint()
m._peers.append(peer)
m._handler_datapath(ev)
eq_(len(peer._endpoint.notification), 1)
(method, params) = peer._endpoint.notification[0]
eq_(method, 'state')
eq_(params[0], {'secure_channel_state': 'Up'})
peer._endpoint.notification.pop()
ev = dpset.EventDP(dp=dp, enter_leave=False)
m._handler_datapath(ev)
eq_(len(peer._endpoint.notification), 1)
(method, params) = peer._endpoint.notification[0]
eq_(method, 'state')
eq_(params[0], {'secure_channel_state': 'Down'})
eq_(len(dp.sent), 1)
return dp
def test_handler_datapath_12(self):
dp = self._test_handler_datapath(ofproto_v1_2, ofproto_v1_2_parser)
msg = dp.sent[0]
eq_(msg.__class__, ofproto_v1_2_parser.OFPSetConfig)
eq_(msg.flags, ofproto_v1_2.OFPC_INVALID_TTL_TO_CONTROLLER)
def test_handler_datapath_13(self):
ofp = ofproto_v1_3
dp = self._test_handler_datapath(ofp, ofproto_v1_3_parser)
msg = dp.sent[0]
eq_(msg.__class__, ofproto_v1_3_parser.OFPSetAsync)
eq_(msg.packet_in_mask,
[ofp.OFPR_ACTION | ofp.OFPR_INVALID_TTL, 0])
eq_(msg.port_status_mask,
[(ofp.OFPPR_ADD | ofp.OFPPR_DELETE | ofp.OFPPR_MODIFY), 0])
def _test_port_status_thread(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
m.port_monitor_interval = 10
threads = []
with hub.Timeout(2):
threads.append(hub.spawn(m._port_status_thread))
hub.sleep(0.5)
for t in threads:
hub.kill(t)
hub.joinall(threads)
for m in dp.sent:
eq_(m.__class__, ofpp.OFPPortStatsRequest)
eq_(m.port_no, ofp.OFPP_ANY)
def test_port_status_thread_12(self):
self._test_port_status_thread(ofproto_v1_2, ofproto_v1_2_parser)
def test_port_status_thread_13(self):
self._test_port_status_thread(ofproto_v1_3, ofproto_v1_3_parser)
def _test_flow_stats_loop(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
match = ofpp.OFPMatch(in_port=1)
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match)
contexts = {'hello': 'you'}
msgid = 10
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
'internal': 30,
'contexts': contexts}])
threads = []
key = m.format_key(ofmsg.match.to_jsondict())
with hub.Timeout(5):
threads.append(hub.spawn(m._flow_stats_loop,
dp, ofmsg.table_id, match, 0.1, key))
hub.sleep(0.5)
ofmsg = ofpp.OFPFlowMod(datapath=dp,
command=ofp.OFPFC_DELETE,
match=match)
msgid = 11
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
'internal': 30,
'contexts': contexts}])
eq_(len(m.monitored_flows), 0)
hub.joinall(threads)
for m in dp.sent:
if m.__class__ in (ofpp.OFPFlowMod, ofpp.OFPPortStatsRequest):
continue
eq_(m.__class__, ofpp.OFPFlowStatsRequest)
eq_(m.table_id, ofmsg.table_id)
eq_(m.match.to_jsondict(), match.to_jsondict())
def test_flow_stats_loop_12(self):
self._test_flow_stats_loop(ofproto_v1_2, ofproto_v1_2_parser)
def test_flow_stats_loop_13(self):
self._test_flow_stats_loop(ofproto_v1_3, ofproto_v1_3_parser)
def _test_rpc_message_thread(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
peer = api.Peer(m._rpc_events)
peer._endpoint = DummyEndpoint()
m._peers.append(peer)
msgid = 7
ofmsg = ofpp.OFPBarrierRequest(datapath=dp)
params = {'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}
data = (msgid, 'ofp', [params])
with hub.Timeout(2):
peer._handle_rpc_request(data)
hub.sleep(0.5)
for s in dp.sent:
if s.__class__ in (ofpp.OFPPortStatsRequest,):
continue
eq_(s.__class__, ofpp.OFPBarrierRequest)
msg = ofpp.OFPBarrierReply(datapath=dp)
msg.set_xid(1)
ev = ofp_event.EventOFPBarrierReply(msg)
m._barrier_reply_handler(ev)
eq_(len(peer._endpoint.response), 1)
rsp = peer._endpoint.response.pop()
eq_(rsp[0], msgid)
eq_(rsp[1], None)
eq_(rsp[2], msg.to_jsondict())
eq_(len(peer.wait_for_ofp_resepnse[dpid]), 0)
m._barrier_reply_handler(ev)
eq_(len(peer._endpoint.response), 0)
ev = dpset.EventDP(dp=dp, enter_leave=False)
m._handler_datapath(ev)
eq_(len(peer.wait_for_ofp_resepnse), 0)
def test_rpc_message_thread_12(self):
self._test_rpc_message_thread(ofproto_v1_2, ofproto_v1_2_parser)
def test_rpc_message_thread_13(self):
self._test_rpc_message_thread(ofproto_v1_3, ofproto_v1_3_parser)
def _create_port_stats_args(self, port_no=0):
return {'port_no': port_no, 'rx_packets': 10, 'tx_packets': 30,
'rx_bytes': 1024, 'tx_bytes': 2048,
'rx_dropped': 0, 'tx_dropped': 1,
'rx_errors': 0, 'tx_errors': 0,
'rx_frame_err': 0, 'rx_over_err': 0, 'rx_crc_err': 0,
'collisions': 0}
def _test_stats_reply_port_handler(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, (0, 1, 2), ofp, ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
msgid = 1
r = m._monitor_port(msgid, [{'physical_port_no': 0,
'contexts': {'hello': 'world'},
'interval': 30}])
return m, dp
def test_stats_reply_port_handler_12(self):
ofp = ofproto_v1_2
ofpp = ofproto_v1_2_parser
manager, dp = self._test_stats_reply_port_handler(ofp, ofpp)
msg = ofpp.OFPStatsReply(datapath=dp, type_=ofp.OFPST_PORT)
ev = ofp_event.EventOFPStatsReply(msg)
msg.body = []
for p in range(2):
port = ofpp.OFPPortStats(**self._create_port_stats_args(p))
msg.body.append(port)
manager._stats_reply_handler(ev)
def test_stats_reply_port_handler_13(self):
ofp = ofproto_v1_3
ofpp = ofproto_v1_3_parser
manager, dp = self._test_stats_reply_port_handler(ofp, ofpp)
msg = ofpp.OFPPortStatsReply(dp)
ev = ofp_event.EventOFPPortStatsReply(msg)
msg.body = []
for p in range(2):
d = self._create_port_stats_args(p)
d['duration_sec'] = 0
d['duration_nsec'] = 0
port = ofpp.OFPPortStats(**d)
msg.body.append(port)
manager._port_stats_reply_handler(ev)
def _test_stats_reply_flow_handler(self, ofp, ofpp):
dpid = 10
msgid = 9
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
dp = dps.get(dpid)
match = ofpp.OFPMatch(in_port=1)
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
return m, dp
def test_stats_reply_flow_handler_12(self):
ofp = ofproto_v1_2
ofpp = ofproto_v1_2_parser
manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp)
msg = ofpp.OFPStatsReply(datapath=dp,
type_=ofp.OFPST_FLOW)
ev = ofp_event.EventOFPStatsReply(msg)
s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=1))
# not-monitored flow
s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=2))
msg.body = [s1, s2]
manager._stats_reply_handler(ev)
def test_stats_reply_flow_handler_13(self):
ofp = ofproto_v1_3
ofpp = ofproto_v1_3_parser
manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp)
msg = ofpp.OFPFlowStatsReply(datapath=dp)
ev = ofp_event.EventOFPStatsReply(msg)
s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=1))
# not-monitored flow
s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=2))
msg.body = [s1, s2]
manager._flow_stats_reply_handler(ev)