diff --git a/incubator/net/BUCK b/incubator/net/BUCK index 8801df369e..5d12d771f7 100644 --- a/incubator/net/BUCK +++ b/incubator/net/BUCK @@ -5,6 +5,7 @@ COMPILE_DEPS = [ '//incubator/store:onos-incubator-store', '//utils/rest:onlab-rest', '//lib:concurrent-trees', + '//core/store/serializers:onos-core-serializers', ] TEST_DEPS = [ @@ -13,7 +14,6 @@ TEST_DEPS = [ '//lib:TEST_ADAPTERS', '//core/api:onos-api-tests', '//core/common:onos-core-common-tests', - '//core/store/serializers:onos-core-serializers', ] osgi_jar_with_tests ( diff --git a/incubator/net/pom.xml b/incubator/net/pom.xml index da78b6f5a8..cc5846ed49 100644 --- a/incubator/net/pom.xml +++ b/incubator/net/pom.xml @@ -106,6 +106,12 @@ concurrent-trees 2.6.0 + + + org.onosproject + onos-core-serializers + ${project.version} + diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java index 6ab1f03a22..267bb69765 100644 --- a/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java +++ b/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteManager.java @@ -24,6 +24,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.onlab.packet.IpAddress; import org.onlab.packet.IpPrefix; +import org.onosproject.cluster.ClusterService; import org.onosproject.incubator.net.routing.InternalRouteEvent; import org.onosproject.incubator.net.routing.NextHop; import org.onosproject.incubator.net.routing.ResolvedRoute; @@ -41,6 +42,7 @@ import org.onosproject.net.Host; import org.onosproject.net.host.HostEvent; import org.onosproject.net.host.HostListener; import org.onosproject.net.host.HostService; +import org.onosproject.store.service.StorageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,8 +83,16 @@ public class RouteManager implements RouteService, RouteAdminService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected HostService hostService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected ClusterService clusterService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected StorageService storageService; + private ResolvedRouteStore resolvedRouteStore; + private RouteMonitor routeMonitor; + @GuardedBy(value = "this") private Map listeners = new HashMap<>(); @@ -90,6 +100,7 @@ public class RouteManager implements RouteService, RouteAdminService { @Activate protected void activate() { + routeMonitor = new RouteMonitor(this, clusterService, storageService); threadFactory = groupedThreads("onos/route", "listener-%d", log); resolvedRouteStore = new DefaultResolvedRouteStore(); @@ -104,6 +115,7 @@ public class RouteManager implements RouteService, RouteAdminService { @Deactivate protected void deactivate() { + routeMonitor.shutdown(); listeners.values().forEach(ListenerQueue::stop); routeStore.unsetDelegate(delegate); diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteMonitor.java b/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteMonitor.java new file mode 100644 index 0000000000..639a6d4518 --- /dev/null +++ b/incubator/net/src/main/java/org/onosproject/incubator/net/routing/impl/RouteMonitor.java @@ -0,0 +1,148 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.incubator.net.routing.impl; + +import org.onosproject.cluster.ClusterEvent; +import org.onosproject.cluster.ClusterEventListener; +import org.onosproject.cluster.ClusterService; +import org.onosproject.cluster.NodeId; +import org.onosproject.incubator.net.routing.ResolvedRoute; +import org.onosproject.incubator.net.routing.Route; +import org.onosproject.incubator.net.routing.RouteAdminService; +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.DistributedPrimitive; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.WorkQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; + +/** + * Monitors cluster nodes and removes routes if a cluster node becomes unavailable. + */ +public class RouteMonitor { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private static final String TOPIC = "route-reaper"; + private static final int NUM_PARALLEL_JOBS = 10; + + private RouteAdminService routeService; + private final ClusterService clusterService; + private StorageService storageService; + + private WorkQueue queue; + + private final InternalClusterListener clusterListener = new InternalClusterListener(); + + private final ScheduledExecutorService reaperExecutor = + newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log)); + + /** + * Creates a new route monitor. + * + * @param routeService route service + * @param clusterService cluster service + * @param storageService storage service + */ + public RouteMonitor(RouteAdminService routeService, + ClusterService clusterService, StorageService storageService) { + this.routeService = routeService; + this.clusterService = clusterService; + this.storageService = storageService; + + clusterService.addListener(clusterListener); + + queue = storageService.getWorkQueue(TOPIC, Serializer.using(KryoNamespaces.API)); + queue.addStatusChangeListener(this::statusChange); + + startProcessing(); + } + + /** + * Shuts down the route monitor. + */ + public void shutdown() { + stopProcessing(); + clusterService.removeListener(clusterListener); + } + + private void statusChange(DistributedPrimitive.Status status) { + switch (status) { + case ACTIVE: + startProcessing(); + break; + case SUSPENDED: + stopProcessing(); + break; + case INACTIVE: + default: + break; + } + } + + private void startProcessing() { + queue.registerTaskProcessor(this::cleanRoutes, NUM_PARALLEL_JOBS, reaperExecutor); + } + + private void stopProcessing() { + queue.stopProcessing(); + } + + private void cleanRoutes(NodeId node) { + log.info("Cleaning routes from unavailable node {}", node); + + Collection routes = routeService.getRouteTables().stream() + .flatMap(id -> routeService.getRoutes(id).stream()) + .flatMap(route -> route.allRoutes().stream()) + .map(ResolvedRoute::route) + .filter(r -> r.sourceNode().equals(node)) + .collect(Collectors.toList()); + + log.debug("Withdrawing routes: {}", routes); + + routeService.withdraw(routes); + } + + private class InternalClusterListener implements ClusterEventListener { + + @Override + public void event(ClusterEvent event) { + switch (event.type()) { + case INSTANCE_DEACTIVATED: + NodeId id = event.subject().id(); + log.info("Node {} deactivated", id); + queue.addOne(id); + break; + case INSTANCE_ADDED: + case INSTANCE_REMOVED: + case INSTANCE_ACTIVATED: + case INSTANCE_READY: + default: + break; + } + } + } + +} diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/routing/impl/RouteManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/routing/impl/RouteManagerTest.java index 370b83b65b..9723a86264 100644 --- a/incubator/net/src/test/java/org/onosproject/incubator/net/routing/impl/RouteManagerTest.java +++ b/incubator/net/src/test/java/org/onosproject/incubator/net/routing/impl/RouteManagerTest.java @@ -27,6 +27,7 @@ import org.onlab.packet.IpAddress; import org.onlab.packet.IpPrefix; import org.onlab.packet.MacAddress; import org.onlab.packet.VlanId; +import org.onosproject.cluster.ClusterService; import org.onosproject.incubator.net.routing.ResolvedRoute; import org.onosproject.incubator.net.routing.Route; import org.onosproject.incubator.net.routing.RouteEvent; @@ -44,11 +45,15 @@ import org.onosproject.net.host.HostListener; import org.onosproject.net.host.HostService; import org.onosproject.net.host.HostServiceAdapter; import org.onosproject.net.provider.ProviderId; +import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.WorkQueue; import java.util.Collections; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; @@ -94,6 +99,13 @@ public class RouteManagerTest { routeManager = new TestRouteManager(); routeManager.hostService = hostService; + routeManager.clusterService = createNiceMock(ClusterService.class); + replay(routeManager.clusterService); + routeManager.storageService = createNiceMock(StorageService.class); + expect(routeManager.storageService.getWorkQueue(anyString(), anyObject())) + .andReturn(createNiceMock(WorkQueue.class)); + replay(routeManager.storageService); + LocalRouteStore routeStore = new LocalRouteStore(); routeStore.activate(); routeManager.routeStore = routeStore;