Compare commits


37 Commits

Author SHA1 Message Date
Karthik Ramasubramanian
bc35ba0178 Tie oslo.config version
oslo.config > 1.2.1 requires pip >= 1.4, but Ubuntu has only 1.1. Tie
oslo.config to 1.1.1 so that we continue to use pip 1.1.
2014-03-25 17:12:20 -07:00
FUJITA Tomonori
a2106b6676 apgw: add more queue stats unittest
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-25 19:17:57 +09:00
FUJITA Tomonori
e94ee078ee apgw: fix unittest failure
fix unittest failure caused by 5a4d429a6d66b587a8dcee2d7749941df05610b1.

also fix pep8

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-25 19:01:13 +09:00
FUJITA Tomonori
be8f4d656a apgw: specify port_no for queuestats
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-25 19:00:59 +09:00
FUJITA Tomonori
fd81201a52 apgw: fix vrrp rpc API
'ip_address' is a real ip address.
'ip_addr' is a virtual ip address.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-24 08:31:20 +09:00
Karthik Ramasubramanian
df35e35703 Revert stats log. Only syslog will be used now 2014-03-20 19:02:09 -07:00
Karthik Ramasubramanian
fa709ba75b Add rudimentary retry mechanism for ofcapable switch 2014-03-20 18:42:12 -07:00
Karthik Ramasubramanian
5a4d429a6d Add band name to meter stats 2014-03-20 18:41:44 -07:00
FUJITA Tomonori
690edd4000 apgw: add level info to log format
also fixed unittests.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-14 08:33:47 +09:00
Anantha Ramaiah
f0e9c92f78 This fix addresses the following:
- fix port status mask to enable port status message reception
- fix packet_in mask to enable invalid_TTL packets to the controller
- This fix *does not* address the TTL packet handling which seems to be broken
2014-03-12 18:29:42 -07:00
FUJITA Tomonori
dca44fee6d apgw: increase ofconfig connection timeout to 180 secs
Workaround for Netronome startup.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-10 08:45:37 +09:00
Karthik Ramasubramanian
294d84ef33 Re-enable stats logging to separate file 2014-03-08 02:50:28 -08:00
FUJITA Tomonori
78d3ec5a68 apgw: fix unittests for ofwire traceroute
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-06 22:25:48 +09:00
FUJITA Tomonori
a3fda22167 apgw: fix traceroute notification
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-06 21:56:12 +09:00
FUJITA Tomonori
bf21ed0b0b apgw: make vrrp state change log critical
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-05 23:25:52 +09:00
FUJITA Tomonori
0edb286f83 apgw: change log level to critical about dp disconnect
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-05 15:04:29 +09:00
FUJITA Tomonori
374f32a20a apgw: vrrp uses physical mac address instead of virtual mac
With the --vrrp-use-vmac option, VRRP still uses the virtual MAC.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-03-01 17:13:35 +09:00
FUJITA Tomonori
53cfa6dacf apgw: log all stats parameters
Just log all the attributes in OFP*Stats.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-26 20:45:35 +09:00
FUJITA Tomonori
278bc8fe00 apgw: fix port stats
Support a per-port interval. Refactor to use the same code for port and
queue stats.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-25 17:17:58 +09:00
FUJITA Tomonori
9ce8698b21 apgw: add queue stats support
RPC API:

monitor_queue [{'queue_id': 1, 'interval': 5, 'contexts': {'hello':'world'}}]

if 'interval' is zero, ofwire stops monitoring the specified queue.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-22 10:47:20 +09:00
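The `monitor_queue` parameters above are just a one-element list of dicts; a minimal client-side sketch of building them (the helper name is illustrative, not part of the RPC API):

```python
def build_monitor_queue_params(queue_id, interval, contexts=None):
    # interval == 0 tells ofwire to stop monitoring the queue, so
    # contexts only accompany a non-zero interval
    params = {'queue_id': queue_id, 'interval': interval}
    if interval and contexts is not None:
        params['contexts'] = dict(contexts)
    return [params]

start = build_monitor_queue_params(1, 5, {'hello': 'world'})
stop = build_monitor_queue_params(1, 0)
```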
FUJITA Tomonori
a7b6f0a036 apgw: ofconfig get and edit support
param: {}

{'OFPort': [{'admin-state': 'up', 'name': 'OFP11', 'oper-state': 'down', 'number': 11}], 'OFQueue': [{'max-rate': '10000', 'experimenter': '[4096]', 'id': '1100', 'resource_id': 'QID0-OFP11'}]}

param: [{'port':'OFP11', 'admin-state': 'up'}]

param: [{'queue':'QID0-OFP11', 'max-rate': 10000}]

param: [{'queue': 'QID0-OFP11', 'experimenter': 4096}]

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-15 23:26:12 +09:00
FUJITA Tomonori
3b6842eaed ofconfig: add commit and discard_changes operation support
These are necessary for switches that don't support modifying the
'running' configuration directly; instead, one needs to modify the
'candidate' configuration and then commit (or discard).

Also fix do_get method's comment typo.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-15 20:46:04 +09:00
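The candidate-then-commit flow described above can be sketched with a stand-in switch object exposing the same three methods (`edit_config`, `commit`, `discard_changes`); this is an illustration of the control flow, not the real NETCONF client:

```python
def apply_candidate(switch, config):
    # edit the 'candidate' datastore, then commit; roll back on failure
    try:
        switch.edit_config('candidate', config)
        switch.commit()
        return True
    except Exception:
        switch.discard_changes()
        return False

class _FakeSwitch(object):
    # records the sequence of NETCONF operations for demonstration
    def __init__(self, fail=False):
        self.fail = fail
        self.calls = []
    def edit_config(self, target, config):
        self.calls.append(('edit', target))
        if self.fail:
            raise RuntimeError('edit failed')
    def commit(self):
        self.calls.append(('commit',))
    def discard_changes(self):
        self.calls.append(('discard',))

ok = _FakeSwitch()
bad = _FakeSwitch(fail=True)
ok_result = apply_candidate(ok, None)
bad_result = apply_candidate(bad, None)
```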
FUJITA Tomonori
dab0aca5cc apgw: add ofconfig oper-state
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-13 20:21:02 +09:00
FUJITA Tomonori
b91a263d07 of_config: add OFPortStateType class
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-13 17:55:08 +09:00
FUJITA Tomonori
1e56302a56 apgw: add initial ofconfig support
Supports getting port information. The 'ofconfig' method without
parameters returns:

fujita-air:ryu fujita$ ./bin/rpc-cli --peers=local=localhost:50001
(Cmd) request local ofconfig [{}]
RESULT {'OFPort': [{'admin-state': 'up', 'name': 'Port1', 'number': 1}, {'admin-state': 'up', 'name': 'Port2', 'number': 2}]}

Also added four new command-line options to specify the address of the
ofconfig-capable switch.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-12 18:25:07 +09:00
FUJITA Tomonori
0f087cefeb apgw: fix log for bogus RPC
write to the proper logger.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-12 10:34:33 +09:00
FUJITA Tomonori
9131d24c00 apgw: adjust VRRP RPC parameters
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-08 06:33:37 -08:00
FUJITA Tomonori
152ad2ea3e apgw: update the log format
Write to syslog in the proper format.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-08 04:32:49 -08:00
FUJITA Tomonori
9b9208d9e6 test: add vrrp unittests
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-08 20:33:12 +09:00
FUJITA Tomonori
692290fd67 apgw: add Meter support
- handle MeterMod message
- log meter stats periodically

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-07 23:12:31 +09:00
FUJITA Tomonori
87fba4e199 of13: set the default attributes for OFPMeterMod
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-07 23:12:08 +09:00
FUJITA Tomonori
393e354b97 apgw: use flowmod JSON used in production
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-07 10:28:07 +09:00
FUJITA Tomonori
5b487b165a add the debug code to catch bogus RPC messages
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-06 09:52:44 +09:00
FUJITA Tomonori
208ba0b039 of12: fix InstructionWriteMetadata json decoder
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-06 08:27:15 +09:00
FUJITA Tomonori
a0501a12b1 apgw: add RPC API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-03 20:28:55 +09:00
FUJITA Tomonori
47ef512494 apgw: add logging library
Will be used by VRRP too.

Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-03 20:28:04 +09:00
FUJITA Tomonori
eb026768fc apgw: enable syslog logging by default
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
2014-02-03 20:20:26 +09:00
19 changed files with 1808 additions and 31 deletions


@@ -40,6 +40,7 @@ from ryu import version
from ryu.app import wsgi
from ryu.base.app_manager import AppManager
from ryu.controller import controller
from ryu.controller import api
from ryu.topology import switches


@@ -143,7 +143,7 @@ class Cmd(cmd.Cmd):
def do_get(self, line):
"""get <peer>
eg. get_config sw1
eg. get sw1
"""
def f(p, args):
@@ -151,6 +151,26 @@ class Cmd(cmd.Cmd):
self._request(line, f)
def do_commit(self, line):
"""commit <peer>
eg. commit sw1
"""
def f(p, args):
print p.commit()
self._request(line, f)
def do_discard(self, line):
"""discard <peer>
eg. discard sw1
"""
def f(p, args):
print p.discard_changes()
self._request(line, f)
def do_get_config(self, line):
"""get_config <peer> <source>
eg. get_config sw1 startup

759
ryu/controller/api.py Normal file

@@ -0,0 +1,759 @@
import json
import logging
from operator import attrgetter
from oslo.config import cfg
from ryu.base import app_manager
from ryu.controller import handler
from ryu.controller import dpset
from ryu.controller import ofp_event
from ryu.ofproto import ofproto_parser
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_2_parser
from ryu.ofproto import ofproto_v1_3
from ryu.ofproto import ofproto_v1_3_parser
from ryu.lib import hub
from ryu.lib import apgw
from ryu.lib import rpc
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import icmp
from ryu.lib.packet import ipv4
from ryu.lib.packet import vlan
from ryu.lib.of_config import capable_switch as cs
from ryu.lib.of_config import constants as consts
import ryu.lib.of_config.classes as ofc
import eventlet
import sys
_OFCONFIG_RETRIES = 5
_OFCONFIG_TIMEOUT = 280
_ = type('', (apgw.StructuredMessage,), {})
_.COMPONENT_NAME = 'ofwire'
CONF = cfg.CONF
CONF.register_cli_opts([
cfg.StrOpt('ofconfig-address', default='127.0.0.1',
help='of-config switch address'),
cfg.IntOpt('ofconfig-port', default=1830,
help='of-config switch port'),
cfg.StrOpt('ofconfig-user', default='linc',
help='of-config user name'),
cfg.StrOpt('ofconfig-password', default='linc',
help='of-config password'),
cfg.IntOpt('ofconfig-timeout', default=_OFCONFIG_TIMEOUT,
help='of-config timeout per attempt'),
cfg.IntOpt('ofconfig-retries', default=_OFCONFIG_RETRIES,
help='of-config retries'),
])
class RPCError(Exception):
pass
class NoRPCResponse(Exception):
def __init__(self, dpid=None, xid=None, msgid=None):
self.dpid = dpid
self.xid = xid
self.msgid = msgid
class PendingRPC(Exception):
pass
class Peer(object):
def __init__(self, queue):
super(Peer, self).__init__()
self._queue = queue
self.wait_for_ofp_resepnse = {}
def _handle_rpc_request(self, data):
self._queue.put((self, rpc.MessageType.REQUEST, data))
def _handle_rpc_notify(self, data):
self._queue.put((self, rpc.MessageType.NOTIFY, data))
class RpcOFPManager(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_2.OFP_VERSION, ofproto_v1_3.OFP_VERSION]
LOGGER_NAME = 'ofwire'
_CONTEXTS = {
'dpset': dpset.DPSet,
}
def __init__(self, *args, **kwargs):
super(RpcOFPManager, self).__init__(*args, **kwargs)
self.dpset = kwargs['dpset']
self._peers = []
self.traceroute_source = {}
self.monitored_ports = {}
self.monitored_flows = {}
self.monitored_meters = {}
self.monitored_queues = {}
self.pending_rpc_requests = []
self._rpc_events = hub.Queue(128)
# per 30 secs by default
self.ofconfig = hub.Queue(128)
hub.spawn(self._peer_accept_thread)
hub.spawn(self._rpc_message_thread)
hub.spawn(self._ofconfig_thread)
apgw.update_syslog_format()
def _rpc_message_thread(self):
while True:
(peer, _type, data) = self._rpc_events.get()
error = None
result = None
try:
if _type == rpc.MessageType.REQUEST:
msgid, target_method, params = data
if target_method == "ofp":
result = self._handle_ofprotocol(msgid, params)
elif target_method == "monitor_port":
result = self._monitor_port(msgid, params)
elif target_method == "monitor_queue":
result = self._monitor_queue(msgid, params)
elif target_method == "ofconfig":
self._ofconfig(peer, msgid, params)
else:
error = 'Unknown method %s' % (target_method)
elif _type == rpc.MessageType.NOTIFY:
target_method, params = data
if target_method == 'traceroute':
try:
self._register_traceroute(params)
except RPCError as e:
self.logger.error(_({'error': str(e)}))
else:
self.logger.error(_({'unknown method': target_method}))
continue
except RPCError as e:
error = str(e)
except PendingRPC as e:
# we handle the RPC request after a datapath joins.
self.pending_rpc_requests.append((peer, data))
continue
except NoRPCResponse as e:
# we'll send the RPC response after we get a response from
# the datapath.
if e.dpid is not None:
d = peer.wait_for_ofp_resepnse.setdefault(e.dpid, {})
d[e.xid] = e.msgid
continue
except:
self.logger.info(_({'bogus RPC': data}))
peer._endpoint.send_response(msgid, error=error, result=result)
def _peer_loop_thread(self, peer):
peer._endpoint.serve()
# the peer connection is closed
self._peers.remove(peer)
def peer_accept_handler(self, new_sock, addr):
peer = Peer(self._rpc_events)
table = {
rpc.MessageType.REQUEST: peer._handle_rpc_request,
rpc.MessageType.NOTIFY: peer._handle_rpc_notify,
}
peer._endpoint = rpc.EndPoint(new_sock, disp_table=table)
self._peers.append(peer)
hub.spawn(self._peer_loop_thread, peer)
def _peer_accept_thread(self):
server = hub.StreamServer(('', 50001),
self.peer_accept_handler)
server.serve_forever()
def _send_waited_rpc_response(self, msg):
for peer in self._peers:
if not msg.datapath.id in peer.wait_for_ofp_resepnse:
continue
if msg.xid in peer.wait_for_ofp_resepnse[msg.datapath.id]:
msgid = peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid]
peer._endpoint.send_response(msgid, error=None,
result=msg.to_jsondict())
del peer.wait_for_ofp_resepnse[msg.datapath.id][msg.xid]
return
def compare_key(self, k1, k2):
k1 = eval(k1)
k2 = eval(k2)
l1 = k1['OFPMatch']['oxm_fields']
l2 = k2['OFPMatch']['oxm_fields']
return sorted(l1) == sorted(l2)
def format_key(self, match_json):
del match_json['OFPMatch']['length']
for t in match_json['OFPMatch']['oxm_fields']:
tlv = t['OXMTlv']
if tlv['field'] in ['ipv4_dst', 'ipv4_src']:
if tlv['mask'] == '255.255.255.255':
tlv['mask'] = None
return str(match_json)
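format_key and compare_key above normalize a flow match before using it as a dictionary key; the same idea on plain dicts, as a rough sketch (field names copied from the handler, helper names are invented, and no eval is needed when the keys stay as dicts):

```python
import copy

def normalize_match(match):
    # drop the serialized length and treat a /32 mask as no mask,
    # mirroring format_key above; the input is left untouched
    m = copy.deepcopy(match)
    m.pop('length', None)
    for tlv in m.get('oxm_fields', []):
        if tlv.get('field') in ('ipv4_dst', 'ipv4_src') and \
                tlv.get('mask') == '255.255.255.255':
            tlv['mask'] = None
    return m

def same_match(m1, m2):
    # field order must not matter, so compare a canonical ordering
    key = lambda tlv: repr(sorted(tlv.items()))
    return sorted(m1['oxm_fields'], key=key) == \
        sorted(m2['oxm_fields'], key=key)

a = {'length': 4,
     'oxm_fields': [{'field': 'ipv4_dst', 'mask': '255.255.255.255'},
                    {'field': 'in_port', 'mask': None}]}
b = {'oxm_fields': [{'field': 'in_port', 'mask': None},
                    {'field': 'ipv4_dst', 'mask': None}]}
na = normalize_match(a)
```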
@handler.set_ev_cls(dpset.EventDP)
def _handler_datapath(self, ev):
if ev.enter:
dp = ev.dp
parser = dp.ofproto_parser
ofp = dp.ofproto
if ofp.OFP_VERSION == ofproto_v1_2.OFP_VERSION:
m = parser.OFPSetConfig(dp,
ofp.OFPC_INVALID_TTL_TO_CONTROLLER,
ofp.OFPCML_MAX)
elif ofp.OFP_VERSION == ofproto_v1_3.OFP_VERSION:
packet_in_mask = (ofp.OFPR_ACTION_MASK |
ofp.OFPR_INVALID_TTL_MASK)
port_status_mask = (ofp.OFPPR_ADD_MASK |
ofp.OFPPR_DELETE_MASK |
ofp.OFPPR_MODIFY_MASK)
m = parser.OFPSetAsync(dp, [packet_in_mask, 0],
[port_status_mask, 0],
[0, 0])
dp.send_msg(m)
log_msg = {"event": "dp connected", "dpid": ev.dp.id}
notify_param = {'secure_channel_state': 'Up'}
for p in self.pending_rpc_requests:
(peer, data) = p
self._rpc_events.put((peer, rpc.MessageType.REQUEST, data))
else:
log_msg = {"event": "dp disconnected"}
notify_param = {'secure_channel_state': 'Down'}
for peer in self._peers:
if ev.dp.id in peer.wait_for_ofp_resepnse:
del peer.wait_for_ofp_resepnse[ev.dp.id]
self.logger.critical(_(log_msg))
for peer in self._peers:
peer._endpoint.send_notification("state", [notify_param])
@handler.set_ev_cls(ofp_event.EventOFPErrorMsg,
handler.MAIN_DISPATCHER)
def _error_msg_handler(self, ev):
self.logger.info(_(ev.msg.to_jsondict()))
@handler.set_ev_cls(ofp_event.EventOFPBarrierReply,
handler.MAIN_DISPATCHER)
def _barrier_reply_handler(self, ev):
self._send_waited_rpc_response(ev.msg)
@handler.set_ev_cls(ofp_event.EventOFPFlowStatsReply,
handler.MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
msg = ev.msg
for body in msg.body:
key = self.format_key(body.match.to_jsondict())
contexts = None
for k in self.monitored_flows.keys():
if self.compare_key(k, key):
contexts = self.monitored_flows[k]
break
if contexts is not None:
stats = body.to_jsondict()['OFPFlowStats']
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPPortStatsReply,
handler.MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
msg = ev.msg
dp = msg.datapath
for stat in sorted(msg.body, key=attrgetter('port_no')):
try:
port = self.dpset.get_port(dp.id, stat.port_no)
except:
continue
if port.name in self.monitored_ports:
stats = {'physical_port_no': port.name}
stats.update(stat.to_jsondict()['OFPPortStats'])
contexts, interval_ = self.monitored_ports[port.name]
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
def _add_band_name(self, stats):
new_stats = [
{'band_name': {'pir': stats[0]}},
{'band_name': {'cir': stats[1]}}
]
return new_stats
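_add_band_name assumes exactly two meter bands in a fixed order, labeled pir then cir (presumably peak and committed information rates); as a standalone sketch with the assumption made explicit:

```python
def add_band_names(band_stats):
    # the meter-stats handler above assumes band 0 is the peak rate
    # (pir) and band 1 the committed rate (cir)
    if len(band_stats) < 2:
        raise ValueError('expected two meter bands')
    return [{'band_name': {'pir': band_stats[0]}},
            {'band_name': {'cir': band_stats[1]}}]

named = add_band_names([{'packet_band_count': 1},
                        {'packet_band_count': 2}])
```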
@handler.set_ev_cls(ofp_event.EventOFPMeterStatsReply,
handler.MAIN_DISPATCHER)
def _meter_stats_reply_handler(self, ev):
msg = ev.msg
dp = msg.datapath
for stat in msg.body:
if stat.meter_id in self.monitored_meters:
contexts = self.monitored_meters[stat.meter_id]
stats = stat.to_jsondict()['OFPMeterStats']
stats['band_stats'] = self._add_band_name(stats['band_stats'])
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPQueueStatsReply,
handler.MAIN_DISPATCHER)
def _queue_stats_reply_handler(self, ev):
msg = ev.msg
dp = msg.datapath
for stat in msg.body:
if stat.queue_id in self.monitored_queues:
contexts, interval_ = self.monitored_queues[stat.queue_id]
stats = {'queue_id': stat.queue_id,
'port_no': stat.port_no,
'tx_bytes': stat.tx_bytes,
'tx_packets': stat.tx_packets,
'tx_errors': stat.tx_errors}
stats.update(contexts)
self.logger.info(_(msg=stats, log_type='stats'))
@handler.set_ev_cls(ofp_event.EventOFPStatsReply,
handler.MAIN_DISPATCHER)
def _stats_reply_handler(self, ev):
msg = ev.msg
self._send_waited_rpc_response(msg)
if msg.type == ofproto_v1_2.OFPST_FLOW:
self._flow_stats_reply_handler(ev)
elif msg.type == ofproto_v1_2.OFPST_PORT:
self._port_stats_reply_handler(ev)
@handler.set_ev_cls(ofp_event.EventOFPPacketIn)
def _packet_in_handler(self, ev):
msg = ev.msg
dp = msg.datapath
self.logger.info(_({"event": "packet_in", "reason": msg.reason}))
if dp.ofproto.OFPR_INVALID_TTL != msg.reason:
return
if not 'in_port' in msg.match:
return
in_port = msg.match['in_port']
pkt = packet.Packet(msg.data)
if not pkt.get_protocol(ipv4.ipv4):
return
o_vlan = pkt.get_protocol(vlan.vlan)
if o_vlan is None:
return
vlan_p = vlan.vlan(vid=o_vlan.vid)
o_eth = pkt.get_protocol(ethernet.ethernet)
eth = ethernet.ethernet(o_eth.src, o_eth.dst, o_eth.ethertype)
o_ip = pkt.get_protocol(ipv4.ipv4)
# needs to set src properly for either side (vlan or mpls)
# ip = ipv4.ipv4(src=ip_lib.ipv4_to_bin(V1_GS_IP), dst=o_ip.src,
# proto=1)
try:
src_ip = self.traceroute_source[o_vlan.vid]['ip']
in_port = self.traceroute_source[o_vlan.vid]['port']
except:
self.logger.info(_({"event": "traceroute error",
"reason": "can't find ip", "vid": o_vlan.vid}))
return
ip = ipv4.ipv4(src=src_ip, dst=o_ip.src, proto=1)
ip_offset = 14 + 4
# ether + vlan headers
data = msg.data[ip_offset:ip_offset +
(o_ip.header_length * 4 + 8)]
ic = icmp.icmp(icmp.ICMP_TIME_EXCEEDED, 0, 0,
icmp.TimeExceeded(data_len=len(data), data=data))
p = packet.Packet(protocols=[eth, vlan_p, ip, ic])
p.serialize()
actions = [dp.ofproto_parser.OFPActionOutput(dp.ofproto.OFPP_TABLE, 0)]
dp.send_packet_out(in_port=in_port, actions=actions, data=p.data)
@handler.set_ev_cls(ofp_event.EventOFPPortStatus)
def _port_status_handler(self, ev):
if hasattr(ev, 'msg'):
msg = ev.msg
reason = msg.reason
datapath = msg.datapath
port = msg.desc
ofproto = datapath.ofproto
self.logger.info(_({"event": "port status change",
"reason": reason,
"port_no": port.port_no, "state": port.state},
log_type='states'))
# For now just port modifications are reported
if reason == ofproto.OFPPR_MODIFY:
params = {'port_no': port.port_no, 'port_state': port.state}
for peer in self._peers:
peer._endpoint.send_notification("port_status", [params])
def _flow_stats_loop(self, dp, table_id, match, interval, key):
while True:
if not key in self.monitored_flows:
break
msg = dp.ofproto_parser.OFPFlowStatsRequest(datapath=dp,
table_id=table_id,
match=match)
dp.send_msg(msg)
hub.sleep(interval)
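The stats loops above all follow one pattern: keep sending a request while the key is still registered, then stop once an RPC removes it. A deterministic sketch of that control flow (the sleep callback stands in for hub.sleep, and the names are invented):

```python
def poll_while_registered(registry, key, send, sleep, interval=60):
    # issue one stats request per cycle while the key stays registered;
    # deleting the key (e.g. from an RPC handler) ends the loop
    sent = 0
    while key in registry:
        send()
        sent += 1
        sleep(interval)
    return sent

registry = {'flow-key': {'hello': 'world'}}
ticks = []
def fake_sleep(_interval):
    # simulate another thread deregistering the flow after three cycles
    ticks.append(1)
    if len(ticks) >= 3:
        registry.pop('flow-key', None)

sent = poll_while_registered(registry, 'flow-key',
                             send=lambda: None, sleep=fake_sleep)
```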
def _meter_stats_loop(self, dp, interval, meter_id):
while True:
if not meter_id in self.monitored_meters:
break
msg = dp.ofproto_parser.OFPMeterStatsRequest(datapath=dp,
meter_id=meter_id)
dp.send_msg(msg)
hub.sleep(interval)
def _handle_ofprotocol(self, msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
send_response = True
dp = None
if 'dpid' in param_dict:
dp = self.dpset.get(int(param_dict['dpid']))
param_dict.pop('dpid')
else:
# use the first datapath
for k, v in self.dpset.get_all():
dp = v
break
if dp is None:
self.logger.info(_({"event": "no datapath, queued",
"msg": str(param_dict)}))
raise PendingRPC()
contexts = None
ofmsg = None
# default interval
interval = 60
for k, v in param_dict.items():
if k == 'ofmsg':
try:
ofmsg = ofproto_parser.ofp_msg_from_jsondict(dp, v)
except:
raise RPCError('parameters are invalid, %s' %
(str(param_dict)))
elif k == 'interval':
interval = int(v)
elif k == 'contexts':
contexts = v
if ofmsg is None:
raise RPCError('"ofmsg" parameter is invalid, %s' %
(str(param_dict)))
if contexts is not None and not isinstance(contexts, dict):
raise RPCError('"contexts" must be dictionary, %s' %
(str(param_dict)))
if contexts is not None and interval == 0:
raise RPCError('"interval" must be non zero with "contexts", %s' %
(str(param_dict)))
dp.set_xid(ofmsg)
ofmsg.serialize()
if dp.ofproto.OFP_VERSION == ofproto_v1_2.OFP_VERSION:
msg_types = (dp.ofproto.OFPT_STATS_REQUEST,
dp.ofproto.OFPT_BARRIER_REQUEST)
else:
msg_types = (dp.ofproto.OFPT_MULTIPART_REQUEST,
dp.ofproto.OFPT_BARRIER_REQUEST)
if ofmsg.msg_type in msg_types:
dp.send_msg(ofmsg)
raise NoRPCResponse(dpid=dp.id, xid=ofmsg.xid, msgid=msgid)
result = {'xid': ofmsg.xid}
if ofmsg.msg_type is dp.ofproto.OFPT_FLOW_MOD:
if contexts is not None:
key = self.format_key(ofmsg.match.to_jsondict())
if ofmsg.command is dp.ofproto.OFPFC_ADD:
if key in self.monitored_flows:
raise RPCError('the existing flow, %s' % (str(key)))
self.monitored_flows[key] = contexts
hub.spawn(self._flow_stats_loop,
dp, ofmsg.table_id, ofmsg.match,
interval, key)
elif ofmsg.command in (dp.ofproto.OFPFC_DELETE,
dp.ofproto.OFPFC_DELETE_STRICT):
try:
del self.monitored_flows[key]
except:
raise RPCError('unknown key, %s' % (str(key)))
elif (dp.ofproto.OFP_VERSION == ofproto_v1_3.OFP_VERSION and
ofmsg.msg_type is dp.ofproto.OFPT_METER_MOD):
if contexts is not None:
if ofmsg.command is dp.ofproto.OFPMC_ADD:
if ofmsg.meter_id in self.monitored_meters:
raise RPCError('meter already exists %d' %
(ofmsg.meter_id))
self.monitored_meters[ofmsg.meter_id] = contexts
hub.spawn(self._meter_stats_loop,
dp, interval, ofmsg.meter_id)
elif ofmsg.command is dp.ofproto.OFPMC_DELETE:
try:
del self.monitored_meters[ofmsg.meter_id]
except:
raise RPCError('unknown meter %d' % (ofmsg.meter_id))
elif ofmsg.command is dp.ofproto.OFPMC_MODIFY:
raise RPCError('METER_MOD with contexts is not supported')
else:
raise RPCError('unknown meter_mod command')
else:
raise RPCError('unknown of message, %s' % (str(param_dict)))
dp.send_msg(ofmsg)
return result
def _register_traceroute(self, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
try:
self.traceroute_source[param_dict['vlan']] = {
'ip': param_dict['ip'],
'port': param_dict['port']
}
except Exception as e:
raise RPCError('parameters are invalid, %s' % (str(param_dict)))
self.logger.info(_({'event': 'register traceroute source',
'vlan': param_dict['vlan'],
'ip': param_dict['ip'],
'port': param_dict['port']}))
return {}
def _ofconfig_thread(self):
def _handle_edit(s, o, param_dict):
target = 'candidate'
if 'port' in param_dict:
port_id = param_dict.pop('port')
config = ofc.OFPortConfigurationType(**param_dict)
port_type = ofc.OFPortType(resource_id=port_id,
configuration=config)
r = ofc.OFCapableSwitchResourcesType(port=[port_type])
c = ofc.OFCapableSwitchType(id=o.id, resources=r)
s.edit_config(target, c)
else:
queue = param_dict.pop('queue')
if 'experimenter' in param_dict:
new_val = param_dict['experimenter']
param_dict['experimenter'] = [new_val]
prop = ofc.OFQueuePropertiesType(**param_dict)
queue_type = ofc.OFQueueType(resource_id=queue,
properties=prop)
r = ofc.OFCapableSwitchResourcesType(queue=[queue_type])
c = ofc.OFCapableSwitchType(id=o.id,
resources=r)
if 'experimenter' in param_dict:
old_val = None
for q in o.resources.queue:
if q.resource_id == queue:
old_val = q.properties.experimenter[0]
break
if old_val is None:
raise RPCError('cannot find queue, %s' % (queue))
xml = ofc.NETCONF_Config(capable_switch=c).to_xml()
key = '<of111:experimenter>'
sidx = xml.find(key)
eidx = xml.find('</of111:experimenter>')
xml = xml[:sidx] + \
'<of111:experimenter operation=\"delete\">' + \
str(old_val) + xml[eidx:]
key = '/of111:experimenter>\n'
sidx = xml.find(key) + len(key)
xml = xml[:sidx + 1] + \
'<of111:experimenter operation=\"replace\">' + \
str(new_val) + '</of111:experimenter>\n' + \
xml[sidx + 1:]
s.raw_edit_config(target, xml, None)
else:
s.edit_config(target, c)
try:
s.commit()
except Exception as e:
s.discard_changes()
raise RPCError(str(e))
return {}
def _handle_get(o):
result = {}
ports = []
for p in o.resources.port:
config = p.configuration
ports.append({'name': str(p.name),
'number': int(p.number),
'admin-state': str(config.admin_state),
'oper-state': str(p.state.oper_state)})
result['OFPort'] = ports
queues = []
for q in o.resources.queue:
p = q.properties
queues.append({'id': str(q.id),
'resource_id': str(q.resource_id),
'max-rate': str(p.max_rate),
'experimenter': str(p.experimenter)})
result['OFQueue'] = queues
return result
def _handle(param_dict):
# TODO: we don't need to create a new connection every time.
# FIXME(KK): Nice place to put a context-manager?
s = None
attempt = 0
while attempt < CONF.ofconfig_retries:
try:
s = cs.OFCapableSwitch(host=CONF.ofconfig_address,
port=CONF.ofconfig_port,
username=CONF.ofconfig_user,
password=CONF.ofconfig_password,
unknown_host_cb=lambda h, f: True,
timeout=CONF.ofconfig_timeout)
except Exception as e:
self.logger.error(
_({"event": "ofconfig failed to connect",
"reason": "exception: {0!s}".format(e)}))
else:
break
attempt += 1
backoff_time = attempt ** 2
hub.sleep(backoff_time)
if not s:
raise RPCError('failed to connect to ofs')
o = s.get()
if len(param_dict) == 0:
return _handle_get(o)
elif 'port' in param_dict and 'queue' in param_dict:
raise RPCError('only one resource at one shot')
elif 'port' in param_dict or 'queue' in param_dict:
return _handle_edit(s, o, param_dict)
else:
raise RPCError('unknown resource edit %s' % (str(param_dict)))
while True:
error = None
result = None
(peer, msgid, params) = self.ofconfig.get()
param_dict = params[0]
try:
result = _handle(param_dict)
except RPCError as e:
error = str(e)
except Exception as e:
error = str(e)
try:
peer._endpoint.send_response(msgid, error=error, result=result)
except:
self.logger.info(_({'error': 'RPC send error %s, %s' %
(error, result)}))
def _ofconfig(self, peer, msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
self.ofconfig.put((peer, msgid, params))
raise NoRPCResponse()
def _monitor(self, mandatory_params, resource_dict, request_generator,
msgid, params):
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
resource_id = None
contexts = None
interval = 60
resource_name = mandatory_params.pop(0)
for k, v in param_dict.items():
if k == resource_name:
resource_id = v
elif k == 'contexts':
contexts = v
elif k == 'interval':
interval = v
elif k in mandatory_params:
pass
else:
raise RPCError('unknown parameters, %s' % k)
if contexts is None and interval > 0:
raise RPCError('"contexts" parameter is necessary')
if contexts is not None and not isinstance(contexts, dict):
raise RPCError('"contexts" parameter must be dictionary')
if resource_id is None:
raise RPCError('"%s" parameter is necessary' % resource_name)
if interval == 0:
if resource_id in resource_dict:
del resource_dict[resource_id]
else:
raise RPCError('%s %s does not exist' % (resource_name,
resource_id))
else:
need_spawn = False
if not resource_id in resource_dict:
need_spawn = True
resource_dict[resource_id] = (contexts, interval)
if need_spawn:
hub.spawn(self._monitor_thread, resource_id, resource_dict,
param_dict, request_generator)
return {}
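_monitor's registration rules (interval 0 deregisters; only a resource not yet registered needs a freshly spawned poller) can be isolated in a small helper; a sketch, with KeyError standing in for the RPCError raised above:

```python
def update_monitor(registry, resource_id, contexts, interval):
    # interval == 0 deregisters; otherwise (re)register and report
    # whether a new polling loop would need to be spawned
    if interval == 0:
        if resource_id not in registry:
            raise KeyError(resource_id)
        del registry[resource_id]
        return False
    need_spawn = resource_id not in registry
    registry[resource_id] = (contexts, interval)
    return need_spawn

reg = {}
first = update_monitor(reg, 'OFP11', {'hello': 'world'}, 5)
second = update_monitor(reg, 'OFP11', {'hello': 'world'}, 10)
stopped = update_monitor(reg, 'OFP11', None, 0)
```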
def _port_stats_generator(self, dp, port_name, param_dict):
port_no = None
ports = self.dpset.get_ports(dp.id)
for port in ports:
if port.name == port_name:
port_no = port.port_no
break
if port_no is None:
return None
return dp.ofproto_parser.OFPPortStatsRequest(datapath=dp,
port_no=port_no)
def _monitor_port(self, msgid, params):
return self._monitor(['physical_port_no'],
self.monitored_ports,
self._port_stats_generator,
msgid, params)
def _monitor_thread(self, resource_id, resource_dict, param_dict,
generator):
while resource_id in resource_dict:
_contexts, interval = resource_dict[resource_id]
for k, dp in self.dpset.get_all():
try:
ofmsg = generator(dp, resource_id, param_dict)
if ofmsg:
dp.send_msg(ofmsg)
except:
# ignore the error due to dead datapath
pass
hub.sleep(interval)
def _queue_stats_generator(self, dp, queue_id, param_dict):
port_no = param_dict['port_no']
return dp.ofproto_parser.OFPQueueStatsRequest(datapath=dp,
port_no=port_no,
queue_id=queue_id)
def _monitor_queue(self, msgid, params):
return self._monitor(['queue_id', 'port_no'],
self.monitored_queues,
self._queue_stats_generator,
msgid, params)


@@ -48,7 +48,11 @@ CONF.register_cli_opts([
cfg.StrOpt('neutron-controller-addr', default=None,
help='openflow method:address:port to set controller of'
'ovs bridge',
deprecated_name='quantum-controller-addr')
deprecated_name='quantum-controller-addr'),
cfg.IntOpt('vrrp-rpc-port', default=50004,
help='port for vrrp rpc interface'),
cfg.BoolOpt('vrrp-use-vmac', default=False,
help='use virtual mac')
])
CONF.register_cli_opts([

39
ryu/lib/apgw.py Normal file

@@ -0,0 +1,39 @@
import json
import datetime
import time
import logging
import logging.handlers
SYSLOG_FORMAT = '%(name)s %(message)s'
class StructuredMessage(object):
COMPONENT_NAME = None
def __init__(self, msg, log_type='log', resource_id=None,
resource_name=None):
assert self.__class__.COMPONENT_NAME is not None
assert isinstance(msg, dict)
assert log_type in ('log', 'stats', 'states')
self.message = {}
cur_time = datetime.datetime.utcfromtimestamp(
time.time()).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
self.message['log_type'] = log_type
self.message['timestamp'] = cur_time
self.message['component_name'] = self.__class__.COMPONENT_NAME
self.message['msg'] = msg
log = logging.getLogger()
self.message['level'] = logging.getLevelName(log.level)
if resource_id:
self.message['resource_id'] = resource_id
if resource_name:
self.message['resource_name'] = resource_name
def __str__(self):
return json.dumps(self.message)
def update_syslog_format():
log = logging.getLogger()
for h in log.handlers:
if isinstance(h, logging.handlers.SysLogHandler):
h.setFormatter(logging.Formatter(SYSLOG_FORMAT))
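A StructuredMessage line is plain JSON with a fixed envelope; a stripped-down reimplementation with a frozen timestamp for reproducibility (field names copied from the class above, function name invented):

```python
import json

def structured_message(component, msg, log_type='log', level='WARNING',
                       timestamp='2014-02-03T11:20:26.000000Z'):
    # build the JSON envelope that StructuredMessage.__str__ emits;
    # the timestamp is fixed here instead of taken from the clock
    assert log_type in ('log', 'stats', 'states')
    return json.dumps({'log_type': log_type,
                       'timestamp': timestamp,
                       'component_name': component,
                       'msg': msg,
                       'level': level}, sort_keys=True)

line = structured_message('ofwire', {'event': 'dp connected', 'dpid': 1})
record = json.loads(line)
```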


@@ -123,5 +123,11 @@ class OFCapableSwitch(object):
def copy_config(self, source, target):
self.netconf.copy_config(source, target)
def commit(self):
self.netconf.commit()
def discard_changes(self):
self.netconf.discard_changes()
# TODO: more netconf operations
# TODO: convenience (higher level) methods


@@ -30,6 +30,14 @@ class OFPortConfigurationType(_Base):
]
class OFPortStateType(_Base):
_ELEMENTS = [
_e('oper-state', is_list=False),
_e('blocked', is_list=False),
_e('live', is_list=False),
]
class OFPortType(_Base):
_ELEMENTS = [
_e('resource-id', is_list=False),
@@ -38,7 +46,7 @@ class OFPortType(_Base):
_e('current-rate', is_list=False),
_e('max-rate', is_list=False),
_ct('configuration', OFPortConfigurationType, is_list=False),
_ct('state', None, is_list=False),
_ct('state', OFPortStateType, is_list=False),
_ct('features', None, is_list=False),
_ct('tunnel-type', None, is_list=False),
]


@@ -325,7 +325,7 @@ class vrrp(packet_base.PacketBase):
self.identification &= 0xffff
return self.identification
def create_packet(self, primary_ip_address, vlan_id=None):
def create_packet(self, primary_ip_address, vlan_id=None, src_mac=None):
"""Prepare a VRRP packet.
Returns a newly created ryu.lib.packet.packet.Packet object
@@ -348,8 +348,10 @@
traffic_class = 0xc0 # set tos to internetwork control
flow_label = 0
payload_length = ipv6.ipv6._MIN_LEN + len(self) # XXX _MIN_LEN
if src_mac is None:
src_mac = vrrp_ipv6_src_mac_address(self.vrid)
e = ethernet.ethernet(VRRP_IPV6_DST_MAC_ADDRESS,
vrrp_ipv6_src_mac_address(self.vrid),
src_mac,
ether.ETH_TYPE_IPV6)
ip = ipv6.ipv6(6, traffic_class, flow_label, payload_length,
inet.IPPROTO_VRRP, VRRP_IPV6_HOP_LIMIT,
@@ -359,8 +361,10 @@
total_length = 0
tos = 0xc0 # set tos to internetwork control
identification = self.get_identification()
if src_mac is None:
src_mac = vrrp_ipv4_src_mac_address(self.vrid)
e = ethernet.ethernet(VRRP_IPV4_DST_MAC_ADDRESS,
vrrp_ipv4_src_mac_address(self.vrid),
src_mac,
ether.ETH_TYPE_IP)
ip = ipv4.ipv4(4, header_length, tos, total_length, identification,
0, 0, VRRP_IPV4_TTL, inet.IPPROTO_VRRP, 0,
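
The change above turns the source MAC into an optional override with a per-VRID fallback. A small sketch of that fallback (`pick_src_mac` is a name invented here; the MAC format is the RFC 5798 IPv4 virtual-router MAC that `vrrp_ipv4_src_mac_address` produces):

```python
def vrrp_ipv4_src_mac_address(vrid):
    # RFC 5798 IPv4 virtual router MAC: 00-00-5E-00-01-{VRID}.
    return '00:00:5e:00:01:%02x' % vrid

def pick_src_mac(vrid, src_mac=None):
    # Same fallback as the patched create_packet(): honor a caller-supplied
    # MAC, otherwise derive the virtual MAC from the VRID.
    if src_mac is None:
        src_mac = vrrp_ipv4_src_mac_address(vrid)
    return src_mac
```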


@@ -32,7 +32,7 @@ CONF.register_cli_opts([
cfg.IntOpt('default-log-level', default=None, help='default log level'),
cfg.BoolOpt('verbose', default=False, help='show debug output'),
cfg.BoolOpt('use-stderr', default=True, help='log to standard error'),
cfg.BoolOpt('use-syslog', default=False, help='output to syslog'),
cfg.BoolOpt('use-syslog', default=True, help='output to syslog'),
cfg.StrOpt('log-dir', default=None, help='log file directory'),
cfg.StrOpt('log-file', default=None, help='log file name'),
cfg.StrOpt('log-file-mode', default='0644',


@@ -993,7 +993,7 @@ class OFPInstructionWriteMetadata(OFPInstruction):
metadata_mask Metadata write bitmask
================ ======================================================
"""
def __init__(self, metadata, metadata_mask, len_=None):
def __init__(self, metadata, metadata_mask, type_=None, len_=None):
super(OFPInstructionWriteMetadata, self).__init__()
self.type = ofproto_v1_2.OFPIT_WRITE_METADATA
self.len = ofproto_v1_2.OFP_INSTRUCTION_WRITE_METADATA_SIZE


@@ -744,6 +744,13 @@ OFPR_NO_MATCH = 0 # No matching flow.
OFPR_ACTION = 1 # Action explicitly output to controller.
OFPR_INVALID_TTL = 2 # Packet has invalid TTL.
# Masks used for OFPT_SET_ASYNC, OFPT_GET_ASYNC_REPLY messages.
# Used for setting packet_in_mask field.
OFPR_NO_MATCH_MASK = 1 << OFPR_NO_MATCH
OFPR_ACTION_MASK = 1 << OFPR_ACTION
OFPR_INVALID_TTL_MASK = 1 << OFPR_INVALID_TTL
# struct ofp_flow_removed
_OFP_FLOW_REMOVED_PACK_STR0 = 'QHBBIIHHQQ'
OFP_FLOW_REMOVED_PACK_STR = '!' + _OFP_FLOW_REMOVED_PACK_STR0 + \
@@ -759,6 +766,15 @@ OFPRR_HARD_TIMEOUT = 1 # Time exceeded hard_timeout.
OFPRR_DELETE = 2 # Evicted by a DELETE flow mod.
OFPRR_GROUP_DELETE = 3 # Group was removed.
# Masks used for OFPT_SET_ASYNC, OFPT_GET_ASYNC_REPLY messages.
# Used for setting flow_removed_mask.
OFPRR_IDLE_TIMEOUT_MASK = 1 << OFPRR_IDLE_TIMEOUT
OFPRR_HARD_TIMEOUT_MASK = 1 << OFPRR_HARD_TIMEOUT
OFPRR_DELETE_MASK = 1 << OFPRR_DELETE
OFPRR_GROUP_DELETE_MASK = 1 << OFPRR_GROUP_DELETE
# struct ofp_port_status
OFP_PORT_STATUS_PACK_STR = '!B7x' + _OFP_PORT_PACK_STR
OFP_PORT_STATUS_DESC_OFFSET = OFP_HEADER_SIZE + 8
@@ -771,6 +787,12 @@ OFPPR_ADD = 0 # The port was added.
OFPPR_DELETE = 1 # The port was removed.
OFPPR_MODIFY = 2 # Some attribute of the port has changed.
# Masks used for OFPT_SET_ASYNC, OFPT_GET_ASYNC_REPLY messages.
# Used for setting port_status_mask.
OFPPR_ADD_MASK = 1 << OFPPR_ADD
OFPPR_DELETE_MASK = 1 << OFPPR_DELETE
OFPPR_MODIFY_MASK = 1 << OFPPR_MODIFY
# OFPMP_EXPERIMENTER
# struct onf_experimenter_multipart_msg
# (experimenter == ONF_EXPERIMENTER_ID)
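
Each new `*_MASK` constant is the corresponding reason code turned into a one-bit mask. A sketch (constants restated locally, not imported from ryu) of composing a `packet_in_mask` for an OFPT_SET_ASYNC message from them:

```python
# Reason codes, as defined in the diff above.
OFPR_NO_MATCH = 0
OFPR_ACTION = 1
OFPR_INVALID_TTL = 2

# One-bit masks derived from the reason codes.
OFPR_NO_MATCH_MASK = 1 << OFPR_NO_MATCH
OFPR_ACTION_MASK = 1 << OFPR_ACTION
OFPR_INVALID_TTL_MASK = 1 << OFPR_INVALID_TTL

# Request only action-output and invalid-TTL packet-ins; this is the same
# value the unit tests in this changeset build with inline shifts.
packet_in_mask = OFPR_ACTION_MASK | OFPR_INVALID_TTL_MASK
```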


@@ -3357,7 +3357,8 @@ class OFPMeterMod(MsgBase):
OFPMeterBandExperimenter
================ ======================================================
"""
def __init__(self, datapath, command, flags, meter_id, bands):
def __init__(self, datapath, command=ofproto_v1_3.OFPMC_ADD,
flags=ofproto_v1_3.OFPMF_KBPS, meter_id=1, bands=[]):
super(OFPMeterMod, self).__init__(datapath)
self.command = command
self.flags = flags
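
One thing to note about the new signature: `bands=[]` is a mutable default argument, which Python evaluates once at definition time and shares across calls. A minimal illustration of that pitfall (hypothetical function, not from the patch):

```python
def append_band(band, bands=[]):
    # The default list is created once, so every call that relies on it
    # mutates the same object.
    bands.append(band)
    return bands

first = append_band(1)
second = append_band(2)
# 'second' is the same list object as 'first', now holding both entries.
```

Whether this bites OFPMeterMod depends on whether the list is ever mutated; `bands=None` with an in-body default is the usual safe spelling.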


@@ -120,7 +120,8 @@ class VRRPConfig(object):
priority=vrrp.VRRP_PRIORITY_BACKUP_DEFAULT, ip_addresses=None,
advertisement_interval=vrrp.VRRP_MAX_ADVER_INT_DEFAULT_IN_SEC,
preempt_mode=True, preempt_delay=0, accept_mode=False,
statistics_interval=30, resource_id=None):
statistics_interval=30, resource_id=None,
use_virtual_mac=False):
# To allow version and priority default
assert vrid is not None
assert ip_addresses is not None
@@ -138,6 +139,7 @@
self.is_ipv6 = vrrp.is_ipv6(ip_addresses[0])
self.statistics_interval = statistics_interval
self.resource_id = resource_id
self.use_virtual_mac = use_virtual_mac
@property
def address_owner(self):


@@ -17,6 +17,7 @@
import contextlib
import socket
import struct
import fcntl
from ryu.controller import handler
from ryu.ofproto import ether
@@ -69,6 +70,11 @@ class VRRPInterfaceMonitorNetworkDevice(monitor.VRRPInterfaceMonitor):
# socket module doesn't define IPPROTO_VRRP
self.ip_socket = socket.socket(family, socket.SOCK_RAW,
inet.IPPROTO_VRRP)
if self.config.use_virtual_mac is False:
i = fcntl.ioctl(self.ip_socket.fileno(), 0x8927,
struct.pack('256s',
self.interface.device_name[:15]))
mac_address = addrconv.mac.bin_to_text(i[18:24])
self.packet_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
socket.htons(ether_type))
@@ -78,6 +84,7 @@ class VRRPInterfaceMonitorNetworkDevice(monitor.VRRPInterfaceMonitor):
addrconv.mac.text_to_bin(mac_address)))
self.ifindex = if_nametoindex(self.interface.device_name)
self.config.src_mac = mac_address
def start(self):
# discard received packets before joining multicast membership
@@ -104,10 +111,7 @@ class VRRPInterfaceMonitorNetworkDevice(monitor.VRRPInterfaceMonitor):
    # multicast are aligned in the same way on all the architectures.
def _join_multicast_membership(self, join_leave):
config = self.config
if config.is_ipv6:
mac_address = vrrp.vrrp_ipv6_src_mac_address(config.vrid)
else:
mac_address = vrrp.vrrp_ipv4_src_mac_address(config.vrid)
mac_address = self.config.src_mac
if join_leave:
add_drop = PACKET_ADD_MEMBERSHIP
else:
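
The `fcntl.ioctl` request `0x8927` used above is `SIOCGIFHWADDR` on Linux; the kernel fills a `struct ifreq` and the six hardware-address bytes sit at offsets 18..24 of the returned buffer. A stand-alone sketch of the same lookup (`get_hw_address` is a name invented here; Linux-only):

```python
import fcntl
import socket
import struct

SIOCGIFHWADDR = 0x8927  # Linux: fetch an interface's hardware address

def get_hw_address(ifname):
    # Pack the interface name into a 256-byte ifreq buffer, issue the
    # ioctl on any socket fd, and parse the MAC bytes at offset 18,
    # as the monitor code above does.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        info = fcntl.ioctl(s.fileno(), SIOCGIFHWADDR,
                           struct.pack('256s', ifname[:15].encode()))
    finally:
        s.close()
    return ':'.join('%02x' % b for b in info[18:24])
```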


@@ -26,11 +26,16 @@ from ryu.base import app_manager
from ryu.controller import event
from ryu.controller import handler
from ryu.lib import hub
from ryu.lib import apgw
from ryu.lib.packet import vrrp
from ryu.services.protocols.vrrp import event as vrrp_event
from ryu.services.protocols.vrrp import api as vrrp_api
_ = type('', (apgw.StructuredMessage,), {})
_.COMPONENT_NAME = 'vrrp'
# TODO: improve Timer service and move it into framework
class Timer(object):
def __init__(self, handler_):
@@ -139,6 +144,7 @@ class VRRPRouter(app_manager.RyuApp):
_EVENTS = [vrrp_event.EventVRRPStateChanged]
_CONSTRUCTORS = {}
    _STATE_MAP = {}  # should be overridden by concrete class
LOGGER_NAME = 'vrrp'
@staticmethod
def register(version):
@@ -215,7 +221,7 @@ class VRRPRouter(app_manager.RyuApp):
# create packet frame each time to generate new ip identity
interface = self.interface
packet_ = vrrp_.create_packet(interface.primary_ip_address,
interface.vlan_id)
interface.vlan_id, self.config.src_mac)
packet_.serialize()
vrrp_api.vrrp_transmit(self, self.monitor_name, packet_.data)
self.statistics.tx_vrrp_packets += 1
@@ -271,8 +277,8 @@
@handler.set_ev_handler(_EventStatisticsOut)
def statistics_handler(self, ev):
# sends stats to somewhere here
# print self.statistics.get_stats()
stats = self.statistics.get_stats()
self.logger.info(_(msg=stats, log_type='stats'))
self.stats_out_timer.start(self.statistics.statistics_interval)
# RFC defines that start timer, then change the state.


@@ -24,13 +24,12 @@ from ryu.services.protocols.vrrp import api as vrrp_api
from ryu.lib import rpc
from ryu.lib import hub
from ryu.lib import mac
from ryu.lib import apgw
VRRP_RPC_PORT = 50004 # random
CONF = cfg.CONF
CONF.register_opts([
cfg.IntOpt('vrrp-rpc-port', default=VRRP_RPC_PORT,
help='port for vrrp rpc interface')])
_ = type('', (apgw.StructuredMessage,), {})
_.COMPONENT_NAME = 'vrrp'
class RPCError(Exception):
@@ -47,6 +46,8 @@ class Peer(object):
class RpcVRRPManager(app_manager.RyuApp):
LOGGER_NAME = 'vrrp'
def __init__(self, *args, **kwargs):
super(RpcVRRPManager, self).__init__(*args, **kwargs)
self._args = args
@@ -55,6 +56,7 @@ class RpcVRRPManager(app_manager.RyuApp):
self._rpc_events = hub.Queue(128)
self.server_thread = hub.spawn(self._peer_accept_thread)
self.event_thread = hub.spawn(self._rpc_request_loop_thread)
apgw.update_syslog_format()
def _rpc_request_loop_thread(self):
while True:
@@ -63,6 +65,9 @@ class RpcVRRPManager(app_manager.RyuApp):
error = None
result = None
try:
self.logger.info(_(msg={'msgid': msgid,
'target_method': target_method,
'params': params}))
if target_method == "vrrp_config":
result = self._config(msgid, params)
elif target_method == "vrrp_list":
@@ -73,6 +78,9 @@ class RpcVRRPManager(app_manager.RyuApp):
error = 'Unknown method %s' % (target_method)
except RPCError as e:
error = str(e)
self.logger.info(_(msg={'msgid': msgid,
'error': error,
'result': result}))
peer._endpoint.send_response(msgid, error=error, result=result)
def _peer_loop_thread(self, peer):
@@ -102,15 +110,22 @@ class RpcVRRPManager(app_manager.RyuApp):
return d
def _config(self, msgid, params):
self.logger.debug('handle vrrp_config request')
try:
param_dict = params[0]
except:
raise RPCError('parameters are missing')
if_params = self._params_to_dict(param_dict,
('primary_ip_address',
'device_name'))
('ip_address',
'ifname'))
try:
if_params['primary_ip_address'] = if_params.pop('ip_address')
except:
raise RPCError('ip_addr parameter is missing')
try:
if_params['device_name'] = if_params.pop('ifname')
except:
raise RPCError('ifname parameter is missing')
# drop vlan support later
if_params['vlan_id'] = None
if_params['mac_address'] = mac.DONTCARE_STR
@@ -121,7 +136,7 @@ class RpcVRRPManager(app_manager.RyuApp):
config_params = self._params_to_dict(param_dict,
('vrid', # mandatory
'ip_addresses', # mandatory
'ip_addr', # mandatory
'version',
'admin_state',
'priority',
@@ -129,6 +144,12 @@ class RpcVRRPManager(app_manager.RyuApp):
'preempt_mode',
'preempt_delay',
'statistics_interval'))
if CONF.vrrp_use_vmac:
config_params.update({'use_virtual_mac': True})
try:
config_params['ip_addresses'] = [config_params.pop('ip_addr')]
except:
raise RPCError('ip_addr parameter is missing')
try:
config = vrrp_event.VRRPConfig(**config_params)
except:
@@ -149,7 +170,6 @@ class RpcVRRPManager(app_manager.RyuApp):
return None
def _config_change(self, msgid, params):
self.logger.debug('handle vrrp_config_change request')
try:
config_values = params[0]
except:
@@ -167,7 +187,6 @@ class RpcVRRPManager(app_manager.RyuApp):
return {}
def _list(self, msgid, params):
self.logger.debug('handle vrrp_list request')
result = vrrp_api.vrrp_list(self)
instance_list = result.instance_list
ret_list = []
@@ -186,13 +205,11 @@ class RpcVRRPManager(app_manager.RyuApp):
@handler.set_ev_cls(vrrp_event.EventVRRPStateChanged)
def vrrp_state_changed_handler(self, ev):
self.logger.info('handle EventVRRPStateChanged')
name = ev.instance_name
old_state = ev.old_state
new_state = ev.new_state
vrid = ev.config.vrid
self.logger.info('VRID:%s %s: %s -> %s', vrid, name, old_state,
new_state)
params = {'vrid': vrid, 'old_state': old_state, 'new_state': new_state}
self.logger.critical(_(msg=params))
for peer in self._peers:
peer._endpoint.send_notification("notify_status", [params])
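
The `_config` changes above repeatedly pop a wire-format key (`ip_address`, `ifname`, `ip_addr`) and re-insert it under the internal name, raising `RPCError` when the parameter is absent. The pattern, extracted into a small helper (hypothetical; not part of the patch):

```python
class RPCError(Exception):
    pass

def rename_param(params, wire_name, internal_name):
    # e.g. rename_param(if_params, 'ifname', 'device_name'), mirroring
    # the try/pop/except blocks in _config() above.
    try:
        params[internal_name] = params.pop(wire_name)
    except KeyError:
        raise RPCError('%s parameter is missing' % wire_name)
    return params

p = rename_param({'ifname': 'veth0'}, 'ifname', 'device_name')
```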


@@ -0,0 +1,777 @@
import unittest
import logging
import socket
from nose.tools import eq_
from nose.tools import raises
from ryu.controller import api
from ryu.controller import dpset
from ryu.ofproto.ofproto_parser import MsgBase
from ryu.ofproto import ether
from ryu.ofproto import inet
from ryu.ofproto import ofproto_parser
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_2_parser
from ryu.ofproto import ofproto_v1_3
from ryu.ofproto import ofproto_v1_3_parser
from ryu.controller import ofp_event
from ryu.lib import hub
from ryu.lib import rpc
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import vlan
from ryu.lib.packet import ipv4
from ryu.lib.packet import icmp
class DummyEndpoint(object):
def __init__(self):
self.response = []
self.notification = []
def send_response(self, msgid, error, result):
self.response.append((msgid, error, result))
def send_notification(self, method, params):
self.notification.append((method, params))
class DummyDatapath(object):
def __init__(self, ofp=None, ofpp=None):
if ofp is None:
ofp = ofproto_v1_2
self.ofproto = ofp
if ofpp is None:
ofpp = ofproto_v1_2_parser
self.ofproto_parser = ofpp
self.port_state = {}
self.ports = {}
self.sent = []
def set_xid(self, msg):
msg.set_xid(1)
def send_msg(self, msg):
assert isinstance(msg, MsgBase)
msg.serialize()
self.sent.append(msg)
def send_packet_out(self, in_port, actions, data):
self.sent.append((in_port, actions, data))
class TestRpcOFPManager(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_logger(self):
_ = api._
m = {'hello': 'world'}
r = eval(str(_(m)))
eq_(r['msg'], m)
eq_(r['component_name'], _.COMPONENT_NAME)
eq_(r['log_type'], 'log')
def test_monitor_port(self):
m = api.RpcOFPManager(dpset=None)
msgid = 1
try:
m._monitor_port(msgid, {})
except api.RPCError as e:
pass
port_name = 'OFP11'
interval = 10
try:
m._monitor_port(msgid, {'physical_port_no': port_name})
except api.RPCError as e:
pass
contents = {'hoge': 'jail'}
r = m._monitor_port(msgid, [{'physical_port_no': port_name,
'contexts': contents,
'interval': interval}])
eq_(r, {})
eq_(m.monitored_ports[port_name], (contents, interval))
def test_monitor_queue(self):
m = api.RpcOFPManager(dpset=None)
msgid = 1
try:
m._monitor_queue(msgid, {})
except api.RPCError as e:
pass
queue_id = 10
port_no = 10
interval = 10
try:
m._monitor_queue(msgid, {'queue_id': queue_id, 'port_no': port_no})
except api.RPCError as e:
pass
contents = {'hoge': 'jail'}
r = m._monitor_queue(msgid, [{'queue_id': queue_id,
'port_no': port_no,
'contexts': contents,
'interval': interval}])
eq_(r, {})
eq_(m.monitored_queues[queue_id], (contents, interval))
def test_register_traceroute(self):
m = api.RpcOFPManager(dpset=None)
msgid = 1
try:
m._register_traceroute([{}])
except api.RPCError as e:
pass
try:
m._register_traceroute([{'vlan': 1}])
except api.RPCError as e:
pass
vlan_id = 1
port_no = 10
m._register_traceroute([{'vlan': vlan_id,
'ip': '192.168.1.1',
'port': port_no}])
def test_handle_ofprotocol_without_dp(self):
m = api.RpcOFPManager(dpset=dpset.DPSet())
msgid = 1
try:
m._handle_ofprotocol(msgid, [{}])
except api.PendingRPC:
pass
try:
m._handle_ofprotocol(msgid, [{'dpid': 1}])
except api.PendingRPC:
pass
def _create_dpset(self, dpid=0, ports=None, ofp=None, ofpp=None):
dps = dpset.DPSet()
dp = DummyDatapath(ofp=ofp, ofpp=ofpp)
dp.id = dpid
if ports:
class DummyPort(object):
def __init__(self, port_no):
self.port_no = port_no
dps.ports = map(lambda n: DummyPort(n), ports)
dps.get_ports = lambda dpid: dps.ports
dps._register(dp)
return dps
def _test_handle_ofprotocol_flowmod(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
msgid = 1
nr_sent = 0
try:
m._handle_ofprotocol(msgid, [{'dpid': dpid}])
except api.RPCError:
pass
dp = dps.get(dpid)
match = ofpp.OFPMatch(metadata=(100, 63),
eth_type=2048,
ip_proto=112,
ipv4_src='172.17.45.1',
ipv4_dst='224.0.0.18')
inst = [ofpp.OFPInstructionActions(
ofp.OFPIT_APPLY_ACTIONS, [ofpp.OFPActionPopVlan()]),
ofpp.OFPInstructionGotoTable(35),
ofpp.OFPInstructionWriteMetadata(100, 0x255)]
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match, instructions=inst)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
'internal': 30,
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
eq_(len(m.monitored_flows), 1)
eq_(m.monitored_flows[m.format_key(match.to_jsondict())],
contexts)
ofmsg = ofpp.OFPFlowMod(datapath=dp,
command=ofp.OFPFC_DELETE,
match=match)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
except api.RPCError as e:
assert 'unknown key' in str(e)
eq_(len(dp.sent), nr_sent)
def test_handle_ofprotocol_flowmod_12(self):
self._test_handle_ofprotocol_flowmod(ofproto_v1_2, ofproto_v1_2_parser)
def test_handle_ofprotocol_flowmod_13(self):
self._test_handle_ofprotocol_flowmod(ofproto_v1_3, ofproto_v1_3_parser)
def _test_handle_ofprotocol_meter_mod(self, ofp, ofpp):
dpid = 10
msgid = 1
nr_sent = 0
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
dp = dps.get(dpid)
bands = [ofpp.OFPMeterBandDrop(10, 100)]
meter_id = 10
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
ofp.OFPMF_KBPS, meter_id, bands)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
eq_(len(m.monitored_meters), 1)
eq_(m.monitored_meters[meter_id], contexts)
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS,
meter_id)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
nr_sent += 1
eq_(r, {'xid': 1})
eq_(len(dp.sent), nr_sent)
eq_(len(m.monitored_meters), 0)
def test_handle_ofprotocol_meter_mod_13(self):
self._test_handle_ofprotocol_meter_mod(ofproto_v1_3,
ofproto_v1_3_parser)
def _test_handle_ofprotocol(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
msgid = 1
nr_sent = 0
dp = dps.get(dpid)
ofmsg = ofpp.OFPBarrierRequest(datapath=dp)
nr_sent += 1
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
except api.NoRPCResponse as e:
eq_(e.dpid, dpid)
eq_(e.xid, 1)
eq_(e.msgid, msgid)
eq_(len(dp.sent), nr_sent)
ofmsg = ofpp.OFPFlowStatsRequest(datapath=dp)
nr_sent += 1
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
except api.NoRPCResponse as e:
eq_(e.dpid, dpid)
eq_(e.xid, 1)
eq_(e.msgid, msgid)
eq_(len(dp.sent), nr_sent)
ofmsg = ofpp.OFPHello(datapath=dp)
try:
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}])
except api.RPCError as e:
assert 'unknown of message' in str(e)
eq_(len(dp.sent), nr_sent)
def test_handle_ofprotocol_12(self):
self._test_handle_ofprotocol(ofproto_v1_2, ofproto_v1_2_parser)
def test_handle_ofprotocol_13(self):
self._test_handle_ofprotocol(ofproto_v1_3, ofproto_v1_3_parser)
def _test_port_status_handler(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
port = ofpp.OFPPort(1, 'aa:aa:aa:aa:aa:aa', 'hoge', 0, 0, 0,
0, 0, 0, 0, 0)
msg = ofpp.OFPPortStatus(datapath=dp, reason=ofp.OFPPR_MODIFY,
desc=port)
ev = ofp_event.EventOFPPortStatus(msg)
m = api.RpcOFPManager(dpset=dps)
peer = api.Peer(None)
peer._endpoint = DummyEndpoint()
m._peers.append(peer)
m._port_status_handler(ev)
eq_(len(peer._endpoint.notification), 1)
(method, params) = peer._endpoint.notification[0]
eq_(method, 'port_status')
eq_(params[0], {'port_no': port.port_no, 'port_state': port.state})
def test_port_status_handler_12(self):
self._test_port_status_handler(ofproto_v1_2, ofproto_v1_2_parser)
def test_port_status_handler_13(self):
self._test_port_status_handler(ofproto_v1_3, ofproto_v1_3_parser)
def _test_packet_in_handler(self, ofp, ofpp):
dpid = 10
src_ip = '10.0.0.1'
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
msg = ofpp.OFPPacketIn(datapath=dp)
ev = ofp_event.EventOFPPacketIn(msg)
in_port = 10
vlan_id = 100
target_ip = '192.168.1.1'
msg.match = ofpp.OFPMatch(in_port=in_port)
protocols = [ethernet.ethernet(ethertype=ether.ETH_TYPE_8021Q),
vlan.vlan(vid=vlan_id), ipv4.ipv4(src=target_ip)]
p = packet.Packet(protocols=protocols)
p.serialize()
msg.data = p.data
msg.reason = ofp.OFPR_INVALID_TTL
m = api.RpcOFPManager(dpset=dps)
port_no = 5
msgid = 7
# test the failure case
m._packet_in_handler(ev)
m._register_traceroute([{'vlan': vlan_id,
'ip': src_ip,
'port': port_no}])
m._packet_in_handler(ev)
(in_port, actions, data) = dp.sent[0]
eq_(in_port, port_no)
pkt = packet.Packet(data)
ip = pkt.get_protocol(ipv4.ipv4)
v = pkt.get_protocol(vlan.vlan)
ic = pkt.get_protocol(icmp.icmp)
eq_(ip.src, src_ip)
eq_(ip.dst, target_ip)
eq_(ip.proto, inet.IPPROTO_ICMP)
eq_(v.vid, vlan_id)
eq_(ic.type, icmp.ICMP_TIME_EXCEEDED)
eq_(len(actions), 1)
eq_(actions[0].port, ofp.OFPP_TABLE)
def test_packet_in_handler_12(self):
self._test_packet_in_handler(ofproto_v1_2, ofproto_v1_2_parser)
def test_packet_in_handler_13(self):
self._test_packet_in_handler(ofproto_v1_3, ofproto_v1_3_parser)
def _test_handler_datapath(self, ofp, parser):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=parser)
dp = dps.get(dpid)
ev = dpset.EventDP(dp=dp, enter_leave=True)
m = api.RpcOFPManager(dpset=dps)
peer = api.Peer(None)
peer._endpoint = DummyEndpoint()
m._peers.append(peer)
m._handler_datapath(ev)
eq_(len(peer._endpoint.notification), 1)
(method, params) = peer._endpoint.notification[0]
eq_(method, 'state')
eq_(params[0], {'secure_channel_state': 'Up'})
peer._endpoint.notification.pop()
ev = dpset.EventDP(dp=dp, enter_leave=False)
m._handler_datapath(ev)
eq_(len(peer._endpoint.notification), 1)
(method, params) = peer._endpoint.notification[0]
eq_(method, 'state')
eq_(params[0], {'secure_channel_state': 'Down'})
eq_(len(dp.sent), 1)
return dp
def test_handler_datapath_12(self):
dp = self._test_handler_datapath(ofproto_v1_2, ofproto_v1_2_parser)
msg = dp.sent[0]
eq_(msg.__class__, ofproto_v1_2_parser.OFPSetConfig)
eq_(msg.flags, ofproto_v1_2.OFPC_INVALID_TTL_TO_CONTROLLER)
def test_handler_datapath_13(self):
ofp = ofproto_v1_3
dp = self._test_handler_datapath(ofp, ofproto_v1_3_parser)
msg = dp.sent[0]
eq_(msg.__class__, ofproto_v1_3_parser.OFPSetAsync)
eq_(msg.packet_in_mask,
[1 << ofp.OFPR_ACTION | 1 << ofp.OFPR_INVALID_TTL, 0])
eq_(msg.port_status_mask,
[(1 << ofp.OFPPR_ADD | 1 << ofp.OFPPR_DELETE |
1 << ofp.OFPPR_MODIFY), 0])
def _test_port_status_thread(self, ofp, ofpp):
dpid = 10
port_no = 1
port_name = 'OFP11'
dps = self._create_dpset(dpid, ports=(port_no,), ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
p = dps.get_ports(dpid)
p[0].name = port_name
threads = []
m.monitored_ports[port_name] = ({}, 1)
with hub.Timeout(2):
threads.append(hub.spawn(m._monitor_thread, port_name,
m.monitored_ports, {},
m._port_stats_generator))
hub.sleep(0.5)
for t in threads:
hub.kill(t)
hub.joinall(threads)
assert len(dp.sent)
for m in dp.sent:
eq_(m.__class__, ofpp.OFPPortStatsRequest)
eq_(m.port_no, port_no)
def test_port_status_thread_12(self):
self._test_port_status_thread(ofproto_v1_2, ofproto_v1_2_parser)
def test_port_status_thread_13(self):
self._test_port_status_thread(ofproto_v1_3, ofproto_v1_3_parser)
def _test_queue_status_thread(self, ofp, ofpp):
dpid = 10
queue_id = 2
port_no = 16
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
p = dps.get_ports(dpid)
threads = []
m.monitored_queues[queue_id] = ({}, 1)
with hub.Timeout(2):
threads.append(hub.spawn(m._monitor_thread, queue_id,
m.monitored_queues, {'port_no': port_no},
m._queue_stats_generator))
hub.sleep(0.5)
for t in threads:
hub.kill(t)
hub.joinall(threads)
assert len(dp.sent)
for m in dp.sent:
eq_(m.__class__, ofpp.OFPQueueStatsRequest)
eq_(m.port_no, port_no)
eq_(m.queue_id, queue_id)
def test_queue_status_thread_12(self):
self._test_queue_status_thread(ofproto_v1_2, ofproto_v1_2_parser)
def test_queue_status_thread_13(self):
self._test_queue_status_thread(ofproto_v1_3, ofproto_v1_3_parser)
def _test_flow_stats_loop(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
match = ofpp.OFPMatch(in_port=1)
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match)
contexts = {'hello': 'you'}
msgid = 10
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
'internal': 30,
'contexts': contexts}])
threads = []
key = m.format_key(ofmsg.match.to_jsondict())
with hub.Timeout(5):
threads.append(hub.spawn(m._flow_stats_loop,
dp, ofmsg.table_id, match, 0.1, key))
hub.sleep(0.5)
ofmsg = ofpp.OFPFlowMod(datapath=dp,
command=ofp.OFPFC_DELETE,
match=match)
msgid = 11
r = m._handle_ofprotocol(msgid, [{'ofmsg': ofmsg.to_jsondict(),
'internal': 30,
'contexts': contexts}])
eq_(len(m.monitored_flows), 0)
hub.joinall(threads)
for m in dp.sent:
if m.__class__ in (ofpp.OFPFlowMod, ofpp.OFPPortStatsRequest):
continue
eq_(m.__class__, ofpp.OFPFlowStatsRequest)
eq_(m.table_id, ofmsg.table_id)
eq_(m.match.to_jsondict(), match.to_jsondict())
def test_flow_stats_loop_12(self):
self._test_flow_stats_loop(ofproto_v1_2, ofproto_v1_2_parser)
def test_flow_stats_loop_13(self):
self._test_flow_stats_loop(ofproto_v1_3, ofproto_v1_3_parser)
def _test_meter_stats_loop(self, ofp, ofpp):
dpid = 10
msgid = 1
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
bands = [ofpp.OFPMeterBandDrop(10, 100)]
meter_id = 10
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
ofp.OFPMF_KBPS, meter_id, bands)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
threads = []
with hub.Timeout(5):
threads.append(hub.spawn(m._meter_stats_loop,
dp, 0.1, meter_id))
hub.sleep(0.5)
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_DELETE, ofp.OFPMF_KBPS,
meter_id)
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
eq_(len(m.monitored_meters), 0)
hub.joinall(threads)
for m in dp.sent:
if m.__class__ in (ofpp.OFPMeterMod, ofpp.OFPPortStatsRequest):
continue
eq_(m.__class__, ofpp.OFPMeterStatsRequest)
eq_(m.meter_id, ofmsg.meter_id)
def test_meter_stats_loop_13(self):
self._test_meter_stats_loop(ofproto_v1_3, ofproto_v1_3_parser)
def _test_rpc_message_thread(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
peer = api.Peer(m._rpc_events)
peer._endpoint = DummyEndpoint()
m._peers.append(peer)
msgid = 7
ofmsg = ofpp.OFPBarrierRequest(datapath=dp)
params = {'dpid': dpid,
'ofmsg': ofmsg.to_jsondict()}
data = (msgid, 'ofp', [params])
with hub.Timeout(2):
peer._handle_rpc_request(data)
hub.sleep(0.5)
for s in dp.sent:
if s.__class__ in (ofpp.OFPPortStatsRequest,):
continue
eq_(s.__class__, ofpp.OFPBarrierRequest)
msg = ofpp.OFPBarrierReply(datapath=dp)
msg.set_xid(1)
ev = ofp_event.EventOFPBarrierReply(msg)
m._barrier_reply_handler(ev)
eq_(len(peer._endpoint.response), 1)
rsp = peer._endpoint.response.pop()
eq_(rsp[0], msgid)
eq_(rsp[1], None)
eq_(rsp[2], msg.to_jsondict())
eq_(len(peer.wait_for_ofp_resepnse[dpid]), 0)
m._barrier_reply_handler(ev)
eq_(len(peer._endpoint.response), 0)
ev = dpset.EventDP(dp=dp, enter_leave=False)
m._handler_datapath(ev)
eq_(len(peer.wait_for_ofp_resepnse), 0)
# bogus RPC
with hub.Timeout(2):
m._rpc_events.put((peer, rpc.MessageType.REQUEST,
(msgid, 'you')))
hub.sleep(0.5)
def test_rpc_message_thread_12(self):
self._test_rpc_message_thread(ofproto_v1_2, ofproto_v1_2_parser)
def test_rpc_message_thread_13(self):
self._test_rpc_message_thread(ofproto_v1_3, ofproto_v1_3_parser)
def _create_port_stats_args(self, port_no=0):
return {'port_no': port_no, 'rx_packets': 10, 'tx_packets': 30,
'rx_bytes': 1024, 'tx_bytes': 2048,
'rx_dropped': 0, 'tx_dropped': 1,
'rx_errors': 0, 'tx_errors': 0,
'rx_frame_err': 0, 'rx_over_err': 0, 'rx_crc_err': 0,
'collisions': 0}
def _test_stats_reply_port_handler(self, ofp, ofpp):
dpid = 10
dps = self._create_dpset(dpid, (0, 1, 2), ofp, ofpp)
dp = dps.get(dpid)
m = api.RpcOFPManager(dpset=dps)
msgid = 1
r = m._monitor_port(msgid, [{'physical_port_no': 0,
'contexts': {'hello': 'world'},
'interval': 30}])
return m, dp
def test_stats_reply_port_handler_12(self):
ofp = ofproto_v1_2
ofpp = ofproto_v1_2_parser
manager, dp = self._test_stats_reply_port_handler(ofp, ofpp)
msg = ofpp.OFPStatsReply(datapath=dp, type_=ofp.OFPST_PORT)
ev = ofp_event.EventOFPStatsReply(msg)
msg.body = []
for p in range(2):
port = ofpp.OFPPortStats(**self._create_port_stats_args(p))
msg.body.append(port)
manager._stats_reply_handler(ev)
def test_stats_reply_port_handler_13(self):
ofp = ofproto_v1_3
ofpp = ofproto_v1_3_parser
manager, dp = self._test_stats_reply_port_handler(ofp, ofpp)
msg = ofpp.OFPPortStatsReply(dp)
ev = ofp_event.EventOFPPortStatsReply(msg)
msg.body = []
for p in range(2):
d = self._create_port_stats_args(p)
d['duration_sec'] = 0
d['duration_nsec'] = 0
port = ofpp.OFPPortStats(**d)
msg.body.append(port)
manager._port_stats_reply_handler(ev)
def _test_stats_reply_flow_handler(self, ofp, ofpp):
dpid = 10
msgid = 9
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
dp = dps.get(dpid)
match = ofpp.OFPMatch(in_port=1)
ofmsg = ofpp.OFPFlowMod(datapath=dp, match=match)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
return m, dp
def test_stats_reply_flow_handler_12(self):
ofp = ofproto_v1_2
ofpp = ofproto_v1_2_parser
manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp)
msg = ofpp.OFPStatsReply(datapath=dp,
type_=ofp.OFPST_FLOW)
ev = ofp_event.EventOFPStatsReply(msg)
s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=1))
# not-monitored flow
s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=2))
msg.body = [s1, s2]
manager._stats_reply_handler(ev)
def test_stats_reply_flow_handler_13(self):
ofp = ofproto_v1_3
ofpp = ofproto_v1_3_parser
manager, dp = self._test_stats_reply_flow_handler(ofp, ofpp)
msg = ofpp.OFPFlowStatsReply(datapath=dp)
ev = ofp_event.EventOFPStatsReply(msg)
s1 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=1))
# not-monitored flow
s2 = ofpp.OFPFlowStats(table_id=0, duration_sec=10, duration_nsec=10,
priority=0, idle_timeout=0, hard_timeout=0,
cookie=0, packet_count=10, byte_count=100,
match=ofpp.OFPMatch(in_port=2))
msg.body = [s1, s2]
manager._flow_stats_reply_handler(ev)
def test_stats_reply_meter_handler_13(self):
ofp = ofproto_v1_3
ofpp = ofproto_v1_3_parser
dpid = 10
msgid = 9
dps = self._create_dpset(dpid, ofp=ofp, ofpp=ofpp)
m = api.RpcOFPManager(dpset=dps)
dp = dps.get(dpid)
bands = [ofpp.OFPMeterBandDrop(10, 100)]
meter_id = 10
ofmsg = ofpp.OFPMeterMod(dp, ofp.OFPMC_ADD,
ofp.OFPMF_KBPS, meter_id, bands)
contexts = {'hello': 'world'}
r = m._handle_ofprotocol(msgid, [{'dpid': dpid,
'ofmsg': ofmsg.to_jsondict(),
'contexts': contexts}])
msg = ofpp.OFPMeterStatsReply(datapath=dp)
ev = ofp_event.EventOFPStatsReply(msg)
s = ofpp.OFPMeterStats(meter_id=meter_id, flow_count=10,
packet_in_count=10, byte_in_count=10,
duration_sec=10, duration_nsec=10,
band_stats=[ofpp.OFPMeterBandStats(1, 8),
ofpp.OFPMeterBandStats(2, 16)])
msg.body = [s]
m._meter_stats_reply_handler(ev)


@@ -0,0 +1,107 @@
# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest

from nose.tools import eq_
from nose.tools import raises

from ryu.lib import hub
from ryu.lib import rpc
from ryu.services.protocols.vrrp.rpc_manager import RpcVRRPManager, Peer
from ryu.services.protocols.vrrp import event as vrrp_event


class DummyEndpoint(object):
    def __init__(self):
        self.response = []
        self.notification = []

    def send_response(self, msgid, error, result):
        self.response.append((msgid, error, result))

    def send_notification(self, method, params):
        self.notification.append((method, params))


class TestVRRP(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_rpc_request(self):
        rm = RpcVRRPManager()
        sent_events = []
        rm.send_event = lambda name, ev: sent_events.append((name, ev))
        peer = Peer(queue=rm._rpc_events)
        peer._endpoint = DummyEndpoint()
        rm._peers.append(peer)
        msgid = 10

        params = {}
        with hub.Timeout(2):
            peer._handle_vrrp_request((msgid, 'vrrp_list', [params]))
            hub.sleep(0.1)
            eq_(len(sent_events), 1)
            req = sent_events.pop()[1]
            eq_(req.__class__, vrrp_event.EventVRRPListRequest)
            req.reply_q.put(vrrp_event.EventVRRPListReply([]))
            hub.sleep(0.1)
        (msgid_, error, result) = peer._endpoint.response.pop()
        eq_(error, None)
        eq_(result, [])

        # incomplete config (only 'vrid') must not produce a result
        params = {'vrid': 1}
        with hub.Timeout(2):
            peer._handle_vrrp_request((msgid, 'vrrp_config', [params]))
            hub.sleep(0.1)
        msgid_, error, result = peer._endpoint.response.pop()
        eq_(result, None)

        params = {'version': 3,
                  'vrid': 1,
                  'ip_addr': '192.168.1.1',
                  'contexts': {'resource_id': 'XXX',
                               'resource_name': 'vrrp_session'},
                  'statistics_log_enabled': True,
                  'statistics_interval': 10,
                  'priority': 100,
                  'ifname': 'veth0',
                  'vlan_id': None,
                  'ip_address': '192.168.1.2',
                  'advertisement_interval': 10,
                  'preempt_mode': True,
                  'preempt_delay': 10,
                  'admin_state_up': True}
        with hub.Timeout(2):
            peer._handle_vrrp_request((msgid, 'vrrp_config', [params]))
            hub.sleep(0.1)
            eq_(len(sent_events), 1)
            req = sent_events.pop()[1]
            eq_(req.__class__, vrrp_event.EventVRRPConfigRequest)
            req.reply_q.put(vrrp_event.EventVRRPConfigReply('hoge',
                                                            req.interface,
                                                            req.config))
            hub.sleep(0.1)
        (msgid_, error, result) = peer._endpoint.response.pop()
        eq_(error, None)


@@ -4,5 +4,5 @@ webob>=1.0.8
paramiko
lxml
netaddr
-oslo.config
+oslo.config==1.1.1
msgpack-python>=0.4.0