diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java index e9537d992f..1231abec07 100644 --- a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java +++ b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DefaultRouteTable.java @@ -26,6 +26,7 @@ import org.onosproject.incubator.net.routing.RouteStoreDelegate; import org.onosproject.incubator.net.routing.RouteTableId; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.DistributedPrimitive; import org.onosproject.store.service.MapEvent; import org.onosproject.store.service.MapEventListener; import org.onosproject.store.service.Serializer; @@ -36,6 +37,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; @@ -48,27 +51,43 @@ public class DefaultRouteTable implements RouteTable { private final RouteTableId id; private final ConsistentMap> routes; private final RouteStoreDelegate delegate; + private final ExecutorService executor; private final RouteTableListener listener = new RouteTableListener(); + private final Consumer statusChangeListener; + /** * Creates a new route table. * * @param id route table ID * @param delegate route store delegate to notify of events * @param storageService storage service + * @param executor executor service */ public DefaultRouteTable(RouteTableId id, RouteStoreDelegate delegate, - StorageService storageService) { + StorageService storageService, ExecutorService executor) { this.delegate = checkNotNull(delegate); this.id = checkNotNull(id); this.routes = buildRouteMap(checkNotNull(storageService)); + this.executor = checkNotNull(executor); + statusChangeListener = status -> { + if (status.equals(DistributedPrimitive.Status.ACTIVE)) { + executor.execute(this::notifyExistingRoutes); + } + }; + routes.addStatusChangeListener(statusChangeListener); + + notifyExistingRoutes(); + + routes.addListener(listener); + } + + private void notifyExistingRoutes() { routes.entrySet().stream() .map(e -> new InternalRouteEvent(InternalRouteEvent.Type.ROUTE_ADDED, new RouteSet(id, e.getKey(), e.getValue().value()))) .forEach(delegate::notify); - - routes.addListener(listener); } private ConsistentMap> buildRouteMap(StorageService storageService) { @@ -91,6 +110,7 @@ public class DefaultRouteTable implements RouteTable { @Override public void shutdown() { + routes.removeStatusChangeListener(statusChangeListener); routes.removeListener(listener); } @@ -186,4 +206,5 @@ public class DefaultRouteTable implements RouteTable { } } } + } diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java index 6901041b3e..34e281a102 100644 --- a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java +++ b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/DistributedRouteStore.java @@ -44,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.onlab.util.Tools.groupedThreads; + /** * Route store based on distributed storage. */ @@ -75,7 +77,7 @@ public class DistributedRouteStore extends AbstractStore(); - executor = Executors.newSingleThreadExecutor(); + executor = Executors.newSingleThreadExecutor(groupedThreads("onos/route", "store", log)); KryoNamespace masterRouteTableSerializer = KryoNamespace.newBuilder() .register(RouteTableId.class) @@ -173,7 +175,7 @@ public class DistributedRouteStore extends AbstractStore new DefaultRouteTable(id, ourDelegate, storageService)); + routeTables.computeIfAbsent(tableId, id -> new DefaultRouteTable(id, ourDelegate, storageService, executor)); } private void destroyRouteTable(RouteTableId tableId) {