New API for WebSocket support

- API
 - @websocket decorator
 - WSGIApplication#websocketmanager()
- bugfix
 - Even if a connection is cut, it continues remaining
- remove restriction
 - Two or more connections can be accepted

Signed-off-by: Satoshi Kobayashi <satoshi-k@stratosphere.co.jp>
Reviewed-by: YAMADA Hideki <yamada.hideki@po.ntts.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Satoshi Kobayashi 2014-05-19 01:51:52 +00:00 committed by FUJITA Tomonori
parent eb0055f0ce
commit 7598ef0561
2 changed files with 81 additions and 28 deletions

View File

@ -25,10 +25,10 @@ $ wsdump.py ws://127.0.0.1:8080/simpleswitch/ws
"""
import json
from webob import Response
from webob import Response
from ryu.app import simple_switch_13
from ryu.app.wsgi import route, ControllerBase, WSGIApplication
from ryu.app.wsgi import route, websocket, ControllerBase, WSGIApplication
from ryu.controller import ofp_event
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub
@ -47,21 +47,19 @@ class SimpleSwitchWebSocket13(simple_switch_13.SimpleSwitch13):
def __init__(self, *args, **kwargs):
super(SimpleSwitchWebSocket13, self).__init__(*args, **kwargs)
self.ws_send_queue = hub.Queue()
self.ws_lock = hub.BoundedSemaphore()
wsgi = kwargs['wsgi']
wsgi.register(
SimpleSwitchWebSocketController,
data={simple_switch_instance_name: self},
)
self._ws_manager = wsgi.websocketmanager
@set_ev_cls(ofp_event.EventOFPPacketIn)
def _packet_in_handler(self, ev):
super(SimpleSwitchWebSocket13, self)._packet_in_handler(ev)
pkt = packet.Packet(ev.msg.data)
self.ws_send_queue.put(str(pkt))
self._ws_manager.broadcast(str(pkt))
class SimpleSwitchWebSocketController(ControllerBase):
@ -70,22 +68,12 @@ class SimpleSwitchWebSocketController(ControllerBase):
req, link, data, **config)
self.simpl_switch_spp = data[simple_switch_instance_name]
@websocket('simpleswitch', url)
def _websocket_handler(self, ws):
simple_switch = self.simpl_switch_spp
simple_switch.logger.debug('WebSocket connected: %s', ws)
while True:
data = simple_switch.ws_send_queue.get()
ws.send(unicode(json.dumps(data)))
@route('simpleswitch', url)
def websocket(self, req, **kwargs):
simple_switch = self.simpl_switch_spp
if simple_switch.ws_lock.acquire(blocking=False):
try:
self.websocket_handshake(req, self._websocket_handler)
return
finally:
simple_switch.logger.debug('WebSocket disconnected')
simple_switch.ws_lock.release()
else:
return Response(status=503)
msg = ws.wait()
if msg is None:
break
simple_switch.logger.debug('WebSocket disconnected')

View File

@ -15,15 +15,15 @@
# limitations under the License.
import inspect
from types import MethodType
from ryu import cfg
import webob.dec
from webob.response import Response
from ryu import cfg
from ryu.lib import hub
from routes import Mapper
from routes.util import URLGenerator
CONF = cfg.CONF
CONF.register_cli_opts([
cfg.StrOpt('wsapi-host', default='', help='webapp listen host'),
@ -46,12 +46,59 @@ def route(name, path, methods=None, requirements=None):
return _route
class WebSocketRegistrationWrapper(object):
def __init__(self, func, controller):
self._controller = controller
self._controller_method = MethodType(func, controller)
def __call__(self, ws):
wsgi_application = self._controller.parent
ws_manager = wsgi_application.websocketmanager
ws_manager.add_connection(ws)
try:
self._controller_method(ws)
finally:
ws_manager.delete_connection(ws)
class _AlreadyHandledResponse(Response):
# XXX: Eventlet API should not be used directly.
from eventlet.wsgi import ALREADY_HANDLED
_ALREADY_HANDLED = ALREADY_HANDLED
def __call__(self, environ, start_response):
return self._ALREADY_HANDLED
def websocket(name, path):
def _websocket(controller_func):
def __websocket(self, req, **kwargs):
wrapper = WebSocketRegistrationWrapper(controller_func, self)
ws_wsgi = hub.WebSocketWSGI(wrapper)
ws_wsgi(req.environ, req.start_response)
# XXX: In order to prevent the writing to a already closed socket.
# This issue is caused by combined use:
# - webob.dec.wsgify()
# - eventlet.wsgi.HttpProtocol.handle_one_response()
return _AlreadyHandledResponse()
__websocket.routing_info = {
'name': name,
'path': path,
'methods': None,
'requirements': None,
}
return __websocket
return _websocket
class ControllerBase(object):
special_vars = ['action', 'controller']
def __init__(self, req, link, data, **config):
self.req = req
self.link = link
self.parent = None
for name, value in config.items():
setattr(self, name, value)
@ -67,10 +114,6 @@ class ControllerBase(object):
return getattr(self, action)(req, **kwargs)
def websocket_handshake(self, req, handler):
ws_wsgi = hub.WebSocketWSGI(handler)
return ws_wsgi(req.environ, req.start_response)
class wsgify_hack(webob.dec.wsgify):
def __call__(self, environ, start_response):
@ -78,11 +121,28 @@ class wsgify_hack(webob.dec.wsgify):
return super(wsgify_hack, self).__call__(environ, start_response)
class WebSocketManager(object):
def __init__(self):
self._connections = []
def add_connection(self, ws):
self._connections.append(ws)
def delete_connection(self, ws):
self._connections.remove(ws)
def broadcast(self, msg):
for connection in self._connections:
connection.send(msg)
class WSGIApplication(object):
def __init__(self, **config):
self.config = config
self.mapper = Mapper()
self.registory = {}
self._wsmanager = WebSocketManager()
super(WSGIApplication, self).__init__()
# XXX: Switch how to call the API of Routes for every version
match_argspec = inspect.getargspec(self.mapper.match)
@ -119,6 +179,7 @@ class WSGIApplication(object):
data = self.registory[name]
controller = match['controller'](req, link, data, **self.config)
controller.parent = self
return controller(req)
def register(self, controller, data=None):
@ -142,6 +203,10 @@ class WSGIApplication(object):
if data:
self.registory[controller.__name__] = data
@property
def websocketmanager(self):
return self._wsmanager
class WSGIServer(hub.WSGIServer):
def __init__(self, application, **config):