mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-01-24 18:11:24 +01:00
Compare commits
37 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc35ba0178 | ||
|
|
a2106b6676 | ||
|
|
e94ee078ee | ||
|
|
be8f4d656a | ||
|
|
fd81201a52 | ||
|
|
df35e35703 | ||
|
|
fa709ba75b | ||
|
|
5a4d429a6d | ||
|
|
690edd4000 | ||
|
|
f0e9c92f78 | ||
|
|
dca44fee6d | ||
|
|
294d84ef33 | ||
|
|
78d3ec5a68 | ||
|
|
a3fda22167 | ||
|
|
bf21ed0b0b | ||
|
|
0edb286f83 | ||
|
|
374f32a20a | ||
|
|
53cfa6dacf | ||
|
|
278bc8fe00 | ||
|
|
9ce8698b21 | ||
|
|
a7b6f0a036 | ||
|
|
3b6842eaed | ||
|
|
dab0aca5cc | ||
|
|
b91a263d07 | ||
|
|
1e56302a56 | ||
|
|
0f087cefeb | ||
|
|
9131d24c00 | ||
|
|
152ad2ea3e | ||
|
|
9b9208d9e6 | ||
|
|
692290fd67 | ||
|
|
87fba4e199 | ||
|
|
393e354b97 | ||
|
|
5b487b165a | ||
|
|
208ba0b039 | ||
|
|
a0501a12b1 | ||
|
|
47ef512494 | ||
|
|
eb026768fc |
@ -40,6 +40,7 @@ from ryu import version
|
||||
from ryu.app import wsgi
|
||||
from ryu.base.app_manager import AppManager
|
||||
from ryu.controller import controller
|
||||
from ryu.controller import api
|
||||
from ryu.topology import switches
|
||||
|
||||
|
||||
|
||||
@ -143,7 +143,7 @@ class Cmd(cmd.Cmd):
|
||||
|
||||
def do_get(self, line):
|
||||
"""get <peer>
|
||||
eg. get_config sw1
|
||||
eg. get sw1
|
||||
"""
|
||||
|
||||
def f(p, args):
|
||||
@ -151,6 +151,26 @@ class Cmd(cmd.Cmd):
|
||||
|
||||
self._request(line, f)
|
||||
|
||||
def do_commit(self, line):
|
||||
"""commit <peer>
|
||||
eg. commit sw1
|
||||
"""
|
||||
|
||||
def f(p, args):
|
||||
print p.commit()
|
||||
|
||||
self._request(line, f)
|
||||
|
||||
def do_discard(self, line):
|
||||
"""discard <peer>
|
||||
eg. discard sw1
|
||||
"""
|
||||
|
||||
def f(p, args):
|
||||
print p.discard_changes()
|
||||
|
||||
self._request(line, f)
|
||||
|
||||
def do_get_config(self, line):
|
||||
"""get_config <peer> <source>
|
||||
eg. get_config sw1 startup
|
||||
|
||||
759
ryu/controller/api.py
Normal file
759
ryu/controller/api.py
Normal file
@ -0,0 +1,759 @@
|
||||
import json
|
||||
import logging
|
||||
from operator import attrgetter
|
||||
from oslo.config import cfg
|
||||
from ryu.base import app_manager
|
||||
from ryu.controller import handler
|
||||
from ryu.controller import dpset
|
||||
from ryu.controller import ofp_event
|
||||
from ryu.ofproto import ofproto_parser
|
||||
from ryu.ofproto import ofproto_v1_2
|
||||
from ryu.ofproto import ofproto_v1_2_parser
|
||||
from ryu.ofproto import ofproto_v1_3
|
||||
from ryu.ofproto import ofproto_v1_3_parser
|
||||
from ryu.lib import hub
|
||||
from ryu.lib import apgw
|
||||
from ryu.lib import rpc
|
||||
from ryu.lib.packet import packet
|
||||
from ryu.lib.packet import ethernet
|
||||
from ryu.lib.packet import icmp
|
||||
from ryu.lib.packet import ipv4
|
||||
from ryu.lib.packet import vlan
|
||||
from ryu.lib.of_config import capable_switch as cs
|
||||
from ryu.lib.of_config import constants as consts
|
||||
import ryu.lib.of_config.classes as ofc
|
||||
import eventlet
|
||||
import sys
|
||||
|
||||
_OFCONFIG_RETRIES = 5
|
||||
_OFCONFIG_TIMEOUT = 280
|
||||
|
||||
_ = type('', (apgw.StructuredMessage,), {})
|
||||
_.COMPONENT_NAME = 'ofwire'
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_cli_opts([
|
||||
cfg.StrOpt('ofconfig-address', default='127.0.0.1',
|
||||
help='of-config switch address'),
|
||||
cfg.IntOpt('ofconfig-port', default=1830,
|
||||
help='of-config switch port'),
|
||||
cfg.StrOpt('ofconfig-user', default='linc',
|
||||
help='of-config user name'),
|
||||
cfg.StrOpt('ofconfig-password', default='linc',
|
||||
help='of-config password'),
|
||||
cfg.StrOpt('ofconfig-timeout', default=_OFCONFIG_TIMEOUT,
|
||||
help='of-config timeout per attempt'),
|
||||
cfg.StrOpt('ofconfig-retries', default=_OFCONFIG_RETRIES,
|
||||
help='of-config retries'),
|
||||
])
|
||||
|
||||
|
||||
class RPCError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NoRPCResponse(Exception):
|
||||
def __init__(self, dpid=None, xid=None, msgid=None):
|
||||
self.dpid = dpid
|
||||
self.xid = xid
|
||||
self.msgid = msgid
|
||||
|
||||
|
||||
class PendingRPC(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Peer(object):
|
||||
def __init__(self, queue):
|
||||
super(Peer, self).__init__()
|
||||
self._queue = queue
|
||||
self.wait_for_ofp_resepnse = {}
|
||||
|
||||
def _handle_rpc_request(self, data):
|
||||
self._queue.put((self, rpc.MessageType.REQUEST, data))
|
||||
|
||||
def _handle_rpc_notify(self, data):
|
||||
self._queue.put((self, rpc.MessageType.NOTIFY, data))
|
||||
|
||||
|
||||
class RpcOFPManager(app_manager.RyuApp):
|
||||
OFP_VERSIONS = [ofproto_v1_2.OFP_VERSION, ofproto_v1_3.OFP_VERSION]
|
||||
LOGGER_NAME = 'ofwire'
|
||||
_CONTEXTS = {
|
||||
'dpset': dpset.DPSet,
|
||||
}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RpcOFPManager, self).__init__(*args, **kwargs)
|
||||
self.dpset = kwargs['dpset']
|
||||
self._peers = []
|
||||
self.traceroute_source = {}
|
||||
self.monitored_ports = {}
|
||||
self.monitored_flows = {}
|
||||
self.monitored_meters = {}
|
||||
self.monitored_queues = {}
|
||||
self.pending_rpc_requests = []
|
||||
self._rpc_events = hub.Queue(128)
|
||||
# per 30 secs by default
|
||||
self.ofconfig = hub.Queue(128)
|
||||
hub.spawn(self._peer_accept_thread)
|
||||
hub.spawn(self._rpc_message_thread)
|
||||
hub.spawn(self._ofconfig_thread)
|
||||
apgw.update_syslog_format()
|
||||
|
||||
def _rpc_message_thread(self):
|
||||
while True:
|
||||
(peer, _type, data) = self._rpc_events.get()
|
||||
error = None
|
||||
result = None
|
||||
try:
|
||||
if _type == rpc.MessageType.REQUEST:
|
||||
msgid, target_method, params = data
|
||||
if target_method == "ofp":
|
||||
result = self._handle_ofprotocol(msgid, params)
|
||||
elif target_method == "monitor_port":
|
||||
result = self._monitor_port(msgid, params)
|
||||
elif target_method == "monitor_queue":
|
||||
result = self._monitor_queue(msgid, params)
|
||||
elif target_method == "ofconfig":
|
||||
self._ofconfig(peer, msgid, params)
|
||||
else:
|
||||
error = 'Unknown method %s' % (target_method)
|
||||
elif _type == rpc.MessageType.NOTIFY:
|
||||
target_method, params = data
|
||||
if target_method == 'traceroute':
|
||||
try:
|
||||
self._register_traceroute(params)
|
||||
except RPCError as e:
|
||||
self.logger.error(_({'error': str(e)}))
|
||||
else:
|
||||
self.logger.error(_({'unknown method': target_method}))
|
||||
continue
|
||||
except RPCError as e:
|
||||
error = str(e)
|
||||
except PendingRPC as e:
|
||||
# we handle the RPC request after a datapath joins.
|
||||
self.pending_rpc_requests.append((peer, data))
|
||||
continue
|
||||
except NoRPCResponse as e:
|
||||
# we'll send RPC sesponse after we get a response from
|
||||
# datapath.
|
||||
if e.dpid is not None:
|
||||
d = peer.wait_for_ofp_resepnse.setdefault(e.dpid, {})
|
||||
d[e.xid] = e.msgid
|
||||
continue
|
||||
except:
|
||||
self.logger.info(_({'bogus RPC': data}))
|
||||
|
||||
peer._endpoint.send_response(msgid, error=error, result=result)
|
||||
|
||||
def _peer_loop_thread(self, peer):
|
||||
peer._endpoint.serve()
|
||||
# the peer connection is closed
|
||||
self._peers.remove(peer)
|
||||
|
||||
def peer_accept_handler(self, new_sock, addr):
|
||||
peer = Peer(self._rpc_events)
|
||||
table = {
|
||||
rpc.MessageType.REQUEST: peer._handle_rpc_request,
|
||||
rpc.MessageType.NOTIFY: peer._handle_rpc_notify,
|
||||
}
|
||||
peer._endpoint = rpc.EndPoint(new_sock, disp_table=table)
|
||||
self._peers.append(peer)
|
||||
hub.spawn(self._peer_loop_thread, peer)
|
||||
|
||||
def _peer_accept_thread(self):
|
||||
server = hub.StreamServer(('', 50001),
|
||||
self.peer_accept_handler)
|
||||
server.serve_forever()
|
||||
|
||||
def _send_waited_rpc_response(self, msg):
|
||||
for peer in self._peers:
|
||||
if not msg.datapath.id in peer.wait_for_ofp_resepnse:
|
||||
continue
|
||||
if msg.xid in peer.wait_for_ofp_resepnse[msg.datapath.id]:
|
||||
msgid = peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid]
|
||||
peer._endpoint.send_response(msgid, error=None,
|
||||
result=msg.to_jsondict())
|
||||
del peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid]
|
||||
return
|
||||
|
||||
def compare_key(self, k1, k2):
|
||||
k1 = eval(k1)
|
||||
k2 = eval(k2)
|
||||
l1 = k1['OFPMatch']['oxm_fields']
|
||||
l2 = k2['OFPMatch']['oxm_fields']
|
||||
return sorted(l1) == sorted(l2)
|
||||
|
||||
def format_key(self, match_json):
|
||||
del match_json['OFPMatch']['length']
|
||||
for t in match_json['OFPMatch']['oxm_fields']:
|
||||
tlv = t['OXMTlv']
|
||||
if tlv['field'] in ['ipv4_dst', 'ipv4_src']:
|
||||
if tlv['mask'] == '255.255.255.255':
|
||||
tlv['mask'] = None
|
||||
return str(match_json)
|
||||
|
||||
@handler.set_ev_cls(dpset.EventDP)
|
||||
def _handler_datapath(self, ev):
|
||||
if ev.enter:
|
||||
dp = ev.dp
|
||||
parser = dp.ofproto_parser
|
||||
ofp = dp.ofproto
|
||||
if ofp.OFP_VERSION == ofproto_v1_2.OFP_VERSION:
|
||||
m = parser.OFPSetConfig(dp,
|
||||
ofp.OFPC_INVALID_TTL_TO_CONTROLLER,
|
||||
ofp.OFPCML_MAX)
|
||||
elif ofp.OFP_VERSION == ofproto_v1_3.OFP_VERSION:
|
||||
packet_in_mask = (ofp.OFPR_ACTION_MASK |
|
||||
ofp.OFPR_INVALID_TTL_MASK)
|
||||
port_status_mask = (ofp.OFPPR_ADD_MASK |
|
||||
ofp.OFPPR_DELETE_MASK |
|
||||
ofp.OFPPR_MODIFY_MASK)
|
||||
m = parser.OFPSetAsync(dp, [packet_in_mask, 0],
|
||||
[port_status_mask, 0],
|
||||
[0, 0])
|
||||
dp.send_msg(m)
|
||||
|
||||
log_msg = {"event": "dp connected", "dpid": ev.dp.id}
|
||||
notify_param = {'secure_channel_state': 'Up'}
|
||||
for p in self.pending_rpc_requests:
|
||||
(peer, data) = p
|
||||
self._rpc_events.put((peer, rpc.MessageType.REQUEST, data))
|
||||
else:
|
||||
log_msg = {"event": "dp disconnected"}
|
||||
notify_param = {'secure_channel_state': 'Down'}
|
||||
for peer in self._peers:
|
||||
if ev.dp.id in peer.wait_for_ofp_resepnse:
|
||||
del peer.wait_for_ofp_resepnse[ev.dp.id]
|
||||
|
||||
self.logger.critical(_(log_msg))
|
||||
for peer in self._peers:
|
||||
peer._endpoint.send_notification("state", [notify_param])
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPErrorMsg,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def _error_msg_handler(self, ev):
|
||||
self.logger.info(_(ev.msg.to_jsondict()))
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPBarrierReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def _barrier_reply_handler(self, ev):
|
||||
self._send_waited_rpc_response(ev.msg)
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPFlowStatsReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def _flow_stats_reply_handler(self, ev):
|
||||
msg = ev.msg
|
||||
for body in msg.body:
|
||||
key = self.format_key(body.match.to_jsondict())
|
||||
contexts = None
|
||||
for k in self.monitored_flows.keys():
|
||||
if self.compare_key(k, key):
|
||||
contexts = self.monitored_flows[k]
|
||||
break
|
||||
if contexts is not None:
|
||||
stats = body.to_jsondict()['OFPFlowStats']
|
||||
stats.update(contexts)
|
||||
self.logger.info(_(msg=stats, log_type='stats'))
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPPortStatsReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def _port_stats_reply_handler(self, ev):
|
||||
msg = ev.msg
|
||||
dp = msg.datapath
|
||||
for stat in sorted(msg.body, key=attrgetter('port_no')):
|
||||
try:
|
||||
port = self.dpset.get_port(dp.id, stat.port_no)
|
||||
except:
|
||||
continue
|
||||
if port.name in self.monitored_ports:
|
||||
stats = {'physical_port_no': port.name}
|
||||
stats.update(stat.to_jsondict()['OFPPortStats'])
|
||||
contexts, interval_ = self.monitored_ports[port.name]
|
||||
stats.update(contexts)
|
||||
self.logger.info(_(msg=stats, log_type='stats'))
|
||||
|
||||
def _add_band_name(self, stats):
|
||||
new_stats = [
|
||||
{'band_name': {'pir': stats[0]}},
|
||||
{'band_name': {'cir': stats[1]}}
|
||||
]
|
||||
return new_stats
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPMeterStatsReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def _meter_stats_reply_handler(self, ev):
|
||||
msg = ev.msg
|
||||
dp = msg.datapath
|
||||
for stat in msg.body:
|
||||
if stat.meter_id in self.monitored_meters:
|
||||
contexts = self.monitored_meters[stat.meter_id]
|
||||
stats = stat.to_jsondict()['OFPMeterStats']
|
||||
stats['band_stats'] = self._add_band_name(stats['band_stats'])
|
||||
stats.update(contexts)
|
||||
self.logger.info(_(msg=stats, log_type='stats'))
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPQueueStatsReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def _queue_stats_reply_handler(self, ev):
|
||||
msg = ev.msg
|
||||
dp = msg.datapath
|
||||
for stat in msg.body:
|
||||
if stat.queue_id in self.monitored_queues:
|
||||
contexts, interval_ = self.monitored_queues[stat.queue_id]
|
||||
stats = stat.to_jsondict()['OFPQueueStats']
|
||||
stats = {'queue_id': stat.queue_id,
|
||||
'port_no': stat.port_no,
|
||||
'tx_bytes': stat.tx_bytes,
|
||||
'tx_packets': stat.tx_packets,
|
||||
'tx_errors': stat.tx_errors}
|
||||
stats.update(contexts)
|
||||
self.logger.info(_(msg=stats, log_type='stats'))
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPStatsReply,
|
||||
handler.MAIN_DISPATCHER)
|
||||
def _stats_reply_handler(self, ev):
|
||||
msg = ev.msg
|
||||
self._send_waited_rpc_response(msg)
|
||||
|
||||
if msg.type == ofproto_v1_2.OFPST_FLOW:
|
||||
self._flow_stats_reply_handler(ev)
|
||||
elif msg.type == ofproto_v1_2.OFPST_PORT:
|
||||
self._port_stats_reply_handler(ev)
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPPacketIn)
|
||||
def _packet_in_handler(self, ev):
|
||||
msg = ev.msg
|
||||
dp = msg.datapath
|
||||
self.logger.info(_({"event": "packet_in", "reason": msg.reason}))
|
||||
if dp.ofproto.OFPR_INVALID_TTL != msg.reason:
|
||||
return
|
||||
|
||||
if not 'in_port' in msg.match:
|
||||
return
|
||||
in_port = msg.match['in_port']
|
||||
|
||||
pkt = packet.Packet(msg.data)
|
||||
if not pkt.get_protocol(ipv4.ipv4):
|
||||
return
|
||||
|
||||
o_vlan = pkt.get_protocol(vlan.vlan)
|
||||
if o_vlan is None:
|
||||
return
|
||||
vlan_p = vlan.vlan(vid=o_vlan.vid)
|
||||
|
||||
o_eth = pkt.get_protocol(ethernet.ethernet)
|
||||
eth = ethernet.ethernet(o_eth.src, o_eth.dst, o_eth.ethertype)
|
||||
o_ip = pkt.get_protocol(ipv4.ipv4)
|
||||
# needs to set src properly for either side (vlan or mpls)
|
||||
# ip = ipv4.ipv4(src=ip_lib.ipv4_to_bin(V1_GS_IP), dst=o_ip.src,
|
||||
# proto=1)
|
||||
try:
|
||||
src_ip = self.traceroute_source[o_vlan.vid]['ip']
|
||||
in_port = self.traceroute_source[o_vlan.vid]['port']
|
||||
except:
|
||||
self.logger.info(_({"event": "traceroute error",
|
||||
"reason": "can't find ip", "vid": o_vlan.vid}))
|
||||
return
|
||||
ip = ipv4.ipv4(src=src_ip, dst=o_ip.src, proto=1)
|
||||
ip_offset = 14 + 4
|
||||
# ether + vlan headers
|
||||
data = msg.data[ip_offset:ip_offset +
|
||||
(o_ip.header_length * 4 + 8)]
|
||||
ic = icmp.icmp(icmp.ICMP_TIME_EXCEEDED, 0, 0,
|
||||
icmp.TimeExceeded(data_len=len(data), data=data))
|
||||
|
||||
p = packet.Packet(protocols=[eth, vlan_p, ip, ic])
|
||||
p.serialize()
|
||||
actions = [dp.ofproto_parser.OFPActionOutput(dp.ofproto.OFPP_TABLE, 0)]
|
||||
dp.send_packet_out(in_port=in_port, actions=actions, data=p.data)
|
||||
|
||||
@handler.set_ev_cls(ofp_event.EventOFPPortStatus)
|
||||
def _port_status_handler(self, ev):
|
||||
if hasattr(ev, 'msg'):
|
||||
msg = ev.msg
|
||||
|
||||
reason = msg.reason
|
||||
datapath = msg.datapath
|
||||
port = msg.desc
|
||||
ofproto = datapath.ofproto
|
||||
self.logger.info(_({"event": "port status change",
|
||||
"reason": reason,
|
||||
"port_no": port.port_no, "state": port.state},
|
||||
log_type='states'))
|
||||
# For now just port modifications are reported
|
||||
if reason == ofproto.OFPPR_MODIFY:
|
||||
params = {'port_no': port.port_no, 'port_state': port.state}
|
||||
for peer in self._peers:
|
||||
peer._endpoint.send_notification("port_status", [params])
|
||||
|
||||
def _flow_stats_loop(self, dp, table_id, match, interval, key):
|
||||
while True:
|
||||
if not key in self.monitored_flows:
|
||||
break
|
||||
msg = dp.ofproto_parser.OFPFlowStatsRequest(datapath=dp,
|
||||
table_id=table_id,
|
||||
match=match)
|
||||
dp.send_msg(msg)
|
||||
hub.sleep(interval)
|
||||
|
||||
def _meter_stats_loop(self, dp, interval, meter_id):
|
||||
while True:
|
||||
if not meter_id in self.monitored_meters:
|
||||
break
|
||||
msg = dp.ofproto_parser.OFPMeterStatsRequest(datapath=dp,
|
||||
meter_id=meter_id)
|
||||
dp.send_msg(msg)
|
||||
hub.sleep(interval)
|
||||
|
||||
def _handle_ofprotocol(self, msgid, params):
|
||||
try:
|
||||
param_dict = params[0]
|
||||
except:
|
||||
raise RPCError('parameters are missing')
|
||||
|
||||
send_response = True
|
||||
|
||||
dp = None
|
||||
if 'dpid' in param_dict:
|
||||
dp = self.dpset.get(int(param_dict['dpid']))
|
||||
param_dict.pop('dpid')
|
||||
else:
|
||||
# use the first datapath
|
||||
for k, v in self.dpset.get_all():
|
||||
dp = v
|
||||
break
|
||||
|
||||
if dp is None:
|
||||
self.logger.info(_({"event": "no datapath, queued",
|
||||
"msg": str(param_dict)}))
|
||||
raise PendingRPC()
|
||||
|
||||
contexts = None
|
||||
ofmsg = None
|
||||
# default interval
|
||||
interval = 60
|
||||
for k, v in param_dict.items():
|
||||
if k == 'ofmsg':
|
||||
try:
|
||||
ofmsg = ofproto_parser.ofp_msg_from_jsondict(dp, v)
|
||||
except:
|
||||
raise RPCError('parameters are invalid, %s' %
|
||||
(str(param_dict)))
|
||||
elif k == 'interval':
|
||||
interval = int(v)
|
||||
elif k == 'contexts':
|
||||
contexts = v
|
||||
if ofmsg is None:
|
||||
raise RPCError('"ofmsg" parameter is invalid, %s' %
|
||||
(str(param_dict)))
|
||||
if contexts is not None and not isinstance(contexts, dict):
|
||||
raise RPCError('"contexts" must be dictionary, %s' %
|
||||
(str(param_dict)))
|
||||
if contexts is not None and interval == 0:
|
||||
raise RPCError('"interval" must be non zero with "contexts", %s' %
|
||||
(str(param_dict)))
|
||||
|
||||
dp.set_xid(ofmsg)
|
||||
ofmsg.serialize()
|
||||
if dp.ofproto.OFP_VERSION == ofproto_v1_2.OFP_VERSION:
|
||||
msg_types = (dp.ofproto.OFPT_STATS_REQUEST,
|
||||
dp.ofproto.OFPT_BARRIER_REQUEST)
|
||||
else:
|
||||
msg_types = (dp.ofproto.OFPT_MULTIPART_REQUEST,
|
||||
dp.ofproto.OFPT_BARRIER_REQUEST)
|
||||
|
||||
if ofmsg.msg_type in msg_types:
|
||||
dp.send_msg(ofmsg)
|
||||
raise NoRPCResponse(dpid=dp.id, xid=ofmsg.xid, msgid=msgid)
|
||||
|
||||
result = {'xid': ofmsg.xid}
|
||||
if ofmsg.msg_type is dp.ofproto.OFPT_FLOW_MOD:
|
||||
if contexts is not None:
|
||||
key = self.format_key(ofmsg.match.to_jsondict())
|
||||
if ofmsg.command is dp.ofproto.OFPFC_ADD:
|
||||
if key in self.monitored_flows:
|
||||
raise RPCError('the existing flow, %s' % (str(key)))
|
||||
|
||||
self.monitored_flows[key] = contexts
|
||||
hub.spawn(self._flow_stats_loop,
|
||||
dp, ofmsg.table_id, ofmsg.match,
|
||||
interval, key)
|
||||
|
||||
elif ofmsg.command in (dp.ofproto.OFPFC_DELETE,
|
||||
dp.ofproto.OFPFC_DELETE_STRICT):
|
||||
try:
|
||||
del self.monitored_flows[key]
|
||||
except:
|
||||
raise RPCError('unknown key, %s' % (str(key)))
|
||||
elif (dp.ofproto.OFP_VERSION == ofproto_v1_3.OFP_VERSION and
|
||||
ofmsg.msg_type is dp.ofproto.OFPT_METER_MOD):
|
||||
if contexts is not None:
|
||||
if ofmsg.command is dp.ofproto.OFPMC_ADD:
|
||||
if ofmsg.meter_id in self.monitored_meters:
|
||||
raise RPCError('meter already exitsts %d' %
|
||||
(ofmsg.meter_id))
|
||||
self.monitored_meters[ofmsg.meter_id] = contexts
|
||||
hub.spawn(self._meter_stats_loop,
|
||||
dp, interval, ofmsg.meter_id)
|
||||
elif ofmsg.command is dp.ofproto.OFPMC_DELETE:
|
||||
try:
|
||||
del self.monitored_meters[ofmsg.meter_id]
|
||||
except:
|
||||
raise RPCError('unknown meter %d' % (ofmsg.meter_id))
|
||||
elif ofmsg.command is dp.ofproto.OFPMC_MODIFY:
|
||||
raise RPCError('METER_MOD with contexts is not supported')
|
||||
else:
|
||||
raise RPCError('unknown meter_mod command')
|
||||
else:
|
||||
raise RPCError('unknown of message, %s' % (str(param_dict)))
|
||||
|
||||
dp.send_msg(ofmsg)
|
||||
return result
|
||||
|
||||
def _register_traceroute(self, params):
|
||||
try:
|
||||
param_dict = params[0]
|
||||
except:
|
||||
raise RPCError('parameters are missing')
|
||||
try:
|
||||
self.traceroute_source[param_dict['vlan']] = {
|
||||
'ip': param_dict['ip'],
|
||||
'port': param_dict['port']
|
||||
}
|
||||
except Exception as e:
|
||||
raise RPCError('parameters are invalid, %s' % (str(param_dict)))
|
||||
|
||||
self.logger.info(_({'event': 'register traceroute source',
|
||||
'vlan': param_dict['vlan'],
|
||||
'ip': param_dict['ip'],
|
||||
'port': param_dict['port']}))
|
||||
return {}
|
||||
|
||||
def _ofconfig_thread(self):
|
||||
def _handle_edit(s, o, param_dict):
|
||||
target = 'candidate'
|
||||
if 'port' in param_dict:
|
||||
port_id = param_dict.pop('port')
|
||||
config = ofc.OFPortConfigurationType(**param_dict)
|
||||
port_type = ofc.OFPortType(resource_id=port_id,
|
||||
configuration=config)
|
||||
r = ofc.OFCapableSwitchResourcesType(port=[port_type])
|
||||
c = ofc.OFCapableSwitchType(id=o.id, resources=r)
|
||||
s.edit_config(target, c)
|
||||
else:
|
||||
queue = param_dict.pop('queue')
|
||||
if 'experimenter' in param_dict:
|
||||
new_val = param_dict['experimenter']
|
||||
param_dict['experimenter'] = [new_val]
|
||||
prop = ofc.OFQueuePropertiesType(**param_dict)
|
||||
queue_type = ofc.OFQueueType(resource_id=queue,
|
||||
properties=prop)
|
||||
r = ofc.OFCapableSwitchResourcesType(queue=[queue_type])
|
||||
c = ofc.OFCapableSwitchType(id=o.id,
|
||||
resources=r)
|
||||
if 'experimenter' in param_dict:
|
||||
old_val = None
|
||||
for q in o.resources.queue:
|
||||
if q.resource_id == queue:
|
||||
old_val = q.properties.experimenter[0]
|
||||
break
|
||||
if old_val is None:
|
||||
raise RPCError('cannot find queue, %s' % (queue))
|
||||
xml = ofc.NETCONF_Config(capable_switch=c).to_xml()
|
||||
key = '<of111:experimenter>'
|
||||
sidx = xml.find(key)
|
||||
eidx = xml.find('</of111:experimenter>')
|
||||
xml = xml[:sidx] + \
|
||||
'<of111:experimenter operation=\"delete\">' + \
|
||||
str(old_val) + xml[eidx:]
|
||||
key = '/of111:experimenter>\n'
|
||||
sidx = xml.find(key) + len(key)
|
||||
xml = xml[:sidx + 1] + \
|
||||
'<of111:experimenter operation=\"replace\">' + \
|
||||
str(new_val) + '</of111:experimenter>\n' + \
|
||||
xml[sidx + 1:]
|
||||
s.raw_edit_config(target, xml, None)
|
||||
else:
|
||||
s.edit_config(target, c)
|
||||
try:
|
||||
s.commit()
|
||||
except Exception as e:
|
||||
s.discard_changes()
|
||||
raise RPCError(str(e))
|
||||
return {}
|
||||
|
||||
def _handle_get(o):
|
||||
result = {}
|
||||
ports = []
|
||||
for p in o.resources.port:
|
||||
config = p.configuration
|
||||
ports.append({'name': str(p.name),
|
||||
'number': int(p.number),
|
||||
'admin-state': str(config.admin_state),
|
||||
'oper-state': str(p.state.oper_state)})
|
||||
result['OFPort'] = ports
|
||||
|
||||
queues = []
|
||||
for q in o.resources.queue:
|
||||
p = q.properties
|
||||
queues.append({'id': str(q.id),
|
||||
'resource_id': str(q.resource_id),
|
||||
'max-rate': str(p.max_rate),
|
||||
'experimenter': str(p.experimenter)})
|
||||
result['OFQueue'] = queues
|
||||
return result
|
||||
|
||||
def _handle(param_dict):
|
||||
# TODO: don't need to create a new conneciton every time.
|
||||
# FIXME(KK): Nice place to put a context-manager?
|
||||
s = None
|
||||
attempt = 0
|
||||
while attempt < CONF.ofconfig_retries:
|
||||
try:
|
||||
s = cs.OFCapableSwitch(host=CONF.ofconfig_address,
|
||||
port=CONF.ofconfig_port,
|
||||
username=CONF.ofconfig_user,
|
||||
password=CONF.ofconfig_password,
|
||||
unknown_host_cb=lambda h, f: True,
|
||||
timeout=CONF.ofconfig_timeout)
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
_({"event": "ofconfig failed to connect",
|
||||
"reason": "exception: {0!s}".format(e)}))
|
||||
else:
|
||||
break
|
||||
attempt += 1
|
||||
backoff_time = attempt ** 2
|
||||
hub.sleep(backoff_time)
|
||||
if not s:
|
||||
raise RPCError('failed to connect to ofs')
|
||||
|
||||
o = s.get()
|
||||
if len(param_dict) == 0:
|
||||
return _handle_get(o)
|
||||
elif 'port' or 'queue' in param_dict:
|
||||
return _handle_edit(s, o, param_dict)
|
||||
elif 'port' and 'queue' in param_dict:
|
||||
raise RPCError('only one resource at one shot')
|
||||
else:
|
||||
raise RPCError('ununkown resource edit %s' % (str(param_dict)))
|
||||
|
||||
while True:
|
||||
error = None
|
||||
result = None
|
||||
(peer, msgid, params) = self.ofconfig.get()
|
||||
param_dict = params[0]
|
||||
|
||||
try:
|
||||
result = _handle(param_dict)
|
||||
except RPCError as e:
|
||||
error = str(e)
|
||||
except Exception as e:
|
||||
error = str(e)
|
||||
try:
|
||||
peer._endpoint.send_response(msgid, error=error, result=result)
|
||||
except:
|
||||
self.logger.info(_({'RPC send error %s, %s' %
|
||||
(error, result)}))
|
||||
|
||||
def _ofconfig(self, peer, msgid, params):
|
||||
try:
|
||||
param_dict = params[0]
|
||||
except:
|
||||
raise RPCError('parameters are missing')
|
||||
|
||||
self.ofconfig.put((peer, msgid, params))
|
||||
raise NoRPCResponse()
|
||||
|
||||
def _monitor(self, mandatory_params, resource_dict, request_generator,
|
||||
msgid, params):
|
||||
try:
|
||||
param_dict = params[0]
|
||||
except:
|
||||
raise RPCError('parameters are missing')
|
||||
resource_id = None
|
||||
contexts = None
|
||||
interval = 60
|
||||
|
||||
resource_name = mandatory_params.pop(0)
|
||||
for k, v in param_dict.items():
|
||||
if k == resource_name:
|
||||
resource_id = v
|
||||
elif k == 'contexts':
|
||||
contexts = v
|
||||
elif k == 'interval':
|
||||
interval = v
|
||||
elif k in mandatory_params:
|
||||
pass
|
||||
else:
|
||||
raise RPCError('unknown parameters, %s' % k)
|
||||
|
||||
if contexts is None and interval > 0:
|
||||
raise RPCError('"contexts" parameter is necessary')
|
||||
if contexts is not None and not isinstance(contexts, dict):
|
||||
raise RPCError('"contexts" parameter must be dictionary')
|
||||
if resource_id is None:
|
||||
raise RPCError('"%s" parameter is necessary' % resource_name)
|
||||
|
||||
if interval == 0:
|
||||
if resource_id in resource_dict:
|
||||
del resource_dict[resource_id]
|
||||
else:
|
||||
raise RPCError('%s %d does not exist' % (resource_name,
|
||||
resource_id))
|
||||
else:
|
||||
need_spawn = False
|
||||
if not resource_id in resource_dict:
|
||||
need_spawn = True
|
||||
resource_dict[resource_id] = (contexts, interval)
|
||||
if need_spawn:
|
||||
pass
|
||||
hub.spawn(self._monitor_thread, resource_id, resource_dict,
|
||||
param_dict, request_generator)
|
||||
return {}
|
||||
|
||||
def _port_stats_generator(self, dp, port_name, param_dict):
|
||||
port_no = None
|
||||
ports = self.dpset.get_ports(dp.id)
|
||||
for port in ports:
|
||||
if port.name == port_name:
|
||||
port_no = port.port_no
|
||||
break
|
||||
if port_no is None:
|
||||
return None
|
||||
return dp.ofproto_parser.OFPPortStatsRequest(datapath=dp,
|
||||
port_no=port_no)
|
||||
|
||||
def _monitor_port(self, msgid, params):
|
||||
return self._monitor(['physical_port_no'],
|
||||
self.monitored_ports,
|
||||
self._port_stats_generator,
|
||||
msgid, params)
|
||||
|
||||
def _monitor_thread(self, resource_id, resource_dict, param_dict,
|
||||
generator):
|
||||
while resource_id in resource_dict:
|
||||
_contexts, interval = resource_dict[resource_id]
|
||||
for k, dp in self.dpset.get_all():
|
||||
try:
|
||||
ofmsg = generator(dp, resource_id, param_dict)
|
||||
if ofmsg:
|
||||
dp.send_msg(ofmsg)
|
||||
except:
|
||||
# ignore the error due to dead datapath
|
||||
pass
|
||||
hub.sleep(interval)
|
||||
|
||||
def _queue_stats_generator(self, dp, queue_id, param_dict):
|
||||
port_no = param_dict['port_no']
|
||||
return dp.ofproto_parser.OFPQueueStatsRequest(datapath=dp,
|
||||
port_no=port_no,
|
||||
queue_id=queue_id)
|
||||
|
||||
def _monitor_queue(self, msgid, params):
|
||||
return self._monitor(['queue_id', 'port_no'],
|
||||
self.monitored_queues,
|
||||
self._queue_stats_generator,
|
||||
msgid, params)
|
||||
@ -48,7 +48,11 @@ CONF.register_cli_opts([
|
||||
cfg.StrOpt('neutron-controller-addr', default=None,
|
||||
help='openflow method:address:port to set controller of'
|
||||
'ovs bridge',
|
||||
deprecated_name='quantum-controller-addr')
|
||||
deprecated_name='quantum-controller-addr'),
|
||||
cfg.IntOpt('vrrp-rpc-port', default=50004,
|
||||
help='port for vrrp rpc interface'),
|
||||
cfg.BoolOpt('vrrp-use-vmac', default=False,
|
||||
help='use virtual mac')
|
||||
])
|
||||
|
||||
CONF.register_cli_opts([
|
||||
|
||||
39
ryu/lib/apgw.py
Normal file
39
ryu/lib/apgw.py
Normal file
@ -0,0 +1,39 @@
|
||||
import json
|
||||
import datetime
|
||||
import time
|
||||
import logging
|
||||
|
||||
SYSLOG_FORMAT = '%(name)s %(message)s'
|
||||
|
||||
|
||||
class StructuredMessage(object):
|
||||
COMPONENT_NAME = None
|
||||
|
||||
def __init__(self, msg, log_type='log', resource_id=None,
|
||||
resource_name=None):
|
||||
assert self.__class__.COMPONENT_NAME is not None
|
||||
assert isinstance(msg, dict)
|
||||
assert log_type in ('log', 'stats', 'states')
|
||||
self.message = {}
|
||||
cur_time = datetime.datetime.utcfromtimestamp(
|
||||
time.time()).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
|
||||
self.message['log_type'] = log_type
|
||||
self.message['timestamp'] = cur_time
|
||||
self.message['component_name'] = self.__class__.COMPONENT_NAME
|
||||
self.message['msg'] = msg
|
||||
log = logging.getLogger()
|
||||
self.message['level'] = logging.getLevelName(log.level)
|
||||
if resource_id:
|
||||
self.message['resource_id'] = resource_id
|
||||
if resource_name:
|
||||
self.message['resource_name'] = resource_name
|
||||
|
||||
def __str__(self):
|
||||
return json.dumps(self.message)
|
||||
|
||||
|
||||
def update_syslog_format():
|
||||
log = logging.getLogger()
|
||||
for h in log.handlers:
|
||||
if isinstance(h, logging.handlers.SysLogHandler):
|
||||
h.setFormatter(logging.Formatter(SYSLOG_FORMAT))
|
||||
@ -123,5 +123,11 @@ class OFCapableSwitch(object):
|
||||
def copy_config(self, source, target):
|
||||
self.netconf.copy_config(source, target)
|
||||
|
||||
def commit(self):
|
||||
self.netconf.commit()
|
||||
|
||||
def discard_changes(self):
|
||||
self.netconf.discard_changes()
|
||||
|
||||
# TODO: more netconf operations
|
||||
# TODO: convinience(higher level) methods
|
||||
|
||||
@ -30,6 +30,14 @@ class OFPortConfigurationType(_Base):
|
||||
]
|
||||
|
||||
|
||||
class OFPortStateType(_Base):
|
||||
_ELEMENTS = [
|
||||
_e('oper-state', is_list=False),
|
||||
_e('blocked', is_list=False),
|
||||
_e('live', is_list=False),
|
||||
]
|
||||
|
||||
|
||||
class OFPortType(_Base):
|
||||
_ELEMENTS = [
|
||||
_e('resource-id', is_list=False),
|
||||
@ -38,7 +46,7 @@ class OFPortType(_Base):
|
||||
_e('current-rate', is_list=False),
|
||||
_e('max-rate', is_list=False),
|
||||
_ct('configuration', OFPortConfigurationType, is_list=False),
|
||||
_ct('state', None, is_list=False),
|
||||
_ct('state', OFPortStateType, is_list=False),
|
||||
_ct('features', None, is_list=False),
|
||||
_ct('tunnel-type', None, is_list=False),
|
||||
]
|
||||
|
||||
@ -325,7 +325,7 @@ class vrrp(packet_base.PacketBase):
|
||||
self.identification &= 0xffff
|
||||
return self.identification
|
||||
|
||||
def create_packet(self, primary_ip_address, vlan_id=None):
|
||||
def create_packet(self, primary_ip_address, vlan_id=None, src_mac=None):
|
||||
"""Prepare a VRRP packet.
|
||||
|
||||
Returns a newly created ryu.lib.packet.packet.Packet object
|
||||
@ -348,8 +348,10 @@ class vrrp(packet_base.PacketBase):
|
||||
traffic_class = 0xc0 # set tos to internetwork control
|
||||
flow_label = 0
|
||||
payload_length = ipv6.ipv6._MIN_LEN + len(self) # XXX _MIN_LEN
|
||||
if src_mac is None:
|
||||
src_mac = vrrp_ipv6_src_mac_address(self.vrid)
|
||||
e = ethernet.ethernet(VRRP_IPV6_DST_MAC_ADDRESS,
|
||||
vrrp_ipv6_src_mac_address(self.vrid),
|
||||
src_mac,
|
||||
ether.ETH_TYPE_IPV6)
|
||||
ip = ipv6.ipv6(6, traffic_class, flow_label, payload_length,
|
||||
inet.IPPROTO_VRRP, VRRP_IPV6_HOP_LIMIT,
|
||||
@ -359,8 +361,10 @@ class vrrp(packet_base.PacketBase):
|
||||
total_length = 0
|
||||
tos = 0xc0 # set tos to internetwork control
|
||||
identification = self.get_identification()
|
||||
if src_mac is None:
|
||||
src_mac = vrrp_ipv4_src_mac_address(self.vrid)
|
||||
e = ethernet.ethernet(VRRP_IPV4_DST_MAC_ADDRESS,
|
||||
vrrp_ipv4_src_mac_address(self.vrid),
|
||||
src_mac,
|
||||
ether.ETH_TYPE_IP)
|
||||
ip = ipv4.ipv4(4, header_length, tos, total_length, identification,
|
||||
0, 0, VRRP_IPV4_TTL, inet.IPPROTO_VRRP, 0,
|
||||
|
||||
@ -32,7 +32,7 @@ CONF.register_cli_opts([
|
||||
cfg.IntOpt('default-log-level', default=None, help='default log level'),
|
||||
cfg.BoolOpt('verbose', default=False, help='show debug output'),
|
||||
cfg.BoolOpt('use-stderr', default=True, help='log to standard error'),
|
||||
cfg.BoolOpt('use-syslog', default=False, help='output to syslog'),
|
||||
cfg.BoolOpt('use-syslog', default=True, help='output to syslog'),
|
||||
cfg.StrOpt('log-dir', default=None, help='log file directory'),
|
||||
cfg.StrOpt('log-file', default=None, help='log file name'),
|
||||
cfg.StrOpt('log-file-mode', default='0644',
|
||||
|
||||
@ -993,7 +993,7 @@ class OFPInstructionWriteMetadata(OFPInstruction):
|
||||
metadata_mask Metadata write bitmask
|
||||
================ ======================================================
|
||||
"""
|
||||
def __init__(self, metadata, metadata_mask, len_=None):
|
||||
def __init__(self, metadata, metadata_mask, type_=None, len_=None):
|
||||
super(OFPInstructionWriteMetadata, self).__init__()
|
||||
self.type = ofproto_v1_2.OFPIT_WRITE_METADATA
|
||||
self.len = ofproto_v1_2.OFP_INSTRUCTION_WRITE_METADATA_SIZE
|
||||
|
||||
@ -744,6 +744,13 @@ OFPR_NO_MATCH = 0 # No matching flow.
|
||||
OFPR_ACTION = 1 # Action explicitly output to controller.
|
||||
OFPR_INVALID_TTL = 2 # Packet has invalid TTL.
|
||||
|
||||
# Masks used for OFPT_SET_ASYNC, OFPT_GET_ASYNC_REPLY messages.
|
||||
# Used for setting packet_in_mask field.
|
||||
OFPR_NO_MATCH_MASK = 1 << OFPR_NO_MATCH
|
||||
OFPR_ACTION_MASK = 1 << OFPR_ACTION
|
||||
OFPR_INVALID_TTL_MASK = 1 << OFPR_INVALID_TTL
|
||||
|
||||
|
||||
# struct ofp_flow_removed
|
||||
_OFP_FLOW_REMOVED_PACK_STR0 = 'QHBBIIHHQQ'
|
||||
OFP_FLOW_REMOVED_PACK_STR = '!' + _OFP_FLOW_REMOVED_PACK_STR0 + \
|
||||
@ -759,6 +766,15 @@ OFPRR_HARD_TIMEOUT = 1 # Time exceeded hard_timeout.
|
||||
OFPRR_DELETE = 2 # Evicted by a DELETE flow mod.
|
||||
OFPRR_GROUP_DELETE = 3 # Group was removed.
|
||||
|
||||
# Masks used for OFPT_SET_ASYNC, OFPT_GET_ASYNC_REPLY messages.
|
||||
# Used for setting flow_removed_mask.
|
||||
|
||||
OFPRR_IDLE_TIMEOUT_MASK = 1 << OFPRR_IDLE_TIMEOUT
|
||||
OFPRR_HARD_TIMEOUT_MASK = 1 << OFPRR_HARD_TIMEOUT
|
||||
OFPRR_DELETE_MASK = 1 << OFPRR_DELETE
|
||||
OFPRR_GROUP_DELETE_MASK = 1 << OFPRR_GROUP_DELETE
|
||||
|
||||
|
||||
# struct ofp_port_status
|
||||
OFP_PORT_STATUS_PACK_STR = '!B7x' + _OFP_PORT_PACK_STR
|
||||
OFP_PORT_STATUS_DESC_OFFSET = OFP_HEADER_SIZE + 8
|
||||
@ -771,6 +787,12 @@ OFPPR_ADD = 0 # The port was added.
|
||||
OFPPR_DELETE = 1 # The port was removed.
|
||||
OFPPR_MODIFY = 2 # Some attribute of the port has changed.
|
||||
|
||||
# Masks used for OFPT_SET_ASYNC, OFPT_GET_ASYNC_REPLY messages.
|
||||
# Used for setting port_status_mask.
|
||||
OFPPR_ADD_MASK = 1 << OFPPR_ADD
|
||||
OFPPR_DELETE_MASK = 1 << OFPPR_DELETE
|
||||
OFPPR_MODIFY_MASK = 1 << OFPPR_MODIFY
|
||||
|
||||
# OFPMP_EXPERIMENTER
|
||||
# struct onf_experimenter_multipart_msg
|
||||
# (experimenter == ONF_EXPERIMENTER_ID)
|
||||
|
||||
@ -3357,7 +3357,8 @@ class OFPMeterMod(MsgBase):
|
||||
OFPMeterBandExperimenter
|
||||
================ ======================================================
|
||||
"""
|
||||
def __init__(self, datapath, command, flags, meter_id, bands):
|
||||
def __init__(self, datapath, command=ofproto_v1_3.OFPMC_ADD,
|
||||
flags=ofproto_v1_3.OFPMF_KBPS, meter_id=1, bands=[]):
|
||||
super(OFPMeterMod, self).__init__(datapath)
|
||||
self.command = command
|
||||
self.flags = flags
|
||||
|
||||
@ -120,7 +120,8 @@ class VRRPConfig(object):
|
||||
priority=vrrp.VRRP_PRIORITY_BACKUP_DEFAULT, ip_addresses=None,
|
||||
advertisement_interval=vrrp.VRRP_MAX_ADVER_INT_DEFAULT_IN_SEC,
|
||||
preempt_mode=True, preempt_delay=0, accept_mode=False,
|
||||
statistics_interval=30, resource_id=None):
|
||||
statistics_interval=30, resource_id=None,
|
||||
use_virtual_mac=False):
|
||||
# To allow version and priority default
|
||||
assert vrid is not None
|
||||
assert ip_addresses is not None
|
||||
@ -138,6 +139,7 @@ class VRRPConfig(object):
|
||||
self.is_ipv6 = vrrp.is_ipv6(ip_addresses[0])
|
||||
self.statistics_interval = statistics_interval
|
||||
self.resource_id = resource_id
|
||||
self.use_virtual_mac = use_virtual_mac
|
||||
|
||||
@property
|
||||
def address_owner(self):
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
import contextlib
|
||||
import socket
|
||||
import struct
|
||||
import fcntl
|
||||
|
||||
from ryu.controller import handler
|
||||
from ryu.ofproto import ether
|
||||
@ -69,6 +70,11 @@ class VRRPInterfaceMonitorNetworkDevice(monitor.VRRPInterfaceMonitor):
|
||||
# socket module doesn't define IPPROTO_VRRP
|
||||
self.ip_socket = socket.socket(family, socket.SOCK_RAW,
|
||||
inet.IPPROTO_VRRP)
|
||||
if self.config.use_virtual_mac is False:
|
||||
i = fcntl.ioctl(self.ip_socket.fileno(), 0x8927,
|
||||
struct.pack('256s',
|
||||
self.interface.device_name[:15]))
|
||||
mac_address = addrconv.mac.bin_to_text(i[18:24])
|
||||
|
||||
self.packet_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
|
||||
socket.htons(ether_type))
|
||||
@ -78,6 +84,7 @@ class VRRPInterfaceMonitorNetworkDevice(monitor.VRRPInterfaceMonitor):
|
||||
addrconv.mac.text_to_bin(mac_address)))
|
||||
|
||||
self.ifindex = if_nametoindex(self.interface.device_name)
|
||||
self.config.src_mac = mac_address
|
||||
|
||||
def start(self):
|
||||
# discard received packets before joining multicast membership
|
||||
@ -104,10 +111,7 @@ class VRRPInterfaceMonitorNetworkDevice(monitor.VRRPInterfaceMonitor):
|
||||
# multicast are aligned in the same way on all the archtectures.
|
||||
def _join_multicast_membership(self, join_leave):
|
||||
config = self.config
|
||||
if config.is_ipv6:
|
||||
mac_address = vrrp.vrrp_ipv6_src_mac_address(config.vrid)
|
||||
else:
|
||||
mac_address = vrrp.vrrp_ipv4_src_mac_address(config.vrid)
|
||||
mac_address = self.config.src_mac
|
||||
if join_leave:
|
||||
add_drop = PACKET_ADD_MEMBERSHIP
|
||||
else:
|
||||
|
||||
@ -26,11 +26,16 @@ from ryu.base import app_manager
|
||||
from ryu.controller import event
|
||||
from ryu.controller import handler
|
||||
from ryu.lib import hub
|
||||
from ryu.lib import apgw
|
||||
from ryu.lib.packet import vrrp
|
||||
from ryu.services.protocols.vrrp import event as vrrp_event
|
||||
from ryu.services.protocols.vrrp import api as vrrp_api
|
||||
|
||||
|
||||
_ = type('', (apgw.StructuredMessage,), {})
|
||||
_.COMPONENT_NAME = 'vrrp'
|
||||
|
||||
|
||||
# TODO: improve Timer service and move it into framework
|
||||
class Timer(object):
|
||||
def __init__(self, handler_):
|
||||
@ -139,6 +144,7 @@ class VRRPRouter(app_manager.RyuApp):
|
||||
_EVENTS = [vrrp_event.EventVRRPStateChanged]
|
||||
_CONSTRUCTORS = {}
|
||||
_STATE_MAP = {} # should be overrided by concrete class
|
||||
LOGGER_NAME = 'vrrp'
|
||||
|
||||
@staticmethod
|
||||
def register(version):
|
||||
@ -215,7 +221,7 @@ class VRRPRouter(app_manager.RyuApp):
|
||||
# create packet frame each time to generate new ip identity
|
||||
interface = self.interface
|
||||
packet_ = vrrp_.create_packet(interface.primary_ip_address,
|
||||
interface.vlan_id)
|
||||
interface.vlan_id, self.config.src_mac)
|
||||
packet_.serialize()
|
||||
vrrp_api.vrrp_transmit(self, self.monitor_name, packet_.data)
|
||||
self.statistics.tx_vrrp_packets += 1
|
||||
@ -271,8 +277,8 @@ class VRRPRouter(app_manager.RyuApp):
|
||||
|
||||
@handler.set_ev_handler(_EventStatisticsOut)
|
||||
def statistics_handler(self, ev):
|
||||
# sends stats to somewhere here
|
||||
# print self.statistics.get_stats()
|
||||
stats = self.statistics.get_stats()
|
||||
self.logger.info(_(msg=stats, log_type='stats'))
|
||||
self.stats_out_timer.start(self.statistics.statistics_interval)
|
||||
|
||||
# RFC defines that start timer, then change the state.
|
||||
|
||||
@ -24,13 +24,12 @@ from ryu.services.protocols.vrrp import api as vrrp_api
|
||||
from ryu.lib import rpc
|
||||
from ryu.lib import hub
|
||||
from ryu.lib import mac
|
||||
from ryu.lib import apgw
|
||||
|
||||
VRRP_RPC_PORT = 50004 # random
|
||||
CONF = cfg.CONF
|
||||
|
||||
CONF.register_opts([
|
||||
cfg.IntOpt('vrrp-rpc-port', default=VRRP_RPC_PORT,
|
||||
help='port for vrrp rpc interface')])
|
||||
_ = type('', (apgw.StructuredMessage,), {})
|
||||
_.COMPONENT_NAME = 'vrrp'
|
||||
|
||||
|
||||
class RPCError(Exception):
|
||||
@ -47,6 +46,8 @@ class Peer(object):
|
||||
|
||||
|
||||
class RpcVRRPManager(app_manager.RyuApp):
|
||||
LOGGER_NAME = 'vrrp'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RpcVRRPManager, self).__init__(*args, **kwargs)
|
||||
self._args = args
|
||||
@ -55,6 +56,7 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
self._rpc_events = hub.Queue(128)
|
||||
self.server_thread = hub.spawn(self._peer_accept_thread)
|
||||
self.event_thread = hub.spawn(self._rpc_request_loop_thread)
|
||||
apgw.update_syslog_format()
|
||||
|
||||
def _rpc_request_loop_thread(self):
|
||||
while True:
|
||||
@ -63,6 +65,9 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
error = None
|
||||
result = None
|
||||
try:
|
||||
self.logger.info(_(msg={'msgid': msgid,
|
||||
'target_method': target_method,
|
||||
'params': params}))
|
||||
if target_method == "vrrp_config":
|
||||
result = self._config(msgid, params)
|
||||
elif target_method == "vrrp_list":
|
||||
@ -73,6 +78,9 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
error = 'Unknown method %s' % (target_method)
|
||||
except RPCError as e:
|
||||
error = str(e)
|
||||
self.logger.info(_(msg={'msgid': msgid,
|
||||
'error': error,
|
||||
'result': result}))
|
||||
peer._endpoint.send_response(msgid, error=error, result=result)
|
||||
|
||||
def _peer_loop_thread(self, peer):
|
||||
@ -102,15 +110,22 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
return d
|
||||
|
||||
def _config(self, msgid, params):
|
||||
self.logger.debug('handle vrrp_config request')
|
||||
try:
|
||||
param_dict = params[0]
|
||||
except:
|
||||
raise RPCError('parameters are missing')
|
||||
|
||||
if_params = self._params_to_dict(param_dict,
|
||||
('primary_ip_address',
|
||||
'device_name'))
|
||||
('ip_address',
|
||||
'ifname'))
|
||||
try:
|
||||
if_params['primary_ip_address'] = if_params.pop('ip_address')
|
||||
except:
|
||||
raise RPCError('ip_addr parameter is missing')
|
||||
try:
|
||||
if_params['device_name'] = if_params.pop('ifname')
|
||||
except:
|
||||
raise RPCError('ifname parameter is missing')
|
||||
# drop vlan support later
|
||||
if_params['vlan_id'] = None
|
||||
if_params['mac_address'] = mac.DONTCARE_STR
|
||||
@ -121,7 +136,7 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
|
||||
config_params = self._params_to_dict(param_dict,
|
||||
('vrid', # mandatory
|
||||
'ip_addresses', # mandatory
|
||||
'ip_addr', # mandatory
|
||||
'version',
|
||||
'admin_state',
|
||||
'priority',
|
||||
@ -129,6 +144,12 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
'preempt_mode',
|
||||
'preempt_delay',
|
||||
'statistics_interval'))
|
||||
if CONF.vrrp_use_vmac:
|
||||
config_params.update({'use_virtual_mac': True})
|
||||
try:
|
||||
config_params['ip_addresses'] = [config_params.pop('ip_addr')]
|
||||
except:
|
||||
raise RPCError('ip_addr parameter is missing')
|
||||
try:
|
||||
config = vrrp_event.VRRPConfig(**config_params)
|
||||
except:
|
||||
@ -149,7 +170,6 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
return None
|
||||
|
||||
def _config_change(self, msgid, params):
|
||||
self.logger.debug('handle vrrp_config_change request')
|
||||
try:
|
||||
config_values = params[0]
|
||||
except:
|
||||
@ -167,7 +187,6 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
return {}
|
||||
|
||||
def _list(self, msgid, params):
|
||||
self.logger.debug('handle vrrp_list request')
|
||||
result = vrrp_api.vrrp_list(self)
|
||||
instance_list = result.instance_list
|
||||
ret_list = []
|
||||
@ -186,13 +205,11 @@ class RpcVRRPManager(app_manager.RyuApp):
|
||||
|
||||
@handler.set_ev_cls(vrrp_event.EventVRRPStateChanged)
|
||||
def vrrp_state_changed_handler(self, ev):
|
||||
self.logger.info('handle EventVRRPStateChanged')
|
||||
name = ev.instance_name
|
||||
old_state = ev.old_state
|
||||
new_state = ev.new_state
|
||||
vrid = ev.config.vrid
|
||||
self.logger.info('VRID:%s %s: %s -> %s', vrid, name, old_state,
|
||||
new_state)
|
||||
params = {'vrid': vrid, 'old_state': old_state, 'new_state': new_state}
|
||||
self.logger.critical(_(msg=params))
|
||||
for peer in self._peers:
|
||||
peer._endpoint.send_notification("notify_status", [params])
|
||||
|
||||
777
ryu/tests/unit/app/test_apgw_rpc.py
Normal file
777
ryu/tests/unit/app/test_apgw_rpc.py
Normal file
@ -0,0 +1,777 @@
|
||||
import unittest
|
||||
import logging
|
||||
import socket
|
||||
from nose.tools import eq_
|
||||
from nose.tools import raises
|
||||
|
||||
from ryu.controller import api
|
||||
from ryu.controller import dpset
|
||||
|
||||
from ryu.ofproto.ofproto_parser import MsgBase
|
||||
from ryu.ofproto import ether
|
||||
from ryu.ofproto import inet
|
||||
from ryu.ofproto import ofproto_parser
|
||||
from ryu.ofproto import ofproto_v1_2
|
||||
from ryu.ofproto import ofproto_v1_2_parser
|
||||
from ryu.ofproto import ofproto_v1_3
|
||||
from ryu.ofproto import ofproto_v1_3_parser
|
||||
|
||||
from ryu.controller import ofp_event
|
||||
|
||||
from ryu.lib import hub
|
||||
from ryu.lib import rpc
|
||||
from ryu.lib.packet import packet
|
||||
from ryu.lib.packet import ethernet
|
||||
from ryu.lib.packet import vlan
|
||||
from ryu.lib.packet import ipv4
|
||||
from ryu.lib.packet import icmp
|
||||
|
||||
|
||||
class DummyEndpoint(object):
|
||||
def __init__(self):
|
||||
self.response = []
|
||||
self.notification = []
|
||||
|
||||
def send_response(self, msgid, error, result):
|
||||
self.response.append((msgid, error, result))
|
||||
|
||||
def send_notification(self, method, params):
|
||||
self.notification.append((method, params))
|
||||
|
||||
|
||||
class DummyDatapath(object):
|
||||
def __init__(self, ofp=None, ofpp=None):
|
||||
if ofp is None:
|
||||
ofp = ofproto_v1_2
|
||||
self.ofproto = ofp
|
||||
if ofpp is None:
|
||||
ofpp = ofproto_v1_2_parser
|
||||
self.ofproto_parser = ofpp
|
||||
self.port_state = {}
|
||||
self.ports = {}
|
||||
self.sent = []
|
||||
|
||||
def set_xid(self, msg):
|
||||
msg.set_xid(1)
|
||||
|
||||
def send_msg(self, msg):
|
||||
assert isinstance(msg, MsgBase)
|
||||
msg.serialize()
|
||||
self.sent.append(msg)
|
||||
|
||||
def send_packet_out(self, in_port, actions, data):
|
||||
self.sent.append((in_port, actions, data))
|
||||
|
||||
|
||||
class TestRpcOFPManager(unittest.TestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def test_logger(self):
|
||||
_ = api._
|
||||
m = {'hello': 'world'}
|
||||
r = eval(str(_(m)))
|
||||
eq_(r['msg'], m)
|
||||
eq_(r['component_name'], _.COMPONENT_NAME)
|
||||
eq_(r['log_type'], 'log')
|
||||
|
||||
def test_monitor_port(self):
|
||||
m = api.RpcOFPManager(dpset=None)
|
||||
msgid = 1
|
||||
try:
|
||||
m._monitor_port(msgid, {})
|
||||
except api.RPCError as e:
|
||||
pass
|
||||
|
||||
port_name = 'OFP11'
|
||||
interval = 10
|
||||
try:
|
||||
m._monitor_port(msgid, {'physical_port_no': port_name})
|
||||
except api.RPCError as e:
|
||||
pass
|
||||
|
||||
contents = {'hoge': 'jail'}
|
||||
r = m._monitor_port(msgid, [{'physical_port_no': port_name,
|
||||
'contexts': contents,
|
||||
'interval': interval}])
|
||||
eq_(r, {})
|
||||
eq_(m.monitored_ports[port_name], (contents, interval))
|
||||
|
||||
def test_monitor_queue(self):
|
||||
m = api.RpcOFPManager(dpset=None)
|
||||
msgid = 1
|
||||
try:
|
||||
m._monitor_queue(msgid, {})
|
||||
except api.RPCError as e:
|
||||
pass
|
||||
|
||||
queue_id = 10
|
||||
port_no = 10
|
||||
interval = 10
|
||||
try:
|
||||
m._monitor_queue(msgid, {'queue_id': queue_id, 'port_no': port_no})
|
||||
except api.RPCError as e:
|
||||
pass
|
||||
|
||||
contents = {'hoge': 'jail'}
|
||||
r = m._monitor_queue(msgid, [{'queue_id': queue_id,
|
||||
'port_no': port_no,
|
||||
'contexts': contents,
|
||||
'interval': interval}])
|
||||
eq_(r, {})
|
||||
eq_(m.monitored_queues[queue_id], (contents, interval))
|
||||
|
||||
def test_register_traceroute(self):
|
||||
m = api.RpcOFPManager(dpset=None)
|
||||
msgid = 1
|
||||
try:
|
||||
m._register_traceroute([{}])
|
||||
except api.RPCError as e:
|
||||
pass
|
||||
|
||||
try:
|
||||
m._register_traceroute([{'vlan': 1}])
|
||||
except api.RPCError as e:
|
||||
pass
|
||||
|
||||
vlan_id = 1
|
||||
port_no = 10
|
||||
m._register_traceroute([{'vlan': vlan_id,
|
||||
'ip': '192.168.1.1',
|
||||
'port': port_no}])
|
||||
|
||||
def test_handle_ofprotocol_without_dp(self):
|
||||
m = api.RpcOFPManager(dpset=dpset.DPSet())
|
||||
msgid = 1
|
||||
try:
|
||||
m._handle_ofprotocol(msgid, [{}])
|
||||
except api.PendingRPC:
|
||||
pass
|
||||
|
||||
try:
|
||||
m._handle_ofprotocol(msgid, [{'dpid': 1}])
|
||||
except api.PendingRPC:
|
||||
pass
|
||||
|
||||
def _create_dpset(self, dpid=0, ports=None, ofp=None, ofpp=None):
|
||||
dps = dpset.DPSet()
|
||||
dp = DummyDatapath(ofp=ofp, ofpp=ofpp)
|
||||
dp.id = dpid
|
||||
if ports:
|
||||
class DummyPort(object):
|
||||
def __init__(self, port_no):
|
||||
self.port_no = port_no
|
||||
dps.ports = map(lambda n: DummyPort(n), ports)
|
||||
dps.get_ports = lambda dpid: dps.ports
|
||||
|
||||
dps._register(dp)
|
||||
return dps
|
||||
|
||||
def _test_handle_ofprotocol_flowmod(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
msgid = 1
|
||||
nr_sent = 0
|
||||
|
||||
try:
|
||||
m._handle_ofprotocol(msgid, [{'dpid': dpid}])
|
||||
except api.RPCError:
|
||||
pass
|
||||
|
||||
dp = dps.get(dpid)
|
||||
match = ofpp.OFPMatch(metadata=(100, 63),
|
||||
eth_type=2048,
|
||||
ip_proto=112,
|
||||
ipv4_src='172.17.45.1',
|
||||
ipv4_dst='224.0.0.18')
|
||||
inst = [ofpp.OFPInstructionActions(
|
||||
ofp.OFPIT_APPLY_ACTIONS, [ofpp.OFPActionPopVlan()]),
|
||||
ofpp.OFPInstructionGotoTable(35),
|
||||
ofpp.OFPInstructionWriteMetadata(100, 0x255)]
|
||||
|
||||
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match, instructions=inst)
|
||||
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict()}])
|
||||
nr_sent += 1
|
||||
eq_(r, {'xid': 1})
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
|
||||
contexts = {'hello': 'world'}
|
||||
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
|
||||
'internal': 30,
|
||||
'contexts': contexts}])
|
||||
nr_sent += 1
|
||||
eq_(r, {'xid': 1})
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
eq_(len(m.monitored_flows), 1)
|
||||
eq_(m.monitored_flows[m.format_key(match.to_jsondict())],
|
||||
contexts)
|
||||
|
||||
ofmsg = ofpp.OFPFlowMod(datapath=dp,
|
||||
command=ofp.OFPFC_DELETE,
|
||||
match=match)
|
||||
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
nr_sent += 1
|
||||
eq_(r, {'xid': 1})
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
|
||||
try:
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
except api.RPCError as e:
|
||||
assert 'unknown key' in str(e)
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
|
||||
def test_handle_ofprotocol_flowmod_12(self):
|
||||
self._test_handle_ofprotocol_flowmod(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_handle_ofprotocol_flowmod_13(self):
|
||||
self._test_handle_ofprotocol_flowmod(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_handle_ofprotocol_meter_mod(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
msgid = 1
|
||||
nr_sent = 0
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
|
||||
dp = dps.get(dpid)
|
||||
bands = [ofpp.OFPMeterBandDrop(10, 100)]
|
||||
meter_id = 10
|
||||
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
|
||||
ofp.OFPMF_KBPS, meter_id, bands)
|
||||
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict()}])
|
||||
nr_sent += 1
|
||||
eq_(r, {'xid': 1})
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
|
||||
contexts = {'hello': 'world'}
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
nr_sent += 1
|
||||
eq_(r, {'xid': 1})
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
eq_(len(m.monitored_meters), 1)
|
||||
eq_(m.monitored_meters[meter_id], contexts)
|
||||
|
||||
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS,
|
||||
meter_id)
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
nr_sent += 1
|
||||
eq_(r, {'xid': 1})
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
eq_(len(m.monitored_meters), 0)
|
||||
|
||||
def test_handle_ofprotocol_meter_mod_13(self):
|
||||
self._test_handle_ofprotocol_meter_mod(ofproto_v1_3,
|
||||
ofproto_v1_3_parser)
|
||||
|
||||
def _test_handle_ofprotocol(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
msgid = 1
|
||||
nr_sent = 0
|
||||
|
||||
dp = dps.get(dpid)
|
||||
ofmsg = ofpp.OFPBarrierRequest(datapath=dp)
|
||||
nr_sent += 1
|
||||
try:
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict()}])
|
||||
except api.NoRPCResponse as e:
|
||||
eq_(e.dpid, dpid)
|
||||
eq_(e.xid, 1)
|
||||
eq_(e.msgid, msgid)
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
|
||||
ofmsg = ofpp.OFPFlowStatsRequest(datapath=dp)
|
||||
nr_sent += 1
|
||||
try:
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict()}])
|
||||
except api.NoRPCResponse as e:
|
||||
eq_(e.dpid, dpid)
|
||||
eq_(e.xid, 1)
|
||||
eq_(e.msgid, msgid)
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
|
||||
ofmsg = ofpp.OFPHello(datapath=dp)
|
||||
try:
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict()}])
|
||||
except api.RPCError as e:
|
||||
assert 'unknown of message' in str(e)
|
||||
eq_(len(dp.sent), nr_sent)
|
||||
|
||||
def test_handle_ofprotocol_12(self):
|
||||
self._test_handle_ofprotocol(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_handle_ofprotocol_13(self):
|
||||
self._test_handle_ofprotocol(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_port_status_handler(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
dp = dps.get(dpid)
|
||||
port = ofpp.OFPPort(1, 'aa:aa:aa:aa:aa:aa', 'hoge', 0, 0, 0,
|
||||
0, 0, 0, 0, 0)
|
||||
msg = ofpp.OFPPortStatus(datapath=dp, reason=ofp.OFPPR_MODIFY,
|
||||
desc=port)
|
||||
ev = ofp_event.EventOFPPortStatus(msg)
|
||||
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
peer = api.Peer(None)
|
||||
peer._endpoint = DummyEndpoint()
|
||||
m._peers.append(peer)
|
||||
m._port_status_handler(ev)
|
||||
|
||||
eq_(len(peer._endpoint.notification), 1)
|
||||
(method, params) = peer._endpoint.notification[0]
|
||||
eq_(method, 'port_status')
|
||||
eq_(params[0], {'port_no': port.port_no, 'port_state': port.state})
|
||||
|
||||
def test_port_status_handler_12(self):
|
||||
self._test_port_status_handler(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_port_status_handler_13(self):
|
||||
self._test_port_status_handler(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_packet_in_handler(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
src_ip = '10.0.0.1'
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
dp = dps.get(dpid)
|
||||
msg = ofpp.OFPPacketIn(datapath=dp)
|
||||
ev = ofp_event.EventOFPPacketIn(msg)
|
||||
|
||||
in_port = 10
|
||||
vlan_id = 100
|
||||
target_ip = '192.168.1.1'
|
||||
msg.match = ofpp.OFPMatch(in_port=in_port)
|
||||
protocols = [ethernet.ethernet(ethertype=ether.ETH_TYPE_8021Q),
|
||||
vlan.vlan(vid=vlan_id), ipv4.ipv4(src=target_ip)]
|
||||
p = packet.Packet(protocols=protocols)
|
||||
p.serialize()
|
||||
msg.data = p.data
|
||||
msg.reason = ofp.OFPR_INVALID_TTL
|
||||
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
port_no = 5
|
||||
msgid = 7
|
||||
|
||||
# test the failure case
|
||||
m._packet_in_handler(ev)
|
||||
|
||||
m._register_traceroute([{'vlan': vlan_id,
|
||||
'ip': src_ip,
|
||||
'port': port_no}])
|
||||
m._packet_in_handler(ev)
|
||||
|
||||
(in_port, actions, data) = dp.sent[0]
|
||||
eq_(in_port, port_no)
|
||||
pkt = packet.Packet(data)
|
||||
ip = pkt.get_protocol(ipv4.ipv4)
|
||||
v = pkt.get_protocol(vlan.vlan)
|
||||
ic = pkt.get_protocol(icmp.icmp)
|
||||
eq_(ip.src, src_ip)
|
||||
eq_(ip.dst, target_ip)
|
||||
eq_(ip.proto, inet.IPPROTO_ICMP)
|
||||
eq_(v.vid, vlan_id)
|
||||
eq_(ic.type, icmp.ICMP_TIME_EXCEEDED)
|
||||
eq_(len(actions), 1)
|
||||
eq_(actions[0].port, ofp.OFPP_TABLE)
|
||||
|
||||
def test_packet_in_handler_12(self):
|
||||
self._test_packet_in_handler(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_packet_in_handler_13(self):
|
||||
self._test_packet_in_handler(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_handler_datapath(self, ofp, parser):
|
||||
dpid = 10
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=parser)
|
||||
dp = dps.get(dpid)
|
||||
ev = dpset.EventDP(dp=dp, enter_leave=True)
|
||||
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
|
||||
peer = api.Peer(None)
|
||||
peer._endpoint = DummyEndpoint()
|
||||
m._peers.append(peer)
|
||||
|
||||
m._handler_datapath(ev)
|
||||
eq_(len(peer._endpoint.notification), 1)
|
||||
(method, params) = peer._endpoint.notification[0]
|
||||
eq_(method, 'state')
|
||||
eq_(params[0], {'secure_channel_state': 'Up'})
|
||||
|
||||
peer._endpoint.notification.pop()
|
||||
ev = dpset.EventDP(dp=dp, enter_leave=False)
|
||||
m._handler_datapath(ev)
|
||||
eq_(len(peer._endpoint.notification), 1)
|
||||
(method, params) = peer._endpoint.notification[0]
|
||||
eq_(method, 'state')
|
||||
eq_(params[0], {'secure_channel_state': 'Down'})
|
||||
|
||||
eq_(len(dp.sent), 1)
|
||||
return dp
|
||||
|
||||
def test_handler_datapath_12(self):
|
||||
dp = self._test_handler_datapath(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
msg = dp.sent[0]
|
||||
eq_(msg.__class__, ofproto_v1_2_parser.OFPSetConfig)
|
||||
eq_(msg.flags, ofproto_v1_2.OFPC_INVALID_TTL_TO_CONTROLLER)
|
||||
|
||||
def test_handler_datapath_13(self):
|
||||
ofp = ofproto_v1_3
|
||||
dp = self._test_handler_datapath(ofp, ofproto_v1_3_parser)
|
||||
msg = dp.sent[0]
|
||||
eq_(msg.__class__, ofproto_v1_3_parser.OFPSetAsync)
|
||||
eq_(msg.packet_in_mask,
|
||||
[1 << ofp.OFPR_ACTION | 1 << ofp.OFPR_INVALID_TTL, 0])
|
||||
eq_(msg.port_status_mask,
|
||||
[(1 << ofp.OFPPR_ADD | 1 << ofp.OFPPR_DELETE |
|
||||
1 << ofp.OFPPR_MODIFY), 0])
|
||||
|
||||
def _test_port_status_thread(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
port_no = 1
|
||||
port_name = 'OFP11'
|
||||
dps = self._create_dpset(dpid, ports=(port_no,), ofp=ofp, ofpp=ofpp)
|
||||
dp = dps.get(dpid)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
p = dps.get_ports(dpid)
|
||||
p[0].name = port_name
|
||||
threads = []
|
||||
m.monitored_ports[port_name] = ({}, 1)
|
||||
with hub.Timeout(2):
|
||||
threads.append(hub.spawn(m._monitor_thread, port_name,
|
||||
m.monitored_ports, {},
|
||||
m._port_stats_generator))
|
||||
hub.sleep(0.5)
|
||||
for t in threads:
|
||||
hub.kill(t)
|
||||
hub.joinall(threads)
|
||||
|
||||
assert len(dp.sent)
|
||||
for m in dp.sent:
|
||||
eq_(m.__class__, ofpp.OFPPortStatsRequest)
|
||||
eq_(m.port_no, port_no)
|
||||
|
||||
def test_port_status_thread_12(self):
|
||||
self._test_port_status_thread(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_port_status_thread_13(self):
|
||||
self._test_port_status_thread(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_queue_status_thread(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
queue_id = 2
|
||||
port_no = 16
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
dp = dps.get(dpid)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
p = dps.get_ports(dpid)
|
||||
threads = []
|
||||
m.monitored_queues[queue_id] = ({}, 1)
|
||||
with hub.Timeout(2):
|
||||
threads.append(hub.spawn(m._monitor_thread, queue_id,
|
||||
m.monitored_queues, {'port_no': port_no},
|
||||
m._queue_stats_generator))
|
||||
hub.sleep(0.5)
|
||||
for t in threads:
|
||||
hub.kill(t)
|
||||
hub.joinall(threads)
|
||||
|
||||
assert len(dp.sent)
|
||||
for m in dp.sent:
|
||||
eq_(m.__class__, ofpp.OFPQueueStatsRequest)
|
||||
eq_(m.port_no, port_no)
|
||||
eq_(m.queue_id, queue_id)
|
||||
|
||||
def test_queue_status_thread_12(self):
|
||||
self._test_queue_status_thread(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_queue_status_thread_13(self):
|
||||
self._test_queue_status_thread(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_flow_stats_loop(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
dp = dps.get(dpid)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
|
||||
match = ofpp.OFPMatch(in_port=1)
|
||||
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match)
|
||||
|
||||
contexts = {'hello': 'you'}
|
||||
msgid = 10
|
||||
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
|
||||
'internal': 30,
|
||||
'contexts': contexts}])
|
||||
threads = []
|
||||
key = m.format_key(ofmsg.match.to_jsondict())
|
||||
with hub.Timeout(5):
|
||||
threads.append(hub.spawn(m._flow_stats_loop,
|
||||
dp, ofmsg.table_id, match, 0.1, key))
|
||||
hub.sleep(0.5)
|
||||
|
||||
ofmsg = ofpp.OFPFlowMod(datapath=dp,
|
||||
command=ofp.OFPFC_DELETE,
|
||||
match=match)
|
||||
msgid = 11
|
||||
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
|
||||
'internal': 30,
|
||||
'contexts': contexts}])
|
||||
eq_(len(m.monitored_flows), 0)
|
||||
hub.joinall(threads)
|
||||
|
||||
for m in dp.sent:
|
||||
if m.__class__ in (ofpp.OFPFlowMod, ofpp.OFPPortStatsRequest):
|
||||
continue
|
||||
eq_(m.__class__, ofpp.OFPFlowStatsRequest)
|
||||
eq_(m.table_id, ofmsg.table_id)
|
||||
eq_(m.match.to_jsondict(), match.to_jsondict())
|
||||
|
||||
def test_flow_stats_loop_12(self):
|
||||
self._test_flow_stats_loop(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_flow_stats_loop_13(self):
|
||||
self._test_flow_stats_loop(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_meter_stats_loop(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
msgid = 1
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
dp = dps.get(dpid)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
bands = [ofpp.OFPMeterBandDrop(10, 100)]
|
||||
meter_id = 10
|
||||
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
|
||||
ofp.OFPMF_KBPS, meter_id, bands)
|
||||
contexts = {'hello': 'world'}
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
threads = []
|
||||
with hub.Timeout(5):
|
||||
threads.append(hub.spawn(m._meter_stats_loop,
|
||||
dp, 0.1, meter_id))
|
||||
hub.sleep(0.5)
|
||||
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS,
|
||||
meter_id)
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
eq_(len(m.monitored_meters), 0)
|
||||
hub.joinall(threads)
|
||||
|
||||
for m in dp.sent:
|
||||
if m.__class__ in (ofpp.OFPMeterMod, ofpp.OFPPortStatsRequest):
|
||||
continue
|
||||
eq_(m.__class__, ofpp.OFPMeterStatsRequest)
|
||||
eq_(m.meter_id, ofmsg.meter_id)
|
||||
|
||||
def test_meter_stats_loop_13(self):
|
||||
self._test_meter_stats_loop(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _test_rpc_message_thread(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
dp = dps.get(dpid)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
|
||||
peer = api.Peer(m._rpc_events)
|
||||
peer._endpoint = DummyEndpoint()
|
||||
m._peers.append(peer)
|
||||
|
||||
msgid = 7
|
||||
ofmsg = ofpp.OFPBarrierRequest(datapath=dp)
|
||||
params = {'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict()}
|
||||
data = (msgid, 'ofp', [params])
|
||||
with hub.Timeout(2):
|
||||
peer._handle_rpc_request(data)
|
||||
hub.sleep(0.5)
|
||||
|
||||
for s in dp.sent:
|
||||
if s.__class__ in (ofpp.OFPPortStatsRequest,):
|
||||
continue
|
||||
eq_(s.__class__, ofpp.OFPBarrierRequest)
|
||||
|
||||
msg = ofpp.OFPBarrierReply(datapath=dp)
|
||||
msg.set_xid(1)
|
||||
ev = ofp_event.EventOFPBarrierReply(msg)
|
||||
m._barrier_reply_handler(ev)
|
||||
eq_(len(peer._endpoint.response), 1)
|
||||
rsp = peer._endpoint.response.pop()
|
||||
eq_(rsp[0], msgid)
|
||||
eq_(rsp[1], None)
|
||||
eq_(rsp[2], msg.to_jsondict())
|
||||
eq_(len(peer.wait_for_ofp_resepnse[dpid]), 0)
|
||||
|
||||
m._barrier_reply_handler(ev)
|
||||
eq_(len(peer._endpoint.response), 0)
|
||||
|
||||
ev = dpset.EventDP(dp=dp, enter_leave=False)
|
||||
m._handler_datapath(ev)
|
||||
eq_(len(peer.wait_for_ofp_resepnse), 0)
|
||||
|
||||
# bogus RPC
|
||||
with hub.Timeout(2):
|
||||
m._rpc_events.put((peer, rpc.MessageType.REQUEST,
|
||||
(msgid, 'you')))
|
||||
hub.sleep(0.5)
|
||||
|
||||
def test_rpc_message_thread_12(self):
|
||||
self._test_rpc_message_thread(ofproto_v1_2, ofproto_v1_2_parser)
|
||||
|
||||
def test_rpc_message_thread_13(self):
|
||||
self._test_rpc_message_thread(ofproto_v1_3, ofproto_v1_3_parser)
|
||||
|
||||
def _create_port_stats_args(self, port_no=0):
|
||||
return {'port_no': port_no, 'rx_packets': 10, 'tx_packets': 30,
|
||||
'rx_bytes': 1024, 'tx_bytes': 2048,
|
||||
'rx_dropped': 0, 'tx_dropped': 1,
|
||||
'rx_errors': 0, 'tx_errors': 0,
|
||||
'rx_frame_err': 0, 'rx_over_err': 0, 'rx_crc_err': 0,
|
||||
'collisions': 0}
|
||||
|
||||
def _test_stats_reply_port_handler(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
dps = self._create_dpset(dpid, (0, 1, 2), ofp, ofpp)
|
||||
dp = dps.get(dpid)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
|
||||
msgid = 1
|
||||
r = m._monitor_port(msgid, [{'physical_port_no': 0,
|
||||
'contexts': {'hello': 'world'},
|
||||
'interval': 30}])
|
||||
return m, dp
|
||||
|
||||
def test_stats_reply_port_handler_12(self):
|
||||
ofp = ofproto_v1_2
|
||||
ofpp = ofproto_v1_2_parser
|
||||
manager, dp = self._test_stats_reply_port_handler(ofp, ofpp)
|
||||
|
||||
msg = ofpp.OFPStatsReply(datapath=dp, type_=ofp.OFPST_PORT)
|
||||
ev = ofp_event.EventOFPStatsReply(msg)
|
||||
msg.body = []
|
||||
for p in range(2):
|
||||
port = ofpp.OFPPortStats(**self._create_port_stats_args(p))
|
||||
msg.body.append(port)
|
||||
manager._stats_reply_handler(ev)
|
||||
|
||||
def test_stats_reply_port_handler_13(self):
|
||||
ofp = ofproto_v1_3
|
||||
ofpp = ofproto_v1_3_parser
|
||||
manager, dp = self._test_stats_reply_port_handler(ofp, ofpp)
|
||||
msg = ofpp.OFPPortStatsReply(dp)
|
||||
ev = ofp_event.EventOFPPortStatsReply(msg)
|
||||
msg.body = []
|
||||
for p in range(2):
|
||||
d = self._create_port_stats_args(p)
|
||||
d['duration_sec'] = 0
|
||||
d['duration_nsec'] = 0
|
||||
port = ofpp.OFPPortStats(**d)
|
||||
msg.body.append(port)
|
||||
manager._port_stats_reply_handler(ev)
|
||||
|
||||
def _test_stats_reply_flow_handler(self, ofp, ofpp):
|
||||
dpid = 10
|
||||
msgid = 9
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
|
||||
dp = dps.get(dpid)
|
||||
match = ofpp.OFPMatch(in_port=1)
|
||||
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match)
|
||||
|
||||
contexts = {'hello': 'world'}
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
return m, dp
|
||||
|
||||
def test_stats_reply_flow_handler_12(self):
|
||||
ofp = ofproto_v1_2
|
||||
ofpp = ofproto_v1_2_parser
|
||||
manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp)
|
||||
|
||||
msg = ofpp.OFPStatsReply(datapath=dp,
|
||||
type_=ofp.OFPST_FLOW)
|
||||
ev = ofp_event.EventOFPStatsReply(msg)
|
||||
s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
|
||||
priority=0, idle_timeout=0, hard_timeout=0,
|
||||
cookie=0, packet_count=10, byte_count=100,
|
||||
match=ofpp.OFPMatch(in_port=1))
|
||||
# not-monitored flow
|
||||
s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
|
||||
priority=0, idle_timeout=0, hard_timeout=0,
|
||||
cookie=0, packet_count=10, byte_count=100,
|
||||
match=ofpp.OFPMatch(in_port=2))
|
||||
msg.body = [s1, s2]
|
||||
manager._stats_reply_handler(ev)
|
||||
|
||||
def test_stats_reply_flow_handler_13(self):
|
||||
ofp = ofproto_v1_3
|
||||
ofpp = ofproto_v1_3_parser
|
||||
manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp)
|
||||
|
||||
msg = ofpp.OFPFlowStatsReply(datapath=dp)
|
||||
ev = ofp_event.EventOFPStatsReply(msg)
|
||||
s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
|
||||
priority=0, idle_timeout=0, hard_timeout=0,
|
||||
cookie=0, packet_count=10, byte_count=100,
|
||||
match=ofpp.OFPMatch(in_port=1))
|
||||
# not-monitored flow
|
||||
s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
|
||||
priority=0, idle_timeout=0, hard_timeout=0,
|
||||
cookie=0, packet_count=10, byte_count=100,
|
||||
match=ofpp.OFPMatch(in_port=2))
|
||||
msg.body = [s1, s2]
|
||||
manager._flow_stats_reply_handler(ev)
|
||||
|
||||
def test_stats_reply_meter_handler_13(self):
|
||||
ofp = ofproto_v1_3
|
||||
ofpp = ofproto_v1_3_parser
|
||||
dpid = 10
|
||||
msgid = 9
|
||||
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
|
||||
m = api.RpcOFPManager(dpset=dps)
|
||||
|
||||
dp = dps.get(dpid)
|
||||
bands = [ofpp.OFPMeterBandDrop(10, 100)]
|
||||
meter_id = 10
|
||||
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
|
||||
ofp.OFPMF_KBPS, meter_id, bands)
|
||||
contexts = {'hello': 'world'}
|
||||
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
|
||||
'ofmsg': ofmsg.to_jsondict(),
|
||||
'contexts': contexts}])
|
||||
|
||||
msg = ofpp.OFPMeterStatsReply(datapath=dp)
|
||||
ev = ofp_event.EventOFPStatsReply(msg)
|
||||
s = ofpp.OFPMeterStats(meter_id=meter_id, flow_count=10,
|
||||
packet_in_count=10, byte_in_count=10,
|
||||
duration_sec=10, duration_nsec=10,
|
||||
band_stats=[ofpp.OFPMeterBandStats(1, 8),
|
||||
ofpp.OFPMeterBandStats(2, 16)])
|
||||
|
||||
msg.body = [s]
|
||||
m._meter_stats_reply_handler(ev)
|
||||
107
ryu/tests/unit/app/test_vrrp.py
Normal file
107
ryu/tests/unit/app/test_vrrp.py
Normal file
@ -0,0 +1,107 @@
|
||||
# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import unittest
|
||||
|
||||
from nose.tools import eq_
|
||||
from nose.tools import raises
|
||||
|
||||
from ryu.lib import hub
|
||||
from ryu.lib import rpc
|
||||
from ryu.services.protocols.vrrp.rpc_manager import RpcVRRPManager, Peer
|
||||
from ryu.services.protocols.vrrp import event as vrrp_event
|
||||
|
||||
|
||||
class DummyEndpoint(object):
|
||||
def __init__(self):
|
||||
self.response = []
|
||||
self.notification = []
|
||||
|
||||
def send_response(self, msgid, error, result):
|
||||
self.response.append((msgid, error, result))
|
||||
|
||||
def send_notification(self, method, params):
|
||||
self.notification.append((method, params))
|
||||
|
||||
|
||||
class TestVRRP(unittest.TestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def test_rpc_request(self):
|
||||
rm = RpcVRRPManager()
|
||||
sent_events = []
|
||||
rm.send_event = lambda name, ev: sent_events.append((name, ev))
|
||||
peer = Peer(queue=rm._rpc_events)
|
||||
peer._endpoint = DummyEndpoint()
|
||||
rm._peers.append(peer)
|
||||
|
||||
msgid = 10
|
||||
params = {}
|
||||
with hub.Timeout(2):
|
||||
peer._handle_vrrp_request((msgid, 'vrrp_list', [params]))
|
||||
hub.sleep(0.1)
|
||||
|
||||
eq_(len(sent_events), 1)
|
||||
req = sent_events.pop()[1]
|
||||
eq_(req.__class__, vrrp_event.EventVRRPListRequest)
|
||||
|
||||
req.reply_q.put(vrrp_event.EventVRRPListReply([]))
|
||||
hub.sleep(0.1)
|
||||
(msgid_, error, result) = peer._endpoint.response.pop()
|
||||
eq_(error, None)
|
||||
eq_(result, [])
|
||||
|
||||
params = {'vrid': 1}
|
||||
with hub.Timeout(2):
|
||||
peer._handle_vrrp_request((msgid, 'vrrp_config', [params]))
|
||||
hub.sleep(0.1)
|
||||
|
||||
msgid_, error, result = peer._endpoint.response.pop()
|
||||
eq_(result, None)
|
||||
|
||||
params = {'version': 3,
|
||||
'vrid': 1,
|
||||
'ip_addr': '192.168.1.1',
|
||||
'contexts': {'resource_id': 'XXX',
|
||||
'resource_name': 'vrrp_session'},
|
||||
'statistics_log_enabled': True,
|
||||
'statistics_interval': 10,
|
||||
'priority': 100,
|
||||
'ifname': 'veth0',
|
||||
'vlan_id': None,
|
||||
'ip_address': '192.168.1.2',
|
||||
'advertisement_interval': 10,
|
||||
'preempt_mode': True,
|
||||
'preempt_delay': 10,
|
||||
'admin_state_up': True
|
||||
}
|
||||
with hub.Timeout(2):
|
||||
peer._handle_vrrp_request((msgid, 'vrrp_config', [params]))
|
||||
hub.sleep(0.1)
|
||||
|
||||
eq_(len(sent_events), 1)
|
||||
req = sent_events.pop()[1]
|
||||
eq_(req.__class__, vrrp_event.EventVRRPConfigRequest)
|
||||
req.reply_q.put(vrrp_event.EventVRRPConfigReply('hoge',
|
||||
req.interface,
|
||||
req.config))
|
||||
hub.sleep(0.1)
|
||||
(msgid_, error, result) = peer._endpoint.response.pop()
|
||||
eq_(error, None)
|
||||
@ -4,5 +4,5 @@ webob>=1.0.8
|
||||
paramiko
|
||||
lxml
|
||||
netaddr
|
||||
oslo.config
|
||||
oslo.config==1.1.1
|
||||
msgpack-python>=0.4.0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user