Clean up routes when the ONOS node they were sourced from goes down

Change-Id: I1b70e087b64404bf92e6251d18f3c85791e30583
This commit is contained in:
Jonathan Hart 2017-05-25 14:21:44 -07:00 committed by Thomas Vachuska
parent f42a2ccc39
commit d4be52fdcf
5 changed files with 179 additions and 1 deletions

View File

@ -5,6 +5,7 @@ COMPILE_DEPS = [
'//incubator/store:onos-incubator-store',
'//utils/rest:onlab-rest',
'//lib:concurrent-trees',
'//core/store/serializers:onos-core-serializers',
]
TEST_DEPS = [
@ -13,7 +14,6 @@ TEST_DEPS = [
'//lib:TEST_ADAPTERS',
'//core/api:onos-api-tests',
'//core/common:onos-core-common-tests',
'//core/store/serializers:onos-core-serializers',
]
osgi_jar_with_tests (

View File

@ -106,6 +106,12 @@
<artifactId>concurrent-trees</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -24,6 +24,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onosproject.cluster.ClusterService;
import org.onosproject.incubator.net.routing.InternalRouteEvent;
import org.onosproject.incubator.net.routing.NextHop;
import org.onosproject.incubator.net.routing.ResolvedRoute;
@ -41,6 +42,7 @@ import org.onosproject.net.Host;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -81,8 +83,16 @@ public class RouteManager implements RouteService, RouteAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private ResolvedRouteStore resolvedRouteStore;
private RouteMonitor routeMonitor;
@GuardedBy(value = "this")
private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
@ -90,6 +100,7 @@ public class RouteManager implements RouteService, RouteAdminService {
@Activate
protected void activate() {
routeMonitor = new RouteMonitor(this, clusterService, storageService);
threadFactory = groupedThreads("onos/route", "listener-%d", log);
resolvedRouteStore = new DefaultResolvedRouteStore();
@ -104,6 +115,7 @@ public class RouteManager implements RouteService, RouteAdminService {
@Deactivate
protected void deactivate() {
routeMonitor.shutdown();
listeners.values().forEach(ListenerQueue::stop);
routeStore.unsetDelegate(delegate);

View File

@ -0,0 +1,148 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.routing.impl;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.incubator.net.routing.ResolvedRoute;
import org.onosproject.incubator.net.routing.Route;
import org.onosproject.incubator.net.routing.RouteAdminService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
/**
* Monitors cluster nodes and removes routes if a cluster node becomes unavailable.
*/
public class RouteMonitor {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final String TOPIC = "route-reaper";
private static final int NUM_PARALLEL_JOBS = 10;
private RouteAdminService routeService;
private final ClusterService clusterService;
private StorageService storageService;
private WorkQueue<NodeId> queue;
private final InternalClusterListener clusterListener = new InternalClusterListener();
private final ScheduledExecutorService reaperExecutor =
newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
/**
* Creates a new route monitor.
*
* @param routeService route service
* @param clusterService cluster service
* @param storageService storage service
*/
public RouteMonitor(RouteAdminService routeService,
ClusterService clusterService, StorageService storageService) {
this.routeService = routeService;
this.clusterService = clusterService;
this.storageService = storageService;
clusterService.addListener(clusterListener);
queue = storageService.getWorkQueue(TOPIC, Serializer.using(KryoNamespaces.API));
queue.addStatusChangeListener(this::statusChange);
startProcessing();
}
/**
* Shuts down the route monitor.
*/
public void shutdown() {
stopProcessing();
clusterService.removeListener(clusterListener);
}
private void statusChange(DistributedPrimitive.Status status) {
switch (status) {
case ACTIVE:
startProcessing();
break;
case SUSPENDED:
stopProcessing();
break;
case INACTIVE:
default:
break;
}
}
private void startProcessing() {
queue.registerTaskProcessor(this::cleanRoutes, NUM_PARALLEL_JOBS, reaperExecutor);
}
private void stopProcessing() {
queue.stopProcessing();
}
private void cleanRoutes(NodeId node) {
log.info("Cleaning routes from unavailable node {}", node);
Collection<Route> routes = routeService.getRouteTables().stream()
.flatMap(id -> routeService.getRoutes(id).stream())
.flatMap(route -> route.allRoutes().stream())
.map(ResolvedRoute::route)
.filter(r -> r.sourceNode().equals(node))
.collect(Collectors.toList());
log.debug("Withdrawing routes: {}", routes);
routeService.withdraw(routes);
}
private class InternalClusterListener implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
switch (event.type()) {
case INSTANCE_DEACTIVATED:
NodeId id = event.subject().id();
log.info("Node {} deactivated", id);
queue.addOne(id);
break;
case INSTANCE_ADDED:
case INSTANCE_REMOVED:
case INSTANCE_ACTIVATED:
case INSTANCE_READY:
default:
break;
}
}
}
}

View File

@ -27,6 +27,7 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.cluster.ClusterService;
import org.onosproject.incubator.net.routing.ResolvedRoute;
import org.onosproject.incubator.net.routing.Route;
import org.onosproject.incubator.net.routing.RouteEvent;
@ -44,11 +45,15 @@ import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.host.HostServiceAdapter;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WorkQueue;
import java.util.Collections;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
@ -94,6 +99,13 @@ public class RouteManagerTest {
routeManager = new TestRouteManager();
routeManager.hostService = hostService;
routeManager.clusterService = createNiceMock(ClusterService.class);
replay(routeManager.clusterService);
routeManager.storageService = createNiceMock(StorageService.class);
expect(routeManager.storageService.getWorkQueue(anyString(), anyObject()))
.andReturn(createNiceMock(WorkQueue.class));
replay(routeManager.storageService);
LocalRouteStore routeStore = new LocalRouteStore();
routeStore.activate();
routeManager.routeStore = routeStore;