bgp: manage filter in a peer instance instead of rtconf

rtconf is basically for static configuration and filter is
dynamic and also specific configuration for peer instance.

move filter things under peer instance, and change rtconf to do only
static configuration things from a configuration file

Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
ISHIDA Wataru 2014-07-28 18:22:59 +09:00 committed by FUJITA Tomonori
parent 6fe58b8a7f
commit a09850e480
4 changed files with 142 additions and 142 deletions

View File

@ -78,12 +78,6 @@ def update_neighbor(neigh_ip_address, changes):
if k == neighbors.ENABLED:
rets.append(update_neighbor_enabled(neigh_ip_address, v))
if k == neighbors.OUT_FILTER:
rets.append(_update_outfilter(neigh_ip_address, v))
if k == neighbors.IN_FILTER:
rets.append(_update_infilter(neigh_ip_address, v))
return all(rets)
@ -94,18 +88,6 @@ def _update_med(neigh_ip_address, value):
return True
def _update_infilter(neigh_ip_address, value):
neigh_conf = _get_neighbor_conf(neigh_ip_address)
neigh_conf.in_filter = value
return True
def _update_outfilter(neigh_ip_address, value):
neigh_conf = _get_neighbor_conf(neigh_ip_address)
neigh_conf.out_filter = value
return True
@RegisterWithArgChecks(name='neighbor.delete',
req_args=[neighbors.IP_ADDRESS])
def delete_neighbor(neigh_ip_address):
@ -130,6 +112,44 @@ def get_neighbors_conf():
return CORE_MANAGER.neighbors_conf.settings
@RegisterWithArgChecks(name='neighbor.in_filter.get',
req_args=[neighbors.IP_ADDRESS])
def get_neighbor_in_filter(neigh_ip_address):
"""Returns a neighbor in_filter for given ip address if exists."""
core = CORE_MANAGER.get_core_service()
peer = core.peer_manager.get_by_addr(neigh_ip_address)
return peer.in_filters
@RegisterWithArgChecks(name='neighbor.in_filter.set',
req_args=[neighbors.IP_ADDRESS, neighbors.IN_FILTER])
def set_neighbor_in_filter(neigh_ip_address, filters):
"""Returns a neighbor in_filter for given ip address if exists."""
core = CORE_MANAGER.get_core_service()
peer = core.peer_manager.get_by_addr(neigh_ip_address)
peer.in_filters = filters
return True
@RegisterWithArgChecks(name='neighbor.out_filter.get',
req_args=[neighbors.IP_ADDRESS])
def get_neighbor_out_filter(neigh_ip_address):
"""Returns a neighbor out_filter for given ip address if exists."""
core = CORE_MANAGER.get_core_service()
ret = core.peer_manager.get_by_addr(neigh_ip_address).out_filters
return ret
@RegisterWithArgChecks(name='neighbor.out_filter.set',
req_args=[neighbors.IP_ADDRESS, neighbors.OUT_FILTER])
def set_neighbor_in_filter(neigh_ip_address, filters):
"""Returns a neighbor in_filter for given ip address if exists."""
core = CORE_MANAGER.get_core_service()
peer = core.peer_manager.get_by_addr(neigh_ip_address)
peer.out_filters = filters
return True
# =============================================================================
# VRF configuration related APIs
# =============================================================================

View File

