bgp: use ryu.lib.hub instead of the direct use of eventlet

Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
ISHIDA Wataru 2014-04-18 02:02:16 +00:00 committed by FUJITA Tomonori
parent 68c13b7fa1
commit 3a4f092de5
4 changed files with 42 additions and 65 deletions

View File

@ -39,6 +39,8 @@ if HUB_TYPE == 'eventlet':
getcurrent = eventlet.getcurrent
patch = eventlet.monkey_patch
sleep = eventlet.sleep
listen = eventlet.listen
connect = eventlet.connect
def spawn(*args, **kwargs):
def _launch(func, *args, **kwargs):
@ -57,6 +59,23 @@ if HUB_TYPE == 'eventlet':
return eventlet.spawn(_launch, *args, **kwargs)
def spawn_after(seconds, *args, **kwargs):
def _launch(func, *args, **kwargs):
# mimic gevent's default raise_error=False behaviour
# by not propergating an exception to the joiner.
try:
func(*args, **kwargs)
except greenlet.GreenletExit:
pass
except:
# log uncaught exception.
# note: this is an intentional divergence from gevent
# behaviour. gevent silently ignores such exceptions.
LOG.error('hub: uncaught exception: %s',
traceback.format_exc())
return eventlet.spawn_after(seconds, _launch, *args, **kwargs)
def kill(thread):
thread.kill()
@ -119,6 +138,9 @@ if HUB_TYPE == 'eventlet':
# note: _ev.reset() is obsolete.
self._ev = eventlet.event.Event()
def is_set(self):
return self._cond
def set(self):
self._cond = True
self._broadcast()

View File

@ -16,7 +16,7 @@
"""
Defines APIs related to Core/CoreManager.
"""
import eventlet
from ryu.lib import hub
from ryu.services.protocols.bgp.api.base import register
from ryu.services.protocols.bgp.core_manager import CORE_MANAGER
@ -39,7 +39,7 @@ def start(**kwargs):
waiter = kwargs.pop('waiter')
common_config = CommonConf(**kwargs)
eventlet.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config,
hub.spawn(CORE_MANAGER.start, *[], **{'common_conf': common_config,
'waiter': waiter})
return True
@ -70,7 +70,7 @@ def reset_neighor(ip_address):
# Disable neighbor to close existing session.
neigh_conf.enabled = False
# Yield here so that we give chance for neighbor to be disabled.
eventlet.sleep(NEIGHBOR_RESET_WAIT_TIME)
hub.sleep(NEIGHBOR_RESET_WAIT_TIME)
# Enable neighbor, so that we have a new session with it.
neigh_conf.enabled = True
else:

View File

@ -16,13 +16,13 @@
Defines some base class related to managing green threads.
"""
import abc
import eventlet
import logging
import time
import traceback
import weakref
from eventlet.timeout import Timeout
from ryu.lib import hub
from ryu.lib.hub import Timeout
from ryu.lib.packet.bgp import RF_IPv4_UC
from ryu.lib.packet.bgp import RF_IPv6_UC
from ryu.lib.packet.bgp import RF_IPv4_VPN
@ -171,7 +171,7 @@ class Activity(object):
self._validate_activity(activity)
# Spawn a new greenthread for given activity
greenthread = eventlet.spawn(activity.start, *args, **kwargs)
greenthread = hub.spawn(activity.start, *args, **kwargs)
self._child_thread_map[activity.name] = greenthread
self._child_activity_map[activity.name] = activity
return greenthread
@ -180,7 +180,7 @@ class Activity(object):
self._validate_activity(activity)
# Schedule to spawn a new greenthread after requested delay
greenthread = eventlet.spawn_after(seconds, activity.start, *args,
greenthread = hub.spawn_after(seconds, activity.start, *args,
**kwargs)
self._child_thread_map[activity.name] = greenthread
self._child_activity_map[activity.name] = activity
@ -200,13 +200,13 @@ class Activity(object):
def _spawn(self, name, callable_, *args, **kwargs):
self._validate_callable(callable_)
greenthread = eventlet.spawn(callable_, *args, **kwargs)
greenthread = hub.spawn(callable_, *args, **kwargs)
self._child_thread_map[name] = greenthread
return greenthread
def _spawn_after(self, name, seconds, callable_, *args, **kwargs):
self._validate_callable(callable_)
greenthread = eventlet.spawn_after(seconds, callable_, *args, **kwargs)
greenthread = hub.spawn_after(seconds, callable_, *args, **kwargs)
self._child_thread_map[name] = greenthread
return greenthread
@ -244,12 +244,12 @@ class Activity(object):
self.stop()
def pause(self, seconds=0):
"""Relinquishes eventlet hub for given number of seconds.
"""Relinquishes hub for given number of seconds.
In other words is puts to sleep to give other greeenthread a chance to
run.
"""
eventlet.sleep(seconds)
hub.sleep(seconds)
def _stop_child_activities(self):
"""Stop all child activities spawn by this activity.
@ -317,7 +317,7 @@ class Activity(object):
For each connection `server_factory` starts a new protocol.
"""
server = eventlet.listen(loc_addr)
server = hub.listen(loc_addr)
server_name = self.name + '_server@' + str(loc_addr)
self._asso_socket_map[server_name] = server
@ -341,7 +341,7 @@ class Activity(object):
LOG.debug('Connect TCP called for %s:%s' % (peer_addr[0],
peer_addr[1]))
with Timeout(time_out, False):
sock = eventlet.connect(peer_addr, bind=bind_address)
sock = hub.connect(peer_addr, bind=bind_address)
if sock:
# Connection name for pro-active connection is made up
# of local end address + remote end address

View File

@ -16,8 +16,7 @@
"""
Concurrent networking library - Eventlet, based utilities classes.
"""
import eventlet
from eventlet import event
from ryu.lib import hub
import logging
LOG = logging.getLogger('utils.evtlet')
@ -28,58 +27,14 @@ class EventletIOFactory(object):
@staticmethod
def create_custom_event():
LOG.debug('Create CustomEvent called')
return CustomEvent()
return hub.Event()
@staticmethod
def create_looping_call(funct, *args, **kwargs):
LOG.debug('create_looping_call called')
return LoopingCall(funct, *args, **kwargs)
class CustomEvent(object):
"""Encapsulates eventlet event to provide a event which can recur.
It has the same interface as threading.Event but works for eventlet.
"""
def __init__(self,):
self._event = event.Event()
self._is_set = False
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._is_set
def set(self):
"""Set the internal flag to true.
All threads waiting for it to become true are awakened.
Threads that call wait() once the flag is true will not block at all.
"""
if self._event and not self._event.ready():
self._event.send()
self._is_set = True
def clear(self):
"""Reset the internal flag to false.
Subsequently, threads calling wait() will block until set() is called
to set the internal flag to true again.
"""
if self._is_set:
self._is_set = False
self._event = event.Event()
def wait(self):
"""Block until the internal flag is true.
If the internal flag is true on entry, return immediately. Otherwise,
block until another thread calls set() to set the flag to true, or
until the optional timeout occurs.
"""
if not self._is_set:
self._event.wait()
# TODO: improve Timer service and move it into framework
class LoopingCall(object):
"""Call a function repeatedly.
"""
@ -102,7 +57,7 @@ class LoopingCall(object):
def __call__(self):
if self._running:
# Schedule next iteration of the call.
self._self_thread = eventlet.spawn_after(self._interval, self)
self._self_thread = hub.spawn_after(self._interval, self)
self._funct(*self._args, **self._kwargs)
def start(self, interval, now=True):
@ -117,9 +72,9 @@ class LoopingCall(object):
self._running = True
self._interval = interval
if now:
self._self_thread = eventlet.spawn_after(0, self)
self._self_thread = hub.spawn_after(0, self)
else:
self._self_thread = eventlet.spawn_after(self._interval, self)
self._self_thread = hub.spawn_after(self._interval, self)
def stop(self):
"""Stop running scheduled function.
@ -137,4 +92,4 @@ class LoopingCall(object):
self._self_thread.cancel()
self._self_thread = None
# Schedule a new call
self._self_thread = eventlet.spawn_after(self._interval, self)
self._self_thread = hub.spawn_after(self._interval, self)