[AETHER-72] Refactoring RouteService

- to use bulk updates interface
- to use new getRoutesForNextHops API
- to use multi-thread resolver
- to use multi-thread hostexec
- to use a concurrent hashmap instead of synchronized
- to use a non-blocking resolved store

Additionally updates unit tests

Change-Id: Id960abd0f2a1b03066ce34b6a2f72b76566bb58c
This commit is contained in:
pier 2019-10-16 16:58:20 +02:00 committed by Pier Luigi Ventre
parent ec2b3184c8
commit e91c87f975
12 changed files with 502 additions and 131 deletions

View File

@ -37,6 +37,13 @@ public interface RouteStore extends Store<InternalRouteEvent, RouteStoreDelegate
*/
void updateRoute(Route route);
/**
* Adds or updates the given routes in the store.
*
* @param routes routes to add or update
*/
void updateRoutes(Collection<Route> routes);
/**
* Removes the given route from the store.
*
@ -44,6 +51,13 @@ public interface RouteStore extends Store<InternalRouteEvent, RouteStoreDelegate
*/
void removeRoute(Route route);
/**
* Removes the given routes from the store.
*
* @param routes routes to remove
*/
void removeRoutes(Collection<Route> routes);
/**
* Replaces the all the routes for a prefix
* with the given route.
@ -78,6 +92,14 @@ public interface RouteStore extends Store<InternalRouteEvent, RouteStoreDelegate
// TODO think about including route table info
Collection<Route> getRoutesForNextHop(IpAddress ip);
/**
* Returns all routes that point to any of the given next hops IP addresses.
*
* @param nextHops next hops IP addresses
* @return collection of routes sets
*/
Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops);
/**
* Returns the set of routes in the default route table for the given prefix.
*

View File

@ -30,11 +30,21 @@ public class RouteStoreAdapter implements RouteStore {
}
@Override
public void updateRoutes(Collection<Route> routes) {
}
@Override
public void removeRoute(Route route) {
}
@Override
public void removeRoutes(Collection<Route> routes) {
}
@Override
public void replaceRoute(Route route) {
@ -55,6 +65,11 @@ public class RouteStoreAdapter implements RouteStore {
return null;
}
@Override
public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
return null;
}
@Override
public RouteSet getRoutes(IpPrefix prefix) {
return null;

View File

@ -17,7 +17,6 @@
package org.onosproject.routeservice.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.googlecode.concurrenttrees.common.KeyValuePair;
import com.googlecode.concurrenttrees.radix.node.concrete.DefaultByteArrayNodeFactory;
import com.googlecode.concurrenttrees.radixinverted.ConcurrentInvertedRadixTree;
@ -121,7 +120,7 @@ public class DefaultResolvedRouteStore implements ResolvedRouteStore {
routeTable = new ConcurrentInvertedRadixTree<>(
new DefaultByteArrayNodeFactory());
alternativeRoutes = Maps.newHashMap();
alternativeRoutes = new ConcurrentHashMap<>();
}
/**
@ -133,7 +132,6 @@ public class DefaultResolvedRouteStore implements ResolvedRouteStore {
public RouteEvent update(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
Set<ResolvedRoute> immutableAlternatives = checkAlternatives(route, alternatives);
synchronized (this) {
ResolvedRoute oldRoute = routeTable.put(createBinaryString(route.prefix()), route);
Set<ResolvedRoute> oldRoutes = alternativeRoutes.put(route.prefix(), immutableAlternatives);
@ -153,7 +151,6 @@ public class DefaultResolvedRouteStore implements ResolvedRouteStore {
}
return null;
}
}
/**
@ -181,18 +178,16 @@ public class DefaultResolvedRouteStore implements ResolvedRouteStore {
* @param prefix prefix to remove
*/
public RouteEvent remove(IpPrefix prefix) {
synchronized (this) {
String key = createBinaryString(prefix);
String key = createBinaryString(prefix);
ResolvedRoute route = routeTable.getValueForExactKey(key);
Set<ResolvedRoute> alternatives = alternativeRoutes.remove(prefix);
ResolvedRoute route = routeTable.getValueForExactKey(key);
Set<ResolvedRoute> alternatives = alternativeRoutes.remove(prefix);
if (route != null) {
routeTable.remove(key);
return new RouteEvent(RouteEvent.Type.ROUTE_REMOVED, route, alternatives);
}
return null;
if (route != null) {
routeTable.remove(key);
return new RouteEvent(RouteEvent.Type.ROUTE_REMOVED, route, alternatives);
}
return null;
}
/**

View File

@ -19,6 +19,7 @@ package org.onosproject.routeservice.impl;
import com.google.common.collect.ImmutableList;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.PredictableExecutor;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.Host;
import org.onosproject.net.host.HostEvent;
@ -32,7 +33,6 @@ import org.onosproject.routeservice.RouteEvent;
import org.onosproject.routeservice.RouteInfo;
import org.onosproject.routeservice.RouteListener;
import org.onosproject.routeservice.RouteService;
import org.onosproject.routeservice.RouteSet;
import org.onosproject.routeservice.RouteStore;
import org.onosproject.routeservice.RouteStoreDelegate;
import org.onosproject.routeservice.RouteTableId;
@ -45,16 +45,12 @@ import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@ -71,6 +67,8 @@ public class RouteManager implements RouteService, RouteAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final int DEFAULT_BUCKETS = 0;
private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
private InternalHostListener hostListener = new InternalHostListener();
@ -90,18 +88,21 @@ public class RouteManager implements RouteService, RouteAdminService {
private RouteMonitor routeMonitor;
@GuardedBy(value = "this")
private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
protected RouteResolver routeResolver;
private Map<RouteListener, ListenerQueue> listeners = new ConcurrentHashMap<>();
private ThreadFactory threadFactory;
protected Executor hostEventExecutor = newSingleThreadExecutor(
groupedThreads("rm-event-host", "%d", log));
protected PredictableExecutor hostEventExecutors;
@Activate
protected void activate() {
routeMonitor = new RouteMonitor(this, clusterService, storageService);
routeResolver = new RouteResolver(this, hostService);
threadFactory = groupedThreads("onos/route", "listener-%d", log);
hostEventExecutors = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-manager",
"event-host-%d", log));
resolvedRouteStore = new DefaultResolvedRouteStore();
@ -110,15 +111,14 @@ public class RouteManager implements RouteService, RouteAdminService {
routeStore.getRouteTables().stream()
.flatMap(id -> routeStore.getRoutes(id).stream())
.forEach(this::resolve);
.forEach(routeSet -> routeResolver.resolve(routeSet));
}
@Deactivate
protected void deactivate() {
routeMonitor.shutdown();
synchronized (this) {
listeners.values().forEach(ListenerQueue::stop);
}
routeResolver.shutdown();
listeners.values().forEach(ListenerQueue::stop);
routeStore.unsetDelegate(delegate);
hostService.removeListener(hostListener);
@ -136,8 +136,9 @@ public class RouteManager implements RouteService, RouteAdminService {
*/
@Override
public void addListener(RouteListener listener) {
synchronized (this) {
log.debug("Synchronizing current routes to new listener");
log.debug("Synchronizing current routes to new listener");
ListenerQueue listenerQueue = listeners.compute(listener, (key, value) -> {
// Create listener regardless the existence of a previous value
ListenerQueue l = createListenerQueue(listener);
resolvedRouteStore.getRouteTables().stream()
.map(resolvedRouteStore::getRoutes)
@ -145,21 +146,18 @@ public class RouteManager implements RouteService, RouteAdminService {
.map(route -> new RouteEvent(RouteEvent.Type.ROUTE_ADDED, route,
resolvedRouteStore.getAllRoutes(route.prefix())))
.forEach(l::post);
listeners.put(listener, l);
l.start();
log.debug("Route synchronization complete");
}
return l;
});
// Start draining the events
listenerQueue.start();
log.debug("Route synchronization complete");
}
@Override
public void removeListener(RouteListener listener) {
synchronized (this) {
ListenerQueue l = listeners.remove(listener);
if (l != null) {
l.stop();
}
ListenerQueue l = listeners.remove(listener);
if (l != null) {
l.stop();
}
}
@ -171,16 +169,10 @@ public class RouteManager implements RouteService, RouteAdminService {
private void post(RouteEvent event) {
if (event != null) {
log.debug("Sending event {}", event);
synchronized (this) {
listeners.values().forEach(l -> l.post(event));
}
listeners.values().forEach(l -> l.post(event));
}
}
private Collection<Route> reformatRoutes(Collection<RouteSet> routeSets) {
return routeSets.stream().flatMap(r -> r.routes().stream()).collect(Collectors.toList());
}
@Override
public Collection<RouteTableId> getRouteTables() {
return routeStore.getRouteTables();
@ -190,24 +182,11 @@ public class RouteManager implements RouteService, RouteAdminService {
public Collection<RouteInfo> getRoutes(RouteTableId id) {
return routeStore.getRoutes(id).stream()
.map(routeSet -> new RouteInfo(routeSet.prefix(),
resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null), resolveRouteSet(routeSet)))
resolvedRouteStore.getRoute(routeSet.prefix()).orElse(null),
routeResolver.resolveRouteSet(routeSet)))
.collect(Collectors.toList());
}
private Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
return routeSet.routes().stream()
.map(this::tryResolve)
.collect(Collectors.toSet());
}
private ResolvedRoute tryResolve(Route route) {
ResolvedRoute resolvedRoute = resolve(route);
if (resolvedRoute == null) {
resolvedRoute = new ResolvedRoute(route, null, null);
}
return resolvedRoute;
}
@Override
public Collection<ResolvedRoute> getResolvedRoutes(RouteTableId id) {
return resolvedRouteStore.getRoutes(id);
@ -225,22 +204,14 @@ public class RouteManager implements RouteService, RouteAdminService {
@Override
public void update(Collection<Route> routes) {
synchronized (this) {
routes.forEach(route -> {
log.debug("Received update {}", route);
routeStore.updateRoute(route);
});
}
log.debug("Received update {}", routes);
routeStore.updateRoutes(routes);
}
@Override
public void withdraw(Collection<Route> routes) {
synchronized (this) {
routes.forEach(route -> {
log.debug("Received withdraw {}", route);
routeStore.removeRoute(route);
});
}
log.debug("Received withdraw {}", routes);
routeStore.removeRoutes(routes);
}
@Override
@ -250,48 +221,14 @@ public class RouteManager implements RouteService, RouteAdminService {
.orElse(null);
}
private ResolvedRoute resolve(Route route) {
hostService.startMonitoringIp(route.nextHop());
Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
return hosts.stream().findFirst()
.map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
.orElse(null);
}
private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
return Comparator.comparing(ResolvedRoute::nextHop)
.compare(route1, route2) <= 0 ? route1 : route2;
}
private void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
void store(ResolvedRoute route, Set<ResolvedRoute> alternatives) {
post(resolvedRouteStore.updateRoute(route, alternatives));
}
private void remove(IpPrefix prefix) {
void remove(IpPrefix prefix) {
post(resolvedRouteStore.removeRoute(prefix));
}
private void resolve(RouteSet routes) {
if (routes.routes() == null) {
// The routes were removed before we got to them, nothing to do
return;
}
Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
.map(this::resolve)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
.reduce(this::decide);
if (bestRoute.isPresent()) {
store(bestRoute.get(), resolvedRoutes);
} else {
remove(routes.prefix());
}
}
private void hostUpdated(Host host) {
hostChanged(host);
}
@ -301,12 +238,8 @@ public class RouteManager implements RouteService, RouteAdminService {
}
private void hostChanged(Host host) {
synchronized (this) {
host.ipAddresses().stream()
.flatMap(ip -> routeStore.getRoutesForNextHop(ip).stream())
.map(route -> routeStore.getRoutes(route.prefix()))
.forEach(this::resolve);
}
routeStore.getRoutesForNextHops(host.ipAddresses())
.forEach(routeSet -> routeResolver.resolve(routeSet));
}
/**
@ -377,10 +310,10 @@ public class RouteManager implements RouteService, RouteAdminService {
public void notify(InternalRouteEvent event) {
switch (event.type()) {
case ROUTE_ADDED:
resolve(event.subject());
routeResolver.resolve(event.subject());
break;
case ROUTE_REMOVED:
resolve(event.subject());
routeResolver.resolve(event.subject());
break;
default:
break;
@ -399,11 +332,11 @@ public class RouteManager implements RouteService, RouteAdminService {
case HOST_UPDATED:
case HOST_MOVED:
log.trace("Scheduled host event {}", event);
hostEventExecutor.execute(() -> hostUpdated(event.subject()));
hostEventExecutors.execute(() -> hostUpdated(event.subject()), event.subject().id().hashCode());
break;
case HOST_REMOVED:
log.trace("Scheduled host event {}", event);
hostEventExecutor.execute(() -> hostRemoved(event.subject()));
hostEventExecutors.execute(() -> hostRemoved(event.subject()), event.subject().id().hashCode());
break;
default:
break;

View File

@ -0,0 +1,123 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.routeservice.impl;
import org.onlab.util.PredictableExecutor;
import org.onosproject.net.Host;
import org.onosproject.net.host.HostService;
import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.Route;
import org.onosproject.routeservice.RouteSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.onlab.util.Tools.groupedThreads;
/**
* Resolves routes in multi-thread fashion.
*/
class RouteResolver {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final int DEFAULT_BUCKETS = 0;
private RouteManager routeManager;
private HostService hostService;
protected PredictableExecutor routeResolvers;
/**
* Creates a new route resolver.
*
* @param routeManager route service
* @param hostService host service
*/
RouteResolver(RouteManager routeManager, HostService hostService) {
this.routeManager = routeManager;
this.hostService = hostService;
routeResolvers = new PredictableExecutor(DEFAULT_BUCKETS, groupedThreads("onos/route-resolver",
"route-resolver-%d", log));
}
/**
* Shuts down the route resolver.
*/
void shutdown() {
routeResolvers.shutdown();
}
private ResolvedRoute tryResolve(Route route) {
ResolvedRoute resolvedRoute = resolve(route);
if (resolvedRoute == null) {
resolvedRoute = new ResolvedRoute(route, null, null);
}
return resolvedRoute;
}
// Used by external reads
Set<ResolvedRoute> resolveRouteSet(RouteSet routeSet) {
return routeSet.routes().stream()
.map(this::tryResolve)
.collect(Collectors.toSet());
}
// Used by external reads and by resolvers
ResolvedRoute resolve(Route route) {
hostService.startMonitoringIp(route.nextHop());
Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
return hosts.stream().findFirst()
.map(host -> new ResolvedRoute(route, host.mac(), host.vlan()))
.orElse(null);
}
private ResolvedRoute decide(ResolvedRoute route1, ResolvedRoute route2) {
return Comparator.comparing(ResolvedRoute::nextHop)
.compare(route1, route2) <= 0 ? route1 : route2;
}
private void resolveInternal(RouteSet routes) {
if (routes.routes() == null) {
// The routes were removed before we got to them, nothing to do
return;
}
Set<ResolvedRoute> resolvedRoutes = routes.routes().stream()
.map(this::resolve)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
Optional<ResolvedRoute> bestRoute = resolvedRoutes.stream()
.reduce(this::decide);
if (bestRoute.isPresent()) {
routeManager.store(bestRoute.get(), resolvedRoutes);
} else {
routeManager.remove(routes.prefix());
}
}
// Offload to the resolvers using prefix hashcode as hint
// TODO Remove RouteManager reference using PickyCallable
void resolve(RouteSet routes) {
routeResolvers.execute(() -> resolveInternal(routes), routes.prefix().hashCode());
}
}

View File

@ -18,6 +18,7 @@ package org.onosproject.routeservice.store;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
@ -43,6 +44,7 @@ import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
@ -132,6 +134,14 @@ public class DefaultRouteTable implements RouteTable {
routes.put(route.prefix().toString(), new RawRoute(route));
}
@Override
public void update(Collection<Route> routesAdded) {
Map<String, Collection<? extends RawRoute>> computedRoutes = new HashMap<>();
computeRoutesToAdd(routesAdded).forEach((prefix, routes) -> computedRoutes.computeIfAbsent(
prefix, k -> Sets.newHashSet(routes)));
routes.putAll(computedRoutes);
}
@Override
public void remove(Route route) {
getRoutes(route.prefix())
@ -144,6 +154,14 @@ public class DefaultRouteTable implements RouteTable {
});
}
@Override
public void remove(Collection<Route> routesRemoved) {
Map<String, Collection<? extends RawRoute>> computedRoutes = new HashMap<>();
computeRoutesToRemove(routesRemoved).forEach((prefix, routes) -> computedRoutes.computeIfAbsent(
prefix, k -> Sets.newHashSet(routes)));
routes.removeAll(computedRoutes);
}
@Override
public void replace(Route route) {
routes.replaceValues(route.prefix().toString(), Sets.newHashSet(new RawRoute(route)));
@ -180,6 +198,53 @@ public class DefaultRouteTable implements RouteTable {
.collect(Collectors.toSet());
}
@Override
public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
// First create a reduced snapshot of the store iterating one time the map
Map<String, Collection<? extends RawRoute>> filteredRouteStore = new HashMap<>();
routes.values().stream()
.filter(r -> nextHops.contains(IpAddress.valueOf(r.nextHop())))
.forEach(r -> filteredRouteStore.computeIfAbsent(r.prefix, k -> {
// We need to get all the routes because the resolve logic
// will use the alternatives as well
Versioned<Collection<? extends RawRoute>> routeSet = routes.get(k);
if (routeSet != null) {
return routeSet.value();
}
return null;
}));
// Return the collection of the routeSet we have to resolve
return filteredRouteStore.entrySet().stream()
.map(entry -> new RouteSet(id, IpPrefix.valueOf(entry.getKey()),
entry.getValue().stream().map(RawRoute::route).collect(Collectors.toSet())))
.collect(Collectors.toSet());
}
private Map<String, Collection<RawRoute>> computeRoutesToAdd(Collection<Route> routesAdded) {
Map<String, Collection<RawRoute>> computedRoutes = new HashMap<>();
routesAdded.forEach(route -> {
Collection<RawRoute> tempRoutes = computedRoutes.computeIfAbsent(
route.prefix().toString(), k -> Sets.newHashSet());
tempRoutes.add(new RawRoute(route));
});
return computedRoutes;
}
private Map<String, Collection<RawRoute>> computeRoutesToRemove(Collection<Route> routesRemoved) {
Map<String, Collection<RawRoute>> computedRoutes = new HashMap<>();
routesRemoved.forEach(route -> getRoutes(route.prefix())
.routes()
.stream()
.filter(r -> r.equals(route))
.findAny()
.ifPresent(matchRoute -> {
Collection<RawRoute> tempRoutes = computedRoutes.computeIfAbsent(
matchRoute.prefix().toString(), k -> Sets.newHashSet());
tempRoutes.add(new RawRoute(matchRoute));
}));
return computedRoutes;
}
private class RouteTableListener
implements MultimapEventListener<String, RawRoute> {

View File

@ -17,6 +17,7 @@
package org.onosproject.routeservice.store;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoNamespace;
@ -37,11 +38,13 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.onlab.util.Tools.groupedThreads;
@ -115,11 +118,27 @@ public class DistributedRouteStore extends AbstractStore<InternalRouteEvent, Rou
getDefaultRouteTable(route).update(route);
}
@Override
public void updateRoutes(Collection<Route> routes) {
Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
computedTables.forEach(
((routeTableId, routesToAdd) -> getDefaultRouteTable(routeTableId).update(routesToAdd))
);
}
@Override
public void removeRoute(Route route) {
getDefaultRouteTable(route).remove(route);
}
@Override
public void removeRoutes(Collection<Route> routes) {
Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
computedTables.forEach(
((routeTableId, routesToRemove) -> getDefaultRouteTable(routeTableId).remove(routesToRemove))
);
}
@Override
public void replaceRoute(Route route) {
getDefaultRouteTable(route).replace(route);
@ -145,6 +164,15 @@ public class DistributedRouteStore extends AbstractStore<InternalRouteEvent, Rou
return getDefaultRouteTable(ip).getRoutesForNextHop(ip);
}
@Override
public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
Map<RouteTableId, Set<IpAddress>> computedTables = computeRouteTablesFromIps(nextHops);
return computedTables.entrySet().stream()
.map(entry -> getDefaultRouteTable(entry.getKey()).getRoutesForNextHops(entry.getValue()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
@Override
public RouteSet getRoutes(IpPrefix prefix) {
return getDefaultRouteTable(prefix.address()).getRoutes(prefix);
@ -170,6 +198,30 @@ public class DistributedRouteStore extends AbstractStore<InternalRouteEvent, Rou
return routeTables.getOrDefault(routeTableId, EmptyRouteTable.instance());
}
private RouteTable getDefaultRouteTable(RouteTableId routeTableId) {
return routeTables.getOrDefault(routeTableId, EmptyRouteTable.instance());
}
private Map<RouteTableId, Set<Route>> computeRouteTablesFromRoutes(Collection<Route> routes) {
Map<RouteTableId, Set<Route>> computedTables = new HashMap<>();
routes.forEach(route -> {
RouteTableId routeTableId = (route.prefix().address().isIp4()) ? IPV4 : IPV6;
Set<Route> tempRoutes = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
tempRoutes.add(route);
});
return computedTables;
}
private Map<RouteTableId, Set<IpAddress>> computeRouteTablesFromIps(Collection<IpAddress> ipAddresses) {
Map<RouteTableId, Set<IpAddress>> computedTables = new HashMap<>();
ipAddresses.forEach(ipAddress -> {
RouteTableId routeTableId = (ipAddress.isIp4()) ? IPV4 : IPV6;
Set<IpAddress> tempIpAddresses = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
tempIpAddresses.add(ipAddress);
});
return computedTables;
}
private class InternalRouteStoreDelegate implements RouteStoreDelegate {
@Override
public void notify(InternalRouteEvent event) {

View File

@ -51,11 +51,21 @@ public final class EmptyRouteTable implements RouteTable {
}
@Override
public void update(Collection<Route> routes) {
}
@Override
public void remove(Route route) {
}
@Override
public void remove(Collection<Route> routes) {
}
@Override
public void replace(Route route) {
@ -81,6 +91,11 @@ public final class EmptyRouteTable implements RouteTable {
return Collections.emptyList();
}
@Override
public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
return Collections.emptyList();
}
@Override
public void shutdown() {

View File

@ -16,6 +16,7 @@
package org.onosproject.routeservice.store;
import com.google.common.collect.Sets;
import com.googlecode.concurrenttrees.common.KeyValuePair;
import com.googlecode.concurrenttrees.radix.node.concrete.DefaultByteArrayNodeFactory;
import com.googlecode.concurrenttrees.radixinverted.ConcurrentInvertedRadixTree;
@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -81,11 +83,27 @@ public class LocalRouteStore extends AbstractStore<InternalRouteEvent, RouteStor
getDefaultRouteTable(route).update(route);
}
@Override
public void updateRoutes(Collection<Route> routes) {
Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
computedTables.forEach(
((routeTableId, routesToAdd) -> getDefaultRouteTable(routeTableId).update(routesToAdd))
);
}
@Override
public void removeRoute(Route route) {
getDefaultRouteTable(route).remove(route);
}
@Override
public void removeRoutes(Collection<Route> routes) {
Map<RouteTableId, Set<Route>> computedTables = computeRouteTablesFromRoutes(routes);
computedTables.forEach(
((routeTableId, routesToRemove) -> getDefaultRouteTable(routeTableId).remove(routesToRemove))
);
}
@Override
public void replaceRoute(Route route) {
getDefaultRouteTable(route).replace(route);
@ -110,6 +128,15 @@ public class LocalRouteStore extends AbstractStore<InternalRouteEvent, RouteStor
return getDefaultRouteTable(ip).getRoutesForNextHop(ip);
}
@Override
public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops) {
Map<RouteTableId, Set<IpAddress>> computedTables = computeRouteTablesFromIps(nextHops);
return computedTables.entrySet().stream()
.map(entry -> getDefaultRouteTable(entry.getKey()).getRoutesForNextHops(entry.getValue()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
@Override
public RouteSet getRoutes(IpPrefix prefix) {
return getDefaultRouteTable(prefix.address()).getRoutes(prefix);
@ -124,6 +151,30 @@ public class LocalRouteStore extends AbstractStore<InternalRouteEvent, RouteStor
return routeTables.get(routeTableId);
}
private RouteTable getDefaultRouteTable(RouteTableId routeTableId) {
return routeTables.get(routeTableId);
}
private Map<RouteTableId, Set<Route>> computeRouteTablesFromRoutes(Collection<Route> routes) {
Map<RouteTableId, Set<Route>> computedTables = new HashMap<>();
routes.forEach(route -> {
RouteTableId routeTableId = (route.prefix().address().isIp4()) ? IPV4 : IPV6;
Set<Route> tempRoutes = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
tempRoutes.add(route);
});
return computedTables;
}
private Map<RouteTableId, Set<IpAddress>> computeRouteTablesFromIps(Collection<IpAddress> ipAddresses) {
Map<RouteTableId, Set<IpAddress>> computedTables = new HashMap<>();
ipAddresses.forEach(ipAddress -> {
RouteTableId routeTableId = (ipAddress.isIp4()) ? IPV4 : IPV6;
Set<IpAddress> tempIpAddresses = computedTables.computeIfAbsent(routeTableId, k -> Sets.newHashSet());
tempIpAddresses.add(ipAddress);
});
return computedTables;
}
/**
* Route table into which routes can be placed.
*/
@ -162,6 +213,17 @@ public class LocalRouteStore extends AbstractStore<InternalRouteEvent, RouteStor
}
}
/**
* Adds or updates the routes in the route table.
*
* @param routes routes to update
*/
public void update(Collection<Route> routes) {
synchronized (this) {
routes.forEach(this::update);
}
}
/**
* Removes the route from the route table.
*
@ -179,6 +241,17 @@ public class LocalRouteStore extends AbstractStore<InternalRouteEvent, RouteStor
}
}
/**
* Adds or updates the routes in the route table.
*
* @param routes routes to update
*/
public void remove(Collection<Route> routes) {
synchronized (this) {
routes.forEach(this::remove);
}
}
/**
* Replace the route in the route table.
*/
@ -199,6 +272,28 @@ public class LocalRouteStore extends AbstractStore<InternalRouteEvent, RouteStor
.collect(Collectors.toSet());
}
/**
* Returns the routes pointing to the next hops.
*
* @param ips next hops IP addresses
* @return routes for the next hop
*/
public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> ips) {
// First create a reduced snapshot of the store iterating one time the map
Map<IpPrefix, Set<Route>> filteredRouteStore = new HashMap<>();
routes.values().stream()
.filter(r -> ips.contains(r.nextHop()))
.forEach(r -> {
Collection<Route> tempRoutes = filteredRouteStore.computeIfAbsent(
r.prefix(), k -> Sets.newHashSet());
tempRoutes.add(r);
});
// Return the collection of the routeSet we have to resolve
return filteredRouteStore.entrySet().stream()
.map(entry -> new RouteSet(id, entry.getKey(), entry.getValue()))
.collect(Collectors.toSet());
}
public RouteSet getRoutes(IpPrefix prefix) {
Route route = routes.get(prefix);
if (route != null) {

View File

@ -140,11 +140,21 @@ public class RouteStoreImpl extends AbstractStore<InternalRouteEvent, RouteStore
currentRouteStore.updateRoute(route);
}
@Override
public void updateRoutes(Collection<Route> routes) {
currentRouteStore.updateRoutes(routes);
}
@Override
public void removeRoute(Route route) {
currentRouteStore.removeRoute(route);
}
@Override
public void removeRoutes(Collection<Route> routes) {
currentRouteStore.removeRoutes(routes);
}
@Override
public void replaceRoute(Route route) {
currentRouteStore.replaceRoute(route);
@ -165,6 +175,11 @@ public class RouteStoreImpl extends AbstractStore<InternalRouteEvent, RouteStore
return currentRouteStore.getRoutesForNextHop(ip);
}
@Override
public Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> ips) {
return currentRouteStore.getRoutesForNextHops(ips);
}
@Override
public RouteSet getRoutes(IpPrefix prefix) {
return currentRouteStore.getRoutes(prefix);

View File

@ -36,6 +36,13 @@ public interface RouteTable {
*/
void update(Route route);
/**
* Adds the routes to the route table.
*
* @param routes routes
*/
void update(Collection<Route> routes);
/**
* Removes a route from the route table.
*
@ -43,6 +50,13 @@ public interface RouteTable {
*/
void remove(Route route);
/**
* Removes the routes from the route table.
*
* @param routes routes
*/
void remove(Collection<Route> routes);
/**
* Replaces a route in the route table.
*
@ -80,6 +94,14 @@ public interface RouteTable {
*/
Collection<Route> getRoutesForNextHop(IpAddress nextHop);
/**
* Returns all routes that have the given next hops.
*
* @param nextHops next hops IP addresses
* @return collection of routes sets
*/
Collection<RouteSet> getRoutesForNextHops(Collection<IpAddress> nextHops);
/**
* Releases route table resources held locally.
*/

View File

@ -19,7 +19,7 @@ package org.onosproject.routeservice.impl;
import java.util.Collection;
import java.util.Collections;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.Ip4Address;
@ -30,6 +30,7 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.PredictableExecutor;
import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.Route;
import org.onosproject.routeservice.RouteEvent;
@ -65,6 +66,7 @@ import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.onlab.util.Tools.groupedThreads;
/**
* Unit tests for the route manager.
@ -104,7 +106,6 @@ public class RouteManagerTest {
routeManager = new TestRouteManager();
routeManager.hostService = hostService;
routeManager.hostEventExecutor = MoreExecutors.directExecutor();
routeManager.clusterService = createNiceMock(ClusterService.class);
replay(routeManager.clusterService);
@ -130,6 +131,11 @@ public class RouteManagerTest {
routeManager.routeStore = routeStore;
routeManager.activate();
routeManager.hostEventExecutors = new PredictableExecutor(
0, groupedThreads("onos/route-manager-test", "event-host-%d"), true);
routeManager.routeResolver.routeResolvers = new PredictableExecutor(
0, groupedThreads("onos/route-resolver-test", "route-resolver-%d"), true);
routeManager.addListener(routeListener);
}
@ -142,16 +148,16 @@ public class RouteManagerTest {
hostService.addListener(anyObject(HostListener.class));
expectLastCall().andDelegateTo(new TestHostService()).anyTimes();
Host host1 = createHost(MAC1, V4_NEXT_HOP1);
Host host1 = createHost(MAC1, Collections.singletonList(V4_NEXT_HOP1));
expectHost(host1);
Host host2 = createHost(MAC2, V4_NEXT_HOP2);
Host host2 = createHost(MAC2, Collections.singletonList(V4_NEXT_HOP2));
expectHost(host2);
Host host3 = createHost(MAC3, V6_NEXT_HOP1);
Host host3 = createHost(MAC3, Collections.singletonList(V6_NEXT_HOP1));
expectHost(host3);
Host host4 = createHost(MAC4, V6_NEXT_HOP2);
Host host4 = createHost(MAC4, Collections.singletonList(V6_NEXT_HOP2));
expectHost(host4);
replay(hostService);
@ -176,13 +182,13 @@ public class RouteManagerTest {
* Creates a host with the given parameters.
*
* @param macAddress MAC address
* @param ipAddress IP address
* @param ipAddresses IP addresses
* @return new host
*/
private Host createHost(MacAddress macAddress, IpAddress ipAddress) {
private Host createHost(MacAddress macAddress, Collection<IpAddress> ipAddresses) {
return new DefaultHost(ProviderId.NONE, HostId.NONE, macAddress,
VlanId.NONE, new HostLocation(CP1, 1),
Sets.newHashSet(ipAddress));
Sets.newHashSet(ipAddresses));
}
/**
@ -343,12 +349,19 @@ public class RouteManagerTest {
@Test
public void testAsyncRouteAdd() {
Route route = new Route(Route.Source.STATIC, V4_PREFIX1, V4_NEXT_HOP1);
// 2nd route for the same nexthop
Route route2 = new Route(Route.Source.STATIC, V4_PREFIX2, V4_NEXT_HOP2);
// 3rd route with no valid nexthop
Route route3 = new Route(Route.Source.STATIC, V6_PREFIX1, V6_NEXT_HOP1);
// Host service will reply with no hosts when asked
reset(hostService);
expect(hostService.getHostsByIp(anyObject(IpAddress.class))).andReturn(
Collections.emptySet()).anyTimes();
hostService.startMonitoringIp(V4_NEXT_HOP1);
hostService.startMonitoringIp(V4_NEXT_HOP2);
hostService.startMonitoringIp(V6_NEXT_HOP1);
expectLastCall().anyTimes();
replay(hostService);
@ -356,7 +369,7 @@ public class RouteManagerTest {
// the host is not known
replay(routeListener);
routeManager.update(Collections.singleton(route));
routeManager.update(Lists.newArrayList(route, route2, route3));
verify(routeListener);
@ -365,15 +378,21 @@ public class RouteManagerTest {
ResolvedRoute resolvedRoute = new ResolvedRoute(route, MAC1);
routeListener.event(event(RouteEvent.Type.ROUTE_ADDED, resolvedRoute, null,
Sets.newHashSet(resolvedRoute), null));
ResolvedRoute resolvedRoute2 = new ResolvedRoute(route2, MAC1);
routeListener.event(event(RouteEvent.Type.ROUTE_ADDED, resolvedRoute2, null,
Sets.newHashSet(resolvedRoute2), null));
replay(routeListener);
Host host = createHost(MAC1, V4_NEXT_HOP1);
Host host = createHost(MAC1, Lists.newArrayList(V4_NEXT_HOP1, V4_NEXT_HOP2));
// Set up the host service with a host
reset(hostService);
expect(hostService.getHostsByIp(V4_NEXT_HOP1)).andReturn(
Collections.singleton(host)).anyTimes();
hostService.startMonitoringIp(V4_NEXT_HOP1);
expect(hostService.getHostsByIp(V4_NEXT_HOP2)).andReturn(
Collections.singleton(host)).anyTimes();
hostService.startMonitoringIp(V4_NEXT_HOP2);
expectLastCall().anyTimes();
replay(hostService);