@ -408,14 +408,10 @@ class BGPSpeaker(object):
if prefix_lists is None:
prefix_lists = []
func_name = 'neighbor.update'
prefix_value = {'prefix_lists': prefix_lists,
'route_family': route_family}
filter_param = {neighbors.OUT_FILTER: prefix_value}
func_name = 'neighbor.out_filter.set'
param = {}
param[neighbors.IP_ADDRESS] = address
param[neighbors.CHANGES] = filter_param
param[neighbors.OUT_FILTER] = prefix_lists
call(func_name, **param)
def out_filter_get(self, address):
@ -427,11 +423,11 @@ class BGPSpeaker(object):
"""
func_name = 'neighbor.get'
func_name = 'neighbor.out_filter.get'
param = {}
param[neighbors.IP_ADDRESS] = address
settings = call(func_name, **param)
return settings[OUT_FILTER]
out_filter = call(func_name, **param)
return out_filter
def in_filter_set(self, address, prefix_lists,
route_family=IN_FILTER_RF_IPv4_UC):
@ -442,19 +438,15 @@ class BGPSpeaker(object):
if prefix_lists is None:
prefix_lists = []
func_name = 'neighbor.update'
prefix_value = {'prefix_lists': prefix_lists,
'route_family': route_family}
filter_param = {neighbors.IN_FILTER: prefix_value}
func_name = 'neighbor.in_filter.set'
param = {}
param[neighbors.IP_ADDRESS] = address
param[neighbors.CHANGES] = filter_param
param[neighbors.IN_FILTER] = prefix_lists
call(func_name, **param)
def in_filter_get(self, address):
func_name = 'neighbor.get'
func_name = 'neighbor.in_filter.get'
param = {}
param[neighbors.IP_ADDRESS] = address
settings = call(func_name, **param)
return settings[IN_FILTER]
in_filter = call(func_name, **param)
return in_filter

View File

@ -375,6 +375,26 @@ class Peer(Source, Sink, NeighborConfListener, Activity):
def med(self):
return self._neigh_conf.multi_exit_disc
@property
def in_filters(self):
return self._in_filters
@in_filters.setter
def in_filters(self, filters):
self._in_filters = [f.clone() for f in filters]
LOG.debug('set in-filter : %s' % filters)
self.on_update_in_filter()
@property
def out_filters(self):
return self._out_filters
@out_filters.setter
def out_filters(self, filters):
self._out_filters = [f.clone() for f in filters]
LOG.debug('set out-filter : %s' % filters)
self.on_update_out_filter()
def is_mpbgp_cap_valid(self, route_family):
if not self.in_established:
raise ValueError('Invalid request: Peer not in established state')
@ -462,47 +482,74 @@ class Peer(Source, Sink, NeighborConfListener, Activity):
for af in negotiated_afs:
self._fire_route_refresh(af)
def on_update_out_filter(self, conf_evt):
LOG.debug('on_update_out_filter fired')
event_value = conf_evt.value
prefix_lists = event_value['prefix_lists']
rf = event_value['route_family']
def _apply_filter(self, filters, path):
block = False
blocked_cause = None
table = self._core_service.\
table_manager.get_global_table_by_route_family(rf)
for destination in table.itervalues():
LOG.debug('dest : %s' % destination)
sent_routes = destination.sent_routes_by_peer(self)
if len(sent_routes) == 0:
for filter_ in filters:
policy, is_matched = filter_.evaluate(path)
if policy == PrefixList.POLICY_PERMIT and is_matched:
block = False
break
elif policy == PrefixList.POLICY_DENY and is_matched:
block = True
blocked_cause = filter_.prefix + ' - DENY'
break
return block, blocked_cause
def _apply_in_filter(self, path):
return self._apply_filter(self._in_filters, path)
def _apply_out_filter(self, path):
return self._apply_filter(self._out_filters, path)
def on_update_in_filter(self):
LOG.debug('on_update_in_filter fired')
for received_path in self._adj_rib_in.itervalues():
LOG.debug('received_path: %s' % received_path)
path = received_path.path
nlri_str = path.nlri.formatted_nlri_str
block, blocked_reason = self._apply_in_filter(path)
if block == received_path.filtered:
LOG.debug('block situation not changed: %s' % block)
continue
elif block:
# path wasn't blocked, but must be blocked by this update
path = sent_route.path.clone(for_withdrawal=True)
LOG.debug('withdraw %s because of in filter update'
% nlri_str)
else:
# path was blocked, but mustn't be blocked by this update
LOG.debug('learn blocked %s because of in filter update'
% nlri_str)
received_path.filtered = block
tm = self._core_service.table_manager
tm.learn_path(path)
for sent_route in sent_routes:
path = sent_route.path
nlri_str = path.nlri.formatted_nlri_str
send_withdraw = False
for pl in prefix_lists:
policy, result = pl.evaluate(path)
if policy == PrefixList.POLICY_PERMIT and result:
send_withdraw = False
break
elif policy == PrefixList.POLICY_DENY and result:
send_withdraw = True
break
outgoing_route = None
if send_withdraw:
# send withdraw routes that have already been sent
withdraw_clone = sent_route.path.clone(for_withdrawal=True)
outgoing_route = OutgoingRoute(withdraw_clone)
LOG.debug('send withdraw %s because of out filter'
% nlri_str)
else:
outgoing_route = OutgoingRoute(sent_route.path,
for_route_refresh=True)
LOG.debug('resend path : %s' % nlri_str)
self.enque_outgoing_msg(outgoing_route)
def on_update_out_filter(self):
LOG.debug('on_update_out_filter fired')
for sent_path in self._adj_rib_out.itervalues():
LOG.debug('sent_path: %s' % sent_path)
path = sent_path.path
nlri_str = path.nlri.formatted_nlri_str
block, blocked_reason = self._apply_out_filter(path)
if block == sent_path.filtered:
LOG.debug('block situation not changed: %s' % block)
continue
elif block:
# path wasn't blocked, but must be blocked by this update
withdraw_clone = sent_route.path.clone(for_withdrawal=True)
outgoing_route = OutgoingRoute(withdraw_clone)
LOG.debug('send withdraw %s because of out filter update'
% nlri_str)
else:
# path was blocked, but mustn't be blocked by this update
outgoing_route = OutgoingRoute(path)
LOG.debug('send blocked %s because of out filter update'
% nlri_str)
sent_path.filtered = block
self.enque_outgoing_msg(outgoing_route)
def __str__(self):
return 'Peer(ip: %s, asn: %s)' % (self._neigh_conf.ip_address,
@ -548,23 +595,8 @@ class Peer(Source, Sink, NeighborConfListener, Activity):
Populates Adj-RIB-out with corresponding `SentRoute`.
"""
# evaluate prefix list
rf = outgoing_route.path.route_family
allow_to_send = True
if rf in (RF_IPv4_UC, RF_IPv6_UC):
prefix_lists = self._neigh_conf.out_filter
if not outgoing_route.path.is_withdraw:
for prefix_list in prefix_lists:
path = outgoing_route.path
policy, is_matched = prefix_list.evaluate(path)
if policy == PrefixList.POLICY_PERMIT and is_matched:
allow_to_send = True
break
elif policy == PrefixList.POLICY_DENY and is_matched:
allow_to_send = False
blocked_cause = prefix_list.prefix + ' - DENY'
break
path = outgoing_route.path
block, blocked_cause = self._apply_out_filter(path)
nlri_str = outgoing_route.path.nlri.formatted_nlri_str
sent_route = SentRoute(outgoing_route.path, self, block)
@ -573,7 +605,7 @@ class Peer(Source, Sink, NeighborConfListener, Activity):
# TODO(PH): optimized by sending several prefixes per update.
# Construct and send update message.
if allow_to_send:
if not block:
update_msg = self._construct_update(outgoing_route)
self._protocol.send(update_msg)
# Collect update statistics.
@ -1204,23 +1236,6 @@ class Peer(Source, Sink, NeighborConfListener, Activity):
if withdraw_list:
self._extract_and_handle_bgp4_withdraws(withdraw_list)
def _apply_in_filter(self, path):
block = False
blocked_cause = None
prefix_lists = self._neigh_conf.in_filter
for prefix_list in prefix_lists:
policy, is_matched = prefix_list.evaluate(path)
if policy == PrefixList.POLICY_PERMIT and is_matched:
block = False
break
elif policy == PrefixList.POLICY_DENY and is_matched:
block = True
blocked_cause = prefix_list.prefix + ' - DENY'
break
return block, blocked_cause
def _extract_and_handle_bgp4_new_paths(self, update_msg):
"""Extracts new paths advertised in the given update message's
*MpReachNlri* attribute.

View File

@ -61,6 +61,7 @@ from ryu.services.protocols.bgp.rtconf.base import validate_med
from ryu.services.protocols.bgp.rtconf.base import validate_soo_list
from ryu.services.protocols.bgp.utils.validation import is_valid_ipv4
from ryu.services.protocols.bgp.utils.validation import is_valid_old_asn
from ryu.services.protocols.bgp.info_base.base import Filter
from ryu.services.protocols.bgp.info_base.base import PrefixList
LOG = logging.getLogger('bgpspeaker.rtconf.neighbor')
@ -107,7 +108,7 @@ def validate_enabled(enabled):
@validate(name=CHANGES)
def validate_changes(changes):
for k, v in changes.iteritems():
if k not in (MULTI_EXIT_DISC, ENABLED, IN_FILTER, OUT_FILTER):
if k not in (MULTI_EXIT_DISC, ENABLED):
raise ConfigValueError(desc="Unknown field to change: %s" % k)
if k == MULTI_EXIT_DISC:
@ -188,6 +189,9 @@ SUPPORTED_FILTER_VALIDATORS = {
def valid_filter(filter_):
if isinstance(filter_, Filter):
return filter_
if not isinstance(filter_, dict):
raise ConfigTypeError(desc='Invalid filter: %s' % filter_)
@ -219,10 +223,8 @@ class NeighborConf(ConfWithId, ConfWithStats):
UPDATE_ENABLED_EVT = 'update_enabled_evt'
UPDATE_MED_EVT = 'update_med_evt'
UPDATE_OUT_FILTER_EVT = 'update_out_filter_evt'
VALID_EVT = frozenset([UPDATE_ENABLED_EVT, UPDATE_MED_EVT,
UPDATE_OUT_FILTER_EVT])
VALID_EVT = frozenset([UPDATE_ENABLED_EVT, UPDATE_MED_EVT])
REQUIRED_SETTINGS = frozenset([REMOTE_AS, IP_ADDRESS])
OPTIONAL_SETTINGS = frozenset([CAP_REFRESH,
CAP_ENHANCED_REFRESH,
@ -433,33 +435,10 @@ class NeighborConf(ConfWithId, ConfWithStats):
def in_filter(self):
return self._settings[IN_FILTER]
@in_filter.setter
def in_filter(self, value):
self._settings[IN_FILTER] = []
prefix_lists = value['prefix_lists']
for prefix_list in prefix_lists:
# copy PrefixList object and put it in the _settings
self._settings[IN_FILTER].append(prefix_list.clone())
LOG.debug('set in-filter : %s' % prefix_lists)
@property
def out_filter(self):
return self._settings[OUT_FILTER]
@out_filter.setter
def out_filter(self, value):
self._settings[OUT_FILTER] = []
prefix_lists = value['prefix_lists']
for prefix_list in prefix_lists:
# copy PrefixList object and put it in the _settings
self._settings[OUT_FILTER].append(prefix_list.clone())
LOG.debug('set out-filter : %s' % prefix_lists)
# check sent_route
self._notify_listeners(NeighborConf.UPDATE_OUT_FILTER_EVT, value)
def exceeds_max_prefix_allowed(self, prefix_count):
allowed_max = self._settings[MAX_PREFIXES]
does_exceed = False
@ -603,8 +582,6 @@ class NeighborConfListener(ConfWithIdListener, ConfWithStatsListener):
self.on_update_enabled)
neigh_conf.add_listener(NeighborConf.UPDATE_MED_EVT,
self.on_update_med)
neigh_conf.add_listener(NeighborConf.UPDATE_OUT_FILTER_EVT,
self.on_update_out_filter)
@abstractmethod
def on_update_enabled(self, evt):
@ -613,10 +590,6 @@ class NeighborConfListener(ConfWithIdListener, ConfWithStatsListener):
def on_update_med(self, evt):
raise NotImplementedError('This method should be overridden.')
@abstractmethod
def on_update_out_filter(self, evt):
raise NotImplementedError('This method should be overridden.')
class NeighborsConfListener(BaseConfListener):
"""Base listener for change events to neighbor configuration container."""