Add StatusChangedListener for maps in RouteTables

Change-Id: I02c8558567ac416ea62fea79f856c331da7282ad
This commit is contained in:
Jonathan Hart 2017-05-25 14:23:01 -07:00
parent dbee233e7f
commit 1f67d28601
2 changed files with 28 additions and 5 deletions

View File

@ -26,6 +26,7 @@ import org.onosproject.incubator.net.routing.RouteStoreDelegate;
import org.onosproject.incubator.net.routing.RouteTableId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
@ -36,6 +37,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@ -48,27 +51,43 @@ public class DefaultRouteTable implements RouteTable {
private final RouteTableId id;
private final ConsistentMap<IpPrefix, Set<Route>> routes;
private final RouteStoreDelegate delegate;
private final ExecutorService executor;
private final RouteTableListener listener = new RouteTableListener();
private final Consumer<DistributedPrimitive.Status> statusChangeListener;
/**
* Creates a new route table.
*
* @param id route table ID
* @param delegate route store delegate to notify of events
* @param storageService storage service
* @param executor executor service
*/
public DefaultRouteTable(RouteTableId id, RouteStoreDelegate delegate,
StorageService storageService) {
StorageService storageService, ExecutorService executor) {
this.delegate = checkNotNull(delegate);
this.id = checkNotNull(id);
this.routes = buildRouteMap(checkNotNull(storageService));
this.executor = checkNotNull(executor);
statusChangeListener = status -> {
if (status.equals(DistributedPrimitive.Status.ACTIVE)) {
executor.execute(this::notifyExistingRoutes);
}
};
routes.addStatusChangeListener(statusChangeListener);
notifyExistingRoutes();
routes.addListener(listener);
}
private void notifyExistingRoutes() {
routes.entrySet().stream()
.map(e -> new InternalRouteEvent(InternalRouteEvent.Type.ROUTE_ADDED,
new RouteSet(id, e.getKey(), e.getValue().value())))
.forEach(delegate::notify);
routes.addListener(listener);
}
private ConsistentMap<IpPrefix, Set<Route>> buildRouteMap(StorageService storageService) {
@ -91,6 +110,7 @@ public class DefaultRouteTable implements RouteTable {
@Override
public void shutdown() {
routes.removeStatusChangeListener(statusChangeListener);
routes.removeListener(listener);
}
@ -186,4 +206,5 @@ public class DefaultRouteTable implements RouteTable {
}
}
}
}

View File

@ -44,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.onlab.util.Tools.groupedThreads;
/**
* Route store based on distributed storage.
*/
@ -75,7 +77,7 @@ public class DistributedRouteStore extends AbstractStore<InternalRouteEvent, Rou
*/
public void activate() {
routeTables = new ConcurrentHashMap<>();
executor = Executors.newSingleThreadExecutor();
executor = Executors.newSingleThreadExecutor(groupedThreads("onos/route", "store", log));
KryoNamespace masterRouteTableSerializer = KryoNamespace.newBuilder()
.register(RouteTableId.class)
@ -173,7 +175,7 @@ public class DistributedRouteStore extends AbstractStore<InternalRouteEvent, Rou
}
private void createRouteTable(RouteTableId tableId) {
routeTables.computeIfAbsent(tableId, id -> new DefaultRouteTable(id, ourDelegate, storageService));
routeTables.computeIfAbsent(tableId, id -> new DefaultRouteTable(id, ourDelegate, storageService, executor));
}
private void destroyRouteTable(RouteTableId tableId) {