mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-05-08 13:56:09 +02:00
send all OFP events as notificaiton
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
da393e8e3d
commit
d76b53127d
@ -6,6 +6,7 @@ from ryu.controller import handler
|
||||
from ryu.controller import dpset
|
||||
from ryu.ofproto.ofproto_parser import MsgBase
|
||||
from ryu.lib.rpc import RpcSession, RpcMessage
|
||||
from ryu.ofproto import ofproto_parser
|
||||
from ryu.ofproto import ofproto_v1_0
|
||||
from ryu.ofproto import ofproto_v1_0_parser
|
||||
from ryu.ofproto import ofproto_v1_2
|
||||
@ -14,20 +15,29 @@ from ryu.ofproto import ofproto_v1_3
|
||||
from ryu.ofproto import ofproto_v1_3_parser
|
||||
|
||||
|
||||
def find_ofp_cls(ofp_version, name):
|
||||
parser_name = 'ryu.ofproto.ofproto_v1_' + str(ofp_version - 1) + \
|
||||
'_parser'
|
||||
mod = sys.modules[parser_name]
|
||||
for i in inspect.getmembers(mod, lambda cls: (inspect.isclass(cls))):
|
||||
if i[0] == name:
|
||||
return i[1]
|
||||
return None
|
||||
|
||||
|
||||
class OFWireRpcSession(object):
|
||||
def __init__(self, socket, dpset):
|
||||
self.socket = socket
|
||||
self.dpset = dpset
|
||||
self.session = RpcSession()
|
||||
self.pool = eventlet.GreenPool()
|
||||
self.send_queue = eventlet.queue.Queue()
|
||||
self.pool.spawn_n(self._send)
|
||||
|
||||
def _find_ofp_cls(self, ofp_version, name):
|
||||
parser_name = 'ryu.ofproto.ofproto_v1_' + str(ofp_version - 1) + \
|
||||
'_parser'
|
||||
mod = sys.modules[parser_name]
|
||||
for i in inspect.getmembers(mod, lambda cls: (inspect.isclass(cls))):
|
||||
if i[0] == name:
|
||||
return i[1]
|
||||
return None
|
||||
def _send(self):
|
||||
while True:
|
||||
m = self.send_queue.get()
|
||||
self.socket.sendall(m)
|
||||
|
||||
def _ofp_handle_match(self, clses, params):
|
||||
match = clses()
|
||||
@ -40,7 +50,7 @@ class OFWireRpcSession(object):
|
||||
for k, v in params.items():
|
||||
if type(v) == dict:
|
||||
self._ofp_handle_params(dp, v)
|
||||
clses = self._find_ofp_cls(dp.ofproto.OFP_VERSION, k)
|
||||
clses = find_ofp_cls(dp.ofproto.OFP_VERSION, k)
|
||||
if clses is not None:
|
||||
if issubclass(clses, MsgBase):
|
||||
ins = clses(dp, **v)
|
||||
@ -68,17 +78,17 @@ class OFWireRpcSession(object):
|
||||
params = msg[3][1]
|
||||
dp = self.dpset.get(int(dpid))
|
||||
if dp is None:
|
||||
print self.dpset.get_all()
|
||||
r = self.session.create_response(msg[1], 1, 0)
|
||||
self.socket.sendall(r)
|
||||
self.send_queue.put(r)
|
||||
return
|
||||
|
||||
self._ofp_handle_params(dp, params)
|
||||
result = {}
|
||||
for k, v in params.items():
|
||||
dp.send_msg(v)
|
||||
|
||||
r = self.session.create_response(msg[1], 0, 0)
|
||||
self.socket.sendall(r)
|
||||
result = {'xid': v.xid}
|
||||
r = self.session.create_response(msg[1], 0, result)
|
||||
self.send_queue.put(r)
|
||||
|
||||
def serve(self):
|
||||
while True:
|
||||
@ -98,14 +108,14 @@ class OFWireRpcSession(object):
|
||||
print "invalid type", m
|
||||
|
||||
|
||||
class RpcApi(app_manager.RyuApp):
|
||||
class RPCApi(app_manager.RyuApp):
|
||||
_CONTEXTS = {
|
||||
'dpset': dpset.DPSet,
|
||||
}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.dpset = kwargs['dpset']
|
||||
super(RpcApi, self).__init__(*args, **kwargs)
|
||||
super(RPCApi, self).__init__(*args, **kwargs)
|
||||
self.server = eventlet.listen(('', 50000))
|
||||
self.pool = eventlet.GreenPool()
|
||||
self.pool.spawn_n(self.serve)
|
||||
@ -118,14 +128,50 @@ class RpcApi(app_manager.RyuApp):
|
||||
self.sessions.append(session)
|
||||
self.pool.spawn_n(session.serve)
|
||||
|
||||
def _ofp_handle_ob(self, dp, msg, params):
|
||||
clses = find_ofp_cls(dp.ofproto.OFP_VERSION,
|
||||
msg.__class__.__name__)
|
||||
if clses:
|
||||
params[msg.__class__.__name__] = {}
|
||||
_params = params[msg.__class__.__name__]
|
||||
else:
|
||||
_params = params
|
||||
|
||||
for i in ofproto_parser.ofp_attr(msg):
|
||||
if i.startswith('_'):
|
||||
continue
|
||||
elif i == 'parser':
|
||||
continue
|
||||
elif i == 'serialize':
|
||||
continue
|
||||
|
||||
v = getattr(msg, i)
|
||||
if type(v) == dict:
|
||||
_params[i] = {}
|
||||
for key in v.keys():
|
||||
_params[i][key] = {}
|
||||
self._ofp_handle_ob(dp, v[key], _params[i][key])
|
||||
elif type(v) == list:
|
||||
ins = []
|
||||
for j in v:
|
||||
d = {}
|
||||
self._ofp_handle_ob(dp, j, d)
|
||||
ins.append(d)
|
||||
_params[i] = ins
|
||||
elif type(v).__name__ == 'builtin_function_or_method':
|
||||
pass
|
||||
else:
|
||||
_params[i] = v
|
||||
|
||||
@handler.observe_all_events('ofp_event')
|
||||
def handler(self, ev):
|
||||
if hasattr(ev, 'msg'):
|
||||
ofmsg = ev.msg
|
||||
params = {}
|
||||
params[type(ev.msg).__name__] = {}
|
||||
for m in inspect.getmembers(ev.msg):
|
||||
if not m[0].startswith('__'):
|
||||
# we need a mechnism to get _only_ OFP defined members.
|
||||
params[type(ev.msg).__name__][m[0]] = m[1]
|
||||
self._ofp_handle_ob(ofmsg.datapath, ofmsg, params)
|
||||
for k in params.keys():
|
||||
params[k]['xid'] = ofmsg.xid
|
||||
|
||||
# send this to all sessions as a notification.
|
||||
for s in self.sessions:
|
||||
m = s.session.create_notification('ofp', params)
|
||||
s.send_queue.put(m)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user