diff --git a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java index 62bdfb2af4..a44d53931e 100644 --- a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java +++ b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/GrpcRemoteServiceServer.java @@ -16,6 +16,7 @@ package org.onosproject.incubator.rpc.grpc; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.stream.Collectors.toList; import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate; import static org.onosproject.net.DeviceId.deviceId; @@ -25,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +38,7 @@ import org.apache.felix.scr.annotations.Modified; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; +import org.onlab.util.Tools; import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc; import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc; import org.onosproject.grpc.net.device.DeviceService.DeviceConnected; @@ -110,8 +113,11 @@ public class GrpcRemoteServiceServer { private final Map linkProviderServices = Maps.newConcurrentMap(); private final Map linkProviders = Maps.newConcurrentMap(); + private ScheduledExecutorService executor; + @Activate protected void activate(ComponentContext context) throws IOException { + executor = newScheduledThreadPool(1, Tools.groupedThreads("grpc", "%d", log)); modified(context); log.debug("Server starting on {}", listenPort); @@ -130,6 +136,12 @@ public class GrpcRemoteServiceServer { @Deactivate protected void deactivate() { + executor.shutdown(); + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } registeredProviders.stream() .forEach(deviceProviderRegistry::unregister); @@ -182,6 +194,10 @@ public class GrpcRemoteServiceServer { return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider); } + protected ScheduledExecutorService getSharedExecutor() { + return executor; + } + // RPC Server-side code // RPC session Factory /** diff --git a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java index 608f6e3942..435c650c78 100644 --- a/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java +++ b/incubator/rpc-grpc/src/main/java/org/onosproject/incubator/rpc/grpc/LinkProviderServiceServerProxy.java @@ -17,8 +17,14 @@ package org.onosproject.incubator.rpc.grpc; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.cache.RemovalListeners.asynchronous; import static org.onosproject.net.DeviceId.deviceId; +import static org.onosproject.net.LinkKey.linkKey; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.tuple.Pair; import org.onosproject.grpc.net.Link.ConnectPoint.ElementIdCase; import org.onosproject.grpc.net.Link.LinkType; import org.onosproject.grpc.net.link.LinkProviderServiceRpcGrpc.LinkProviderServiceRpc; @@ -29,6 +35,7 @@ import org.onosproject.incubator.protobuf.net.ProtobufUtils; import org.onosproject.net.ConnectPoint; import org.onosproject.net.DeviceId; import org.onosproject.net.Link; +import org.onosproject.net.LinkKey; import org.onosproject.net.PortNumber; import org.onosproject.net.SparseAnnotations; import org.onosproject.net.link.DefaultLinkDescription; @@ -38,9 +45,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.api.client.repackaged.com.google.common.annotations.Beta; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalNotification; import io.grpc.stub.StreamObserver; +// Only single instance will be created and bound to gRPC LinkProviderService /** * Server-side implementation of gRPC version of LinkProviderService. */ @@ -48,15 +59,39 @@ import io.grpc.stub.StreamObserver; final class LinkProviderServiceServerProxy implements LinkProviderServiceRpc { + /** + * Silence time in seconds, until link gets treated as vanished. + */ + private static final int EVICT_LIMIT = 3 * 3; + private final Logger log = LoggerFactory.getLogger(getClass()); private final GrpcRemoteServiceServer server; // TODO implement aging mechanism to automatically remove // stale links reported by dead client, etc. + /** + * Evicting Cache to track last seen time. + */ + private final Cache, LinkDescription> lastSeen; LinkProviderServiceServerProxy(GrpcRemoteServiceServer server) { this.server = checkNotNull(server); + ScheduledExecutorService executor = server.getSharedExecutor(); + lastSeen = CacheBuilder.newBuilder() + .expireAfterWrite(EVICT_LIMIT, TimeUnit.SECONDS) + .removalListener(asynchronous(this::onRemove, executor)) + .build(); + + executor.scheduleWithFixedDelay(lastSeen::cleanUp, + EVICT_LIMIT, EVICT_LIMIT, TimeUnit.SECONDS); + } + + private void onRemove(RemovalNotification, LinkDescription> n) { + if (n.wasEvicted()) { + getLinkProviderServiceFor(n.getKey().getLeft()) + .linkVanished(n.getValue()); + } } /** @@ -92,6 +127,7 @@ final class LinkProviderServiceServerProxy LinkDescription linkDescription = translate(request.getLinkDescription()); linkProviderService.linkDetected(linkDescription); + lastSeen.put(Pair.of(scheme, linkKey(linkDescription)), linkDescription); } @Override @@ -123,6 +159,7 @@ final class LinkProviderServiceServerProxy case LINK_DESCRIPTION: LinkDescription desc = translate(request.getLinkDescription()); getLinkProviderServiceFor(scheme).linkVanished(desc); + lastSeen.invalidate(Pair.of(scheme, linkKey(desc))); break; case SUBJECT_NOT_SET: default: