From a0501a12b16784694daad05118b1ef00a44941ea Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Mon, 3 Feb 2014 20:28:55 +0900 Subject: [PATCH] apgw: add RPC API Signed-off-by: FUJITA Tomonori --- ryu/controller/api.py | 465 ++++++++++++++++++++++ ryu/tests/unit/app/test_apgw_rpc.py | 592 ++++++++++++++++++++++++++++ 2 files changed, 1057 insertions(+) create mode 100644 ryu/controller/api.py create mode 100644 ryu/tests/unit/app/test_apgw_rpc.py diff --git a/ryu/controller/api.py b/ryu/controller/api.py new file mode 100644 index 00000000..cad325ae --- /dev/null +++ b/ryu/controller/api.py @@ -0,0 +1,465 @@ +from operator import attrgetter +from oslo.config import cfg +from ryu.base import app_manager +from ryu.controller import handler +from ryu.controller import dpset +from ryu.controller import ofp_event +from ryu.ofproto import ofproto_parser +from ryu.ofproto import ofproto_v1_2 +from ryu.ofproto import ofproto_v1_2_parser +from ryu.ofproto import ofproto_v1_3 +from ryu.ofproto import ofproto_v1_3_parser +from ryu.lib import hub +from ryu.lib import apgw +from ryu.lib import rpc +from ryu.lib.packet import packet +from ryu.lib.packet import ethernet +from ryu.lib.packet import icmp +from ryu.lib.packet import ipv4 +from ryu.lib.packet import vlan + + +_ = type('', (apgw.StructuredMessage,), {}) +_.COMPONENT_NAME = 'ofwire' + + +class RPCError(Exception): + pass + + +class NoRPCResponse(Exception): + def __init__(self, dpid, xid, msgid): + self.dpid = dpid + self.xid = xid + self.msgid = msgid + + +class PendingRPC(Exception): + pass + + +class Peer(object): + def __init__(self, queue): + super(Peer, self).__init__() + self._queue = queue + self.wait_for_ofp_resepnse = {} + + def _handle_rpc_request(self, data): + self._queue.put((self, rpc.MessageType.REQUEST, data)) + + def _handle_rpc_notify(self, data): + self._queue.put((self, rpc.MessageType.NOTIFY, data)) + + +class RpcOFPManager(app_manager.RyuApp): + OFP_VERSIONS = [ofproto_v1_2.OFP_VERSION, ofproto_v1_3.OFP_VERSION] + LOGGER_NAME = 'ofwire' + _CONTEXTS = { + 'dpset': dpset.DPSet, + } + + def __init__(self, *args, **kwargs): + super(RpcOFPManager, self).__init__(*args, **kwargs) + self.dpset = kwargs['dpset'] + self._peers = [] + self.traceroute_source = {} + self.monitored_ports = {} + self.monitored_flows = {} + self.pending_rpc_requests = [] + self._rpc_events = hub.Queue(128) + # per 30 secs by default + self.port_monitor_interval = 30 + hub.spawn(self._peer_accept_thread) + hub.spawn(self._port_status_thread) + hub.spawn(self._rpc_message_thread) + apgw.update_syslog_format() + + def _rpc_message_thread(self): + while True: + (peer, _type, data) = self._rpc_events.get() + error = None + result = None + try: + msgid, target_method, params = data + if _type == rpc.MessageType.REQUEST: + if target_method == "ofp": + result = self._handle_ofprotocol(msgid, params) + elif target_method == "monitor_port": + result = self._monitor_port(msgid, params) + else: + error = 'Unknown method %s' % (target_method) + elif _type == rpc.MessageType.NOTIFY: + if target_method == 'traceroute': + result = self._register_traceroute(msgid, params) + else: + error = 'Unknown method %s' % (target_method) + except RPCError as e: + error = str(e) + except PendingRPC as e: + # we handle the RPC request after a datapath joins. + self.pending_rpc_requests.append((peer, data)) + continue + except NoRPCResponse as e: + # we'll send RPC sesponse after we get a response from + # datapath. + d = peer.wait_for_ofp_resepnse.setdefault(e.dpid, {}) + d[e.xid] = e.msgid + continue + + peer._endpoint.send_response(msgid, error=error, result=result) + + def _peer_loop_thread(self, peer): + peer._endpoint.serve() + # the peer connection is closed + self._peers.remove(peer) + + def peer_accept_handler(self, new_sock, addr): + peer = Peer(self._rpc_events) + table = { + rpc.MessageType.REQUEST: peer._handle_rpc_request, + rpc.MessageType.NOTIFY: peer._handle_rpc_notify, + } + peer._endpoint = rpc.EndPoint(new_sock, disp_table=table) + self._peers.append(peer) + hub.spawn(self._peer_loop_thread, peer) + + def _peer_accept_thread(self): + server = hub.StreamServer(('', 50001), + self.peer_accept_handler) + server.serve_forever() + + def _port_status_thread(self): + while True: + for k, dp in self.dpset.get_all(): + try: + ofmsg = dp.ofproto_parser.OFPPortStatsRequest(datapath=dp) + dp.send_msg(ofmsg) + except: + # ignore the error due to dead datapath + pass + hub.sleep(self.port_monitor_interval) + + def _send_waited_rpc_response(self, msg): + for peer in self._peers: + if not msg.datapath.id in peer.wait_for_ofp_resepnse: + continue + if msg.xid in peer.wait_for_ofp_resepnse[msg.datapath.id]: + msgid = peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid] + peer._endpoint.send_response(msgid, error=None, + result=msg.to_jsondict()) + del peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid] + return + + def compare_key(self, k1, k2): + k1 = eval(k1) + k2 = eval(k2) + l1 = k1['OFPMatch']['oxm_fields'] + l2 = k2['OFPMatch']['oxm_fields'] + return sorted(l1) == sorted(l2) + + def format_key(self, match_json): + del match_json['OFPMatch']['length'] + for t in match_json['OFPMatch']['oxm_fields']: + tlv = t['OXMTlv'] + if tlv['field'] in ['ipv4_dst', 'ipv4_src']: + if tlv['mask'] == '255.255.255.255': + tlv['mask'] = None + return str(match_json) + + @handler.set_ev_cls(dpset.EventDP) + def _handler_datapath(self, ev): + if ev.enter: + dp = ev.dp + parser = dp.ofproto_parser + ofp = dp.ofproto + if ofp.OFP_VERSION == ofproto_v1_2.OFP_VERSION: + m = parser.OFPSetConfig(dp, + ofp.OFPC_INVALID_TTL_TO_CONTROLLER, + ofp.OFPCML_MAX) + elif ofp.OFP_VERSION == ofproto_v1_3.OFP_VERSION: + packet_in_mask = ofp.OFPR_ACTION | ofp.OFPR_INVALID_TTL + port_status_mask = (ofp.OFPPR_ADD | ofp.OFPPR_DELETE | + ofp.OFPPR_MODIFY) + m = parser.OFPSetAsync(dp, [packet_in_mask, 0], + [port_status_mask, 0], + [0, 0]) + dp.send_msg(m) + + log_msg = {"event": "dp connected", "dpid": ev.dp.id} + notify_param = {'secure_channel_state': 'Up'} + for p in self.pending_rpc_requests: + (peer, data) = p + self._rpc_events.put((peer, rpc.MessageType.REQUEST, data)) + else: + log_msg = {"event": "dp disconnected"} + notify_param = {'secure_channel_state': 'Down'} + for peer in self._peers: + if ev.dp.id in peer.wait_for_ofp_resepnse: + del peer.wait_for_ofp_resepnse[ev.dp.id] + + self.logger.info(_(log_msg)) + for peer in self._peers: + peer._endpoint.send_notification("state", [notify_param]) + + @handler.set_ev_cls(ofp_event.EventOFPErrorMsg, + handler.MAIN_DISPATCHER) + def _error_msg_handler(self, ev): + self.logger.info(_(ev.msg.to_jsondict())) + + @handler.set_ev_cls(ofp_event.EventOFPBarrierReply, + handler.MAIN_DISPATCHER) + def _barrier_reply_handler(self, ev): + self._send_waited_rpc_response(ev.msg) + + @handler.set_ev_cls(ofp_event.EventOFPFlowStatsReply, + handler.MAIN_DISPATCHER) + def _flow_stats_reply_handler(self, ev): + msg = ev.msg + for body in msg.body: + key = self.format_key(body.match.to_jsondict()) + contexts = None + for k in self.monitored_flows.keys(): + if self.compare_key(k, key): + contexts = self.monitored_flows[k] + break + if contexts is not None: + stats = {'byte_count': body.byte_count, + 'packet_count': body.packet_count, + 'match': body.match.to_jsondict(), + 'table_id': body.table_id} + stats.update(contexts) + self.logger.info(_(msg=stats, log_type='stats')) + + @handler.set_ev_cls(ofp_event.EventOFPPortStatsReply, + handler.MAIN_DISPATCHER) + def _port_stats_reply_handler(self, ev): + msg = ev.msg + dp = msg.datapath + for stat in sorted(msg.body, key=attrgetter('port_no')): + try: + port = self.dpset.get_port(dp.id, stat.port_no) + except: + continue + if port.name in self.monitored_ports: + stats = {'physical_port_no': port.name} + stats.update(stat.to_jsondict()['OFPPortStats']) + stats.update(self.monitored_ports[port.name]) + self.logger.info(_(msg=stats, log_type='stats')) + + @handler.set_ev_cls(ofp_event.EventOFPStatsReply, + handler.MAIN_DISPATCHER) + def _stats_reply_handler(self, ev): + msg = ev.msg + self._send_waited_rpc_response(msg) + + if msg.type == ofproto_v1_2.OFPST_FLOW: + self._flow_stats_reply_handler(ev) + elif msg.type == ofproto_v1_2.OFPST_PORT: + self._port_stats_reply_handler(ev) + + @handler.set_ev_cls(ofp_event.EventOFPPacketIn) + def _packet_in_handler(self, ev): + msg = ev.msg + dp = msg.datapath + self.logger.info(_({"event": "packet_in", "reason": msg.reason})) + if dp.ofproto.OFPR_INVALID_TTL != msg.reason: + return + + if not 'in_port' in msg.match: + return + in_port = msg.match['in_port'] + + pkt = packet.Packet(msg.data) + if not pkt.get_protocol(ipv4.ipv4): + return + + o_vlan = pkt.get_protocol(vlan.vlan) + if o_vlan is None: + return + vlan_p = vlan.vlan(vid=o_vlan.vid) + + o_eth = pkt.get_protocol(ethernet.ethernet) + eth = ethernet.ethernet(o_eth.src, o_eth.dst, o_eth.ethertype) + o_ip = pkt.get_protocol(ipv4.ipv4) + # needs to set src properly for either side (vlan or mpls) + # ip = ipv4.ipv4(src=ip_lib.ipv4_to_bin(V1_GS_IP), dst=o_ip.src, + # proto=1) + try: + src_ip = self.traceroute_source[o_vlan.vid]['ip'] + in_port = self.traceroute_source[o_vlan.vid]['port'] + except: + self.logger.info(_({"event": "traceroute error", + "reason": "can't find ip", "vid": o_vlan.vid})) + return + ip = ipv4.ipv4(src=src_ip, dst=o_ip.src, proto=1) + ip_offset = 14 + 4 + # ether + vlan headers + data = msg.data[ip_offset:ip_offset + + (o_ip.header_length * 4 + 8)] + ic = icmp.icmp(icmp.ICMP_TIME_EXCEEDED, 0, 0, + icmp.TimeExceeded(data_len=len(data), data=data)) + + p = packet.Packet(protocols=[eth, vlan_p, ip, ic]) + p.serialize() + actions = [dp.ofproto_parser.OFPActionOutput(dp.ofproto.OFPP_TABLE, 0)] + dp.send_packet_out(in_port=in_port, actions=actions, data=p.data) + + @handler.set_ev_cls(ofp_event.EventOFPPortStatus) + def _port_status_handler(self, ev): + if hasattr(ev, 'msg'): + msg = ev.msg + + reason = msg.reason + datapath = msg.datapath + port = msg.desc + ofproto = datapath.ofproto + self.logger.info(_({"event": "port status change", + "reason": reason, + "port_no": port.port_no, "state": port.state}, + log_type='states')) + # For now just port modifications are reported + if reason == ofproto.OFPPR_MODIFY: + params = {'port_no': port.port_no, 'port_state': port.state} + for peer in self._peers: + peer._endpoint.send_notification("port_status", [params]) + + def _flow_stats_loop(self, dp, table_id, match, interval, key): + while True: + if not key in self.monitored_flows: + break + msg = dp.ofproto_parser.OFPFlowStatsRequest(datapath=dp, + table_id=table_id, + match=match) + dp.send_msg(msg) + hub.sleep(interval) + + def _handle_ofprotocol(self, msgid, params): + try: + param_dict = params[0] + except: + raise RPCError('parameters are missing') + + send_response = True + + dp = None + if 'dpid' in param_dict: + dp = self.dpset.get(int(param_dict['dpid'])) + param_dict.pop('dpid') + else: + # use the first datapath + for k, v in self.dpset.get_all(): + dp = v + break + + if dp is None: + self.logger.info(_({"event": "no datapath, queued", + "msg": str(param_dict)})) + raise PendingRPC() + + contexts = None + ofmsg = None + # default interval + interval = 60 + for k, v in param_dict.items(): + if k == 'ofmsg': + try: + ofmsg = ofproto_parser.ofp_msg_from_jsondict(dp, v) + except: + raise RPCError('parameters are invalid, %s' % + (str(param_dict))) + elif k == 'interval': + interval = int(v) + elif k == 'contexts': + contexts = v + if ofmsg is None: + raise RPCError('"ofmsg" parameter is invalid, %s' % + (str(param_dict))) + if contexts is not None and not isinstance(contexts, dict): + raise RPCError('"contexts" must be dictionary, %s' % + (str(param_dict))) + if contexts is not None and interval == 0: + raise RPCError('"interval" must be non zero with "contexts", %s' % + (str(param_dict))) + + dp.set_xid(ofmsg) + ofmsg.serialize() + if dp.ofproto.OFP_VERSION == ofproto_v1_2.OFP_VERSION: + msg_types = (dp.ofproto.OFPT_STATS_REQUEST, + dp.ofproto.OFPT_BARRIER_REQUEST) + else: + msg_types = (dp.ofproto.OFPT_MULTIPART_REQUEST, + dp.ofproto.OFPT_BARRIER_REQUEST) + + if ofmsg.msg_type in msg_types: + dp.send_msg(ofmsg) + raise NoRPCResponse(dpid=dp.id, xid=ofmsg.xid, msgid=msgid) + + result = {'xid': ofmsg.xid} + if ofmsg.msg_type is dp.ofproto.OFPT_FLOW_MOD: + if contexts is not None: + key = self.format_key(ofmsg.match.to_jsondict()) + if ofmsg.command is dp.ofproto.OFPFC_ADD: + if key in self.monitored_flows: + raise RPCError('the existing flow, %s' % (str(key))) + + self.monitored_flows[key] = contexts + hub.spawn(self._flow_stats_loop, + dp, ofmsg.table_id, ofmsg.match, + interval, key) + + elif ofmsg.command in (dp.ofproto.OFPFC_DELETE, + dp.ofproto.OFPFC_DELETE_STRICT): + try: + del self.monitored_flows[key] + except: + raise RPCError('unknown key, %s' % (str(key))) + else: + raise RPCError('unknown of message, %s' % (str(param_dict))) + + dp.send_msg(ofmsg) + return result + + def _register_traceroute(self, msgid, params): + try: + param_dict = params[0] + except: + raise RPCError('parameters are missing') + try: + self.traceroute_source[param_dict['vlan']] = { + 'ip': param_dict['ip'], + 'port': param_dict['port'] + } + except Exception as e: + raise RPCError('parameters are invalid, %s' % (str(param_dict))) + + self.logger.info(_({'event': 'register traceroute source', + 'vlan': param_dict['vlan'], + 'ip': param_dict['ip'], + 'port': param_dict['port']})) + return {} + + def _monitor_port(self, msgid, params): + try: + param_dict = params[0] + except: + raise RPCError('parameters are missing') + name = None + contexts = None + for k, v in param_dict.items(): + if k == 'physical_port_no': + name = v + elif k == 'contexts': + contexts = v + elif k == 'interval': + self.port_monitor_interval = v + else: + raise RPCError('unknown parameters, %s' % k) + + if contexts is None: + raise RPCError('"contexts" parameter is necessary') + if not isinstance(contexts, dict): + raise RPCError('"contexts" parameter must be dictionary') + if name is None: + raise RPCError('"physical_port_no" parameter is necessary') + self.monitored_ports[name] = contexts + return {} diff --git a/ryu/tests/unit/app/test_apgw_rpc.py b/ryu/tests/unit/app/test_apgw_rpc.py new file mode 100644 index 00000000..64949c58 --- /dev/null +++ b/ryu/tests/unit/app/test_apgw_rpc.py @@ -0,0 +1,592 @@ +import unittest +import logging +import socket +from nose.tools import eq_ +from nose.tools import raises + +from ryu.controller import api +from ryu.controller import dpset + +from ryu.ofproto.ofproto_parser import MsgBase +from ryu.ofproto import ether +from ryu.ofproto import inet +from ryu.ofproto import ofproto_parser +from ryu.ofproto import ofproto_v1_2 +from ryu.ofproto import ofproto_v1_2_parser +from ryu.ofproto import ofproto_v1_3 +from ryu.ofproto import ofproto_v1_3_parser + +from ryu.controller import ofp_event + +from ryu.lib import hub +from ryu.lib import rpc +from ryu.lib.packet import packet +from ryu.lib.packet import ethernet +from ryu.lib.packet import vlan +from ryu.lib.packet import ipv4 +from ryu.lib.packet import icmp + + +class DummyEndpoint(object): + def __init__(self): + self.response = [] + self.notification = [] + + def send_response(self, msgid, error, result): + self.response.append((msgid, error, result)) + + def send_notification(self, method, params): + self.notification.append((method, params)) + + +class DummyDatapath(object): + def __init__(self, ofp=None, ofpp=None): + if ofp is None: + ofp = ofproto_v1_2 + self.ofproto = ofp + if ofpp is None: + ofpp = ofproto_v1_2_parser + self.ofproto_parser = ofpp + self.port_state = {} + self.ports = {} + self.sent = [] + + def set_xid(self, msg): + msg.set_xid(1) + + def send_msg(self, msg): + assert isinstance(msg, MsgBase) + msg.serialize() + self.sent.append(msg) + + def send_packet_out(self, in_port, actions, data): + self.sent.append((in_port, actions, data)) + + +class TestRpcOFPManager(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_logger(self): + _ = api._ + m = {'hello': 'world'} + r = eval(str(_(m))) + eq_(r['msg'], m) + eq_(r['component_name'], _.COMPONENT_NAME) + eq_(r['log_type'], 'log') + + def test_monitor_port(self): + m = api.RpcOFPManager(dpset=None) + msgid = 1 + try: + m._monitor_port(msgid, {}) + except api.RPCError as e: + pass + + try: + m._monitor_port(msgid, {'physical_port_no': 1}) + except api.RPCError as e: + pass + + contents = {'hoge': 'jail'} + r = m._monitor_port(msgid, [{'physical_port_no': 1, + 'contexts': contents, + 'interval': 30}]) + eq_(r, {}) + eq_(m.monitored_ports[1], contents) + + def test_register_traceroute(self): + m = api.RpcOFPManager(dpset=None) + msgid = 1 + try: + m._register_traceroute(msgid, [{}]) + except api.RPCError as e: + pass + + try: + m._register_traceroute(msgid, [{'vlan': 1}]) + except api.RPCError as e: + pass + + vlan_id = 1 + port_no = 10 + m._register_traceroute(msgid, [{'vlan': vlan_id, + 'ip': '192.168.1.1', + 'port': port_no}]) + + def test_handle_ofprotocol_without_dp(self): + m = api.RpcOFPManager(dpset=dpset.DPSet()) + msgid = 1 + try: + m._handle_ofprotocol(msgid, [{}]) + except api.PendingRPC: + pass + + try: + m._handle_ofprotocol(msgid, [{'dpid': 1}]) + except api.PendingRPC: + pass + + def _create_dpset(self, dpid=0, ports=None, ofp=None, ofpp=None): + dps = dpset.DPSet() + dp = DummyDatapath(ofp=ofp, ofpp=ofpp) + dp.id = dpid + if ports: + class DummyPort(object): + def __init__(self, port_no): + self.name = port_no + dps.ports = dict(map((lambda n: (n, DummyPort(n))), ports)) + dps.get_ports = lambda dpid: dps.ports + + dps._register(dp) + return dps + + def _test_handle_ofprotocol_flowmod(self, ofp, ofpp): + dpid = 10 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + m = api.RpcOFPManager(dpset=dps) + msgid = 1 + nr_sent = 0 + + try: + m._handle_ofprotocol(msgid, [{'dpid': dpid}]) + except api.RPCError: + pass + + dp = dps.get(dpid) + match = ofpp.OFPMatch(in_port=1) + ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match) + + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict()}]) + nr_sent += 1 + eq_(r, {'xid': 1}) + eq_(len(dp.sent), nr_sent) + + contexts = {'hello': 'world'} + r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(), + 'internal': 30, + 'contexts': contexts}]) + nr_sent += 1 + eq_(r, {'xid': 1}) + eq_(len(dp.sent), nr_sent) + eq_(len(m.monitored_flows), 1) + eq_(m.monitored_flows[m.format_key(match.to_jsondict())], + contexts) + + match = ofpp.OFPMatch(in_port=1) + ofmsg = ofpp.OFPFlowMod(datapath=dp, + command=ofproto_v1_2.OFPFC_DELETE, + match=match) + + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + nr_sent += 1 + eq_(r, {'xid': 1}) + eq_(len(dp.sent), nr_sent) + + try: + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + except api.RPCError as e: + assert 'unknown key' in str(e) + eq_(len(dp.sent), nr_sent) + + def test_handle_ofprotocol_flowmod_12(self): + self._test_handle_ofprotocol_flowmod(ofproto_v1_2, ofproto_v1_2_parser) + + def test_handle_ofprotocol_flowmod_13(self): + self._test_handle_ofprotocol_flowmod(ofproto_v1_3, ofproto_v1_3_parser) + + def _test_handle_ofprotocol(self, ofp, ofpp): + dpid = 10 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + m = api.RpcOFPManager(dpset=dps) + msgid = 1 + nr_sent = 0 + + dp = dps.get(dpid) + ofmsg = ofpp.OFPBarrierRequest(datapath=dp) + nr_sent += 1 + try: + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict()}]) + except api.NoRPCResponse as e: + eq_(e.dpid, dpid) + eq_(e.xid, 1) + eq_(e.msgid, msgid) + eq_(len(dp.sent), nr_sent) + + ofmsg = ofpp.OFPFlowStatsRequest(datapath=dp) + nr_sent += 1 + try: + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict()}]) + except api.NoRPCResponse as e: + eq_(e.dpid, dpid) + eq_(e.xid, 1) + eq_(e.msgid, msgid) + eq_(len(dp.sent), nr_sent) + + ofmsg = ofpp.OFPHello(datapath=dp) + try: + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict()}]) + except api.RPCError as e: + assert 'unknown of message' in str(e) + eq_(len(dp.sent), nr_sent) + + def test_handle_ofprotocol_12(self): + self._test_handle_ofprotocol(ofproto_v1_2, ofproto_v1_2_parser) + + def test_handle_ofprotocol_13(self): + self._test_handle_ofprotocol(ofproto_v1_3, ofproto_v1_3_parser) + + def _test_port_status_handler(self, ofp, ofpp): + dpid = 10 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + dp = dps.get(dpid) + port = ofpp.OFPPort(1, 'aa:aa:aa:aa:aa:aa', 'hoge', 0, 0, 0, + 0, 0, 0, 0, 0) + msg = ofpp.OFPPortStatus(datapath=dp, reason=ofp.OFPPR_MODIFY, + desc=port) + ev = ofp_event.EventOFPPortStatus(msg) + + m = api.RpcOFPManager(dpset=dps) + peer = api.Peer(None) + peer._endpoint = DummyEndpoint() + m._peers.append(peer) + m._port_status_handler(ev) + + eq_(len(peer._endpoint.notification), 1) + (method, params) = peer._endpoint.notification[0] + eq_(method, 'port_status') + eq_(params[0], {'port_no': port.port_no, 'port_state': port.state}) + + def test_port_status_handler_12(self): + self._test_port_status_handler(ofproto_v1_2, ofproto_v1_2_parser) + + def test_port_status_handler_13(self): + self._test_port_status_handler(ofproto_v1_3, ofproto_v1_3_parser) + + def _test_packet_in_handler(self, ofp, ofpp): + dpid = 10 + src_ip = '10.0.0.1' + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + dp = dps.get(dpid) + msg = ofpp.OFPPacketIn(datapath=dp) + ev = ofp_event.EventOFPPacketIn(msg) + + in_port = 10 + vlan_id = 100 + target_ip = '192.168.1.1' + msg.match = ofpp.OFPMatch(in_port=in_port) + protocols = [ethernet.ethernet(ethertype=ether.ETH_TYPE_8021Q), + vlan.vlan(vid=vlan_id), ipv4.ipv4(src=target_ip)] + p = packet.Packet(protocols=protocols) + p.serialize() + msg.data = p.data + msg.reason = ofp.OFPR_INVALID_TTL + + m = api.RpcOFPManager(dpset=dps) + port_no = 5 + msgid = 7 + + # test the failure case + m._packet_in_handler(ev) + + m._register_traceroute(msgid, [{'vlan': vlan_id, + 'ip': src_ip, + 'port': port_no}]) + m._packet_in_handler(ev) + + (in_port, actions, data) = dp.sent[0] + eq_(in_port, port_no) + pkt = packet.Packet(data) + ip = pkt.get_protocol(ipv4.ipv4) + v = pkt.get_protocol(vlan.vlan) + ic = pkt.get_protocol(icmp.icmp) + eq_(ip.src, src_ip) + eq_(ip.dst, target_ip) + eq_(ip.proto, inet.IPPROTO_ICMP) + eq_(v.vid, vlan_id) + eq_(ic.type, icmp.ICMP_TIME_EXCEEDED) + eq_(len(actions), 1) + eq_(actions[0].port, ofp.OFPP_TABLE) + + def test_packet_in_handler_12(self): + self._test_packet_in_handler(ofproto_v1_2, ofproto_v1_2_parser) + + def test_packet_in_handler_13(self): + self._test_packet_in_handler(ofproto_v1_3, ofproto_v1_3_parser) + + def _test_handler_datapath(self, ofp, parser): + dpid = 10 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=parser) + dp = dps.get(dpid) + ev = dpset.EventDP(dp=dp, enter_leave=True) + + m = api.RpcOFPManager(dpset=dps) + + peer = api.Peer(None) + peer._endpoint = DummyEndpoint() + m._peers.append(peer) + + m._handler_datapath(ev) + eq_(len(peer._endpoint.notification), 1) + (method, params) = peer._endpoint.notification[0] + eq_(method, 'state') + eq_(params[0], {'secure_channel_state': 'Up'}) + + peer._endpoint.notification.pop() + ev = dpset.EventDP(dp=dp, enter_leave=False) + m._handler_datapath(ev) + eq_(len(peer._endpoint.notification), 1) + (method, params) = peer._endpoint.notification[0] + eq_(method, 'state') + eq_(params[0], {'secure_channel_state': 'Down'}) + + eq_(len(dp.sent), 1) + return dp + + def test_handler_datapath_12(self): + dp = self._test_handler_datapath(ofproto_v1_2, ofproto_v1_2_parser) + msg = dp.sent[0] + eq_(msg.__class__, ofproto_v1_2_parser.OFPSetConfig) + eq_(msg.flags, ofproto_v1_2.OFPC_INVALID_TTL_TO_CONTROLLER) + + def test_handler_datapath_13(self): + ofp = ofproto_v1_3 + dp = self._test_handler_datapath(ofp, ofproto_v1_3_parser) + msg = dp.sent[0] + eq_(msg.__class__, ofproto_v1_3_parser.OFPSetAsync) + eq_(msg.packet_in_mask, + [ofp.OFPR_ACTION | ofp.OFPR_INVALID_TTL, 0]) + eq_(msg.port_status_mask, + [(ofp.OFPPR_ADD | ofp.OFPPR_DELETE | ofp.OFPPR_MODIFY), 0]) + + def _test_port_status_thread(self, ofp, ofpp): + dpid = 10 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + dp = dps.get(dpid) + m = api.RpcOFPManager(dpset=dps) + m.port_monitor_interval = 10 + + threads = [] + with hub.Timeout(2): + threads.append(hub.spawn(m._port_status_thread)) + + hub.sleep(0.5) + for t in threads: + hub.kill(t) + hub.joinall(threads) + + for m in dp.sent: + eq_(m.__class__, ofpp.OFPPortStatsRequest) + eq_(m.port_no, ofp.OFPP_ANY) + + def test_port_status_thread_12(self): + self._test_port_status_thread(ofproto_v1_2, ofproto_v1_2_parser) + + def test_port_status_thread_13(self): + self._test_port_status_thread(ofproto_v1_3, ofproto_v1_3_parser) + + def _test_flow_stats_loop(self, ofp, ofpp): + dpid = 10 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + dp = dps.get(dpid) + m = api.RpcOFPManager(dpset=dps) + + match = ofpp.OFPMatch(in_port=1) + ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match) + + contexts = {'hello': 'you'} + msgid = 10 + r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(), + 'internal': 30, + 'contexts': contexts}]) + threads = [] + key = m.format_key(ofmsg.match.to_jsondict()) + with hub.Timeout(5): + threads.append(hub.spawn(m._flow_stats_loop, + dp, ofmsg.table_id, match, 0.1, key)) + hub.sleep(0.5) + + ofmsg = ofpp.OFPFlowMod(datapath=dp, + command=ofp.OFPFC_DELETE, + match=match) + msgid = 11 + r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(), + 'internal': 30, + 'contexts': contexts}]) + eq_(len(m.monitored_flows), 0) + hub.joinall(threads) + + for m in dp.sent: + if m.__class__ in (ofpp.OFPFlowMod, ofpp.OFPPortStatsRequest): + continue + eq_(m.__class__, ofpp.OFPFlowStatsRequest) + eq_(m.table_id, ofmsg.table_id) + eq_(m.match.to_jsondict(), match.to_jsondict()) + + def test_flow_stats_loop_12(self): + self._test_flow_stats_loop(ofproto_v1_2, ofproto_v1_2_parser) + + def test_flow_stats_loop_13(self): + self._test_flow_stats_loop(ofproto_v1_3, ofproto_v1_3_parser) + + def _test_rpc_message_thread(self, ofp, ofpp): + dpid = 10 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + dp = dps.get(dpid) + m = api.RpcOFPManager(dpset=dps) + + peer = api.Peer(m._rpc_events) + peer._endpoint = DummyEndpoint() + m._peers.append(peer) + + msgid = 7 + ofmsg = ofpp.OFPBarrierRequest(datapath=dp) + params = {'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict()} + data = (msgid, 'ofp', [params]) + with hub.Timeout(2): + peer._handle_rpc_request(data) + hub.sleep(0.5) + + for s in dp.sent: + if s.__class__ in (ofpp.OFPPortStatsRequest,): + continue + eq_(s.__class__, ofpp.OFPBarrierRequest) + + msg = ofpp.OFPBarrierReply(datapath=dp) + msg.set_xid(1) + ev = ofp_event.EventOFPBarrierReply(msg) + m._barrier_reply_handler(ev) + eq_(len(peer._endpoint.response), 1) + rsp = peer._endpoint.response.pop() + eq_(rsp[0], msgid) + eq_(rsp[1], None) + eq_(rsp[2], msg.to_jsondict()) + eq_(len(peer.wait_for_ofp_resepnse[dpid]), 0) + + m._barrier_reply_handler(ev) + eq_(len(peer._endpoint.response), 0) + + ev = dpset.EventDP(dp=dp, enter_leave=False) + m._handler_datapath(ev) + eq_(len(peer.wait_for_ofp_resepnse), 0) + + def test_rpc_message_thread_12(self): + self._test_rpc_message_thread(ofproto_v1_2, ofproto_v1_2_parser) + + def test_rpc_message_thread_13(self): + self._test_rpc_message_thread(ofproto_v1_3, ofproto_v1_3_parser) + + def _create_port_stats_args(self, port_no=0): + return {'port_no': port_no, 'rx_packets': 10, 'tx_packets': 30, + 'rx_bytes': 1024, 'tx_bytes': 2048, + 'rx_dropped': 0, 'tx_dropped': 1, + 'rx_errors': 0, 'tx_errors': 0, + 'rx_frame_err': 0, 'rx_over_err': 0, 'rx_crc_err': 0, + 'collisions': 0} + + def _test_stats_reply_port_handler(self, ofp, ofpp): + dpid = 10 + dps = self._create_dpset(dpid, (0, 1, 2), ofp, ofpp) + dp = dps.get(dpid) + m = api.RpcOFPManager(dpset=dps) + + msgid = 1 + r = m._monitor_port(msgid, [{'physical_port_no': 0, + 'contexts': {'hello': 'world'}, + 'interval': 30}]) + return m, dp + + def test_stats_reply_port_handler_12(self): + ofp = ofproto_v1_2 + ofpp = ofproto_v1_2_parser + manager, dp = self._test_stats_reply_port_handler(ofp, ofpp) + + msg = ofpp.OFPStatsReply(datapath=dp, type_=ofp.OFPST_PORT) + ev = ofp_event.EventOFPStatsReply(msg) + msg.body = [] + for p in range(2): + port = ofpp.OFPPortStats(**self._create_port_stats_args(p)) + msg.body.append(port) + manager._stats_reply_handler(ev) + + def test_stats_reply_port_handler_13(self): + ofp = ofproto_v1_3 + ofpp = ofproto_v1_3_parser + manager, dp = self._test_stats_reply_port_handler(ofp, ofpp) + msg = ofpp.OFPPortStatsReply(dp) + ev = ofp_event.EventOFPPortStatsReply(msg) + msg.body = [] + for p in range(2): + d = self._create_port_stats_args(p) + d['duration_sec'] = 0 + d['duration_nsec'] = 0 + port = ofpp.OFPPortStats(**d) + msg.body.append(port) + manager._port_stats_reply_handler(ev) + + def _test_stats_reply_flow_handler(self, ofp, ofpp): + dpid = 10 + msgid = 9 + dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp) + m = api.RpcOFPManager(dpset=dps) + + dp = dps.get(dpid) + match = ofpp.OFPMatch(in_port=1) + ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match) + + contexts = {'hello': 'world'} + r = m._handle_ofprotocol(msgid, [{'dpid': dpid, + 'ofmsg': ofmsg.to_jsondict(), + 'contexts': contexts}]) + return m, dp + + def test_stats_reply_flow_handler_12(self): + ofp = ofproto_v1_2 + ofpp = ofproto_v1_2_parser + manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp) + + msg = ofpp.OFPStatsReply(datapath=dp, + type_=ofp.OFPST_FLOW) + ev = ofp_event.EventOFPStatsReply(msg) + s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10, + priority=0, idle_timeout=0, hard_timeout=0, + cookie=0, packet_count=10, byte_count=100, + match=ofpp.OFPMatch(in_port=1)) + # not-monitored flow + s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10, + priority=0, idle_timeout=0, hard_timeout=0, + cookie=0, packet_count=10, byte_count=100, + match=ofpp.OFPMatch(in_port=2)) + msg.body = [s1, s2] + manager._stats_reply_handler(ev) + + def test_stats_reply_flow_handler_13(self): + ofp = ofproto_v1_3 + ofpp = ofproto_v1_3_parser + manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp) + + msg = ofpp.OFPFlowStatsReply(datapath=dp) + ev = ofp_event.EventOFPStatsReply(msg) + s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10, + priority=0, idle_timeout=0, hard_timeout=0, + cookie=0, packet_count=10, byte_count=100, + match=ofpp.OFPMatch(in_port=1)) + # not-monitored flow + s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10, + priority=0, idle_timeout=0, hard_timeout=0, + cookie=0, packet_count=10, byte_count=100, + match=ofpp.OFPMatch(in_port=2)) + msg.body = [s1, s2] + manager._flow_stats_reply_handler(ev)