From e616d7553edb6db5a68b587721af76f01f36615c Mon Sep 17 00:00:00 2001 From: Yi Tseng Date: Tue, 27 Nov 2018 10:53:27 -0800 Subject: [PATCH] Add gNMI device state subscriber Change-Id: I20cb5e130f4e416bf8678aab2e5268faf24ad06b --- .../org/onosproject/gnmi/api/GnmiClient.java | 20 +- .../onosproject/gnmi/api/GnmiController.java | 4 +- .../org/onosproject/gnmi/api/GnmiEvent.java | 9 +- .../org/onosproject/gnmi/api/GnmiUpdate.java | 77 +++++ .../org/onosproject/gnmi/api/GnmiUtils.java | 55 ++++ .../onosproject/gnmi/ctl/GnmiClientImpl.java | 148 ++++++++- .../gnmi/ctl/GnmiControllerImpl.java | 12 +- providers/general/BUILD | 2 +- providers/general/device/BUILD | 7 +- .../device/impl/GeneralDeviceProvider.java | 16 + .../impl/GnmiDeviceStateSubscriber.java | 281 ++++++++++++++++++ 11 files changed, 615 insertions(+), 16 deletions(-) create mode 100644 protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java create mode 100644 protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java create mode 100644 providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java index 5beb238e58..6e65dd358c 100644 --- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java +++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java @@ -22,6 +22,7 @@ import gnmi.Gnmi.GetRequest; import gnmi.Gnmi.GetResponse; import gnmi.Gnmi.SetRequest; import gnmi.Gnmi.SetResponse; +import gnmi.Gnmi.SubscribeRequest; import org.onosproject.grpc.api.GrpcClient; import java.util.concurrent.CompletableFuture; @@ -56,12 +57,23 @@ public interface GnmiClient extends GrpcClient { CompletableFuture set(SetRequest request); /** - * Check weather the gNMI service is available or not by sending a - * dummy get request message. + * Subscribes to a given specific gNMI path. + * + * @param request the subscribe request + * @return true if subscribe successfully; false otherwise + */ + boolean subscribe(SubscribeRequest request); + + /** + * Terminates the subscription channel of this device. + */ + void terminateSubscriptionChannel(); + + /** + * Check weather the gNMI service is available or not by sending a dummy get + * request message. * * @return true if gNMI service available; false otherwise */ CompletableFuture isServiceAvailable(); - - // TODO: Support gNMI subscription } diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java index f4964ed666..b0e0071adc 100644 --- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java +++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java @@ -17,6 +17,7 @@ package org.onosproject.gnmi.api; import com.google.common.annotations.Beta; +import org.onosproject.event.ListenerService; import org.onosproject.grpc.api.GrpcClientController; /** @@ -24,5 +25,6 @@ import org.onosproject.grpc.api.GrpcClientController; */ @Beta public interface GnmiController - extends GrpcClientController { + extends GrpcClientController, + ListenerService { } diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java index 5129926c4a..84031d0393 100644 --- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java +++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java @@ -32,15 +32,10 @@ public final class GnmiEvent extends AbstractEvent { + pathStringBuilder.append("/").append(elem.getName()); + if (elem.getKeyCount() > 0) { + pathStringBuilder.append("["); + List keys = elem.getKeyMap().entrySet().stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.toList()); + pathStringBuilder.append(String.join(", ", keys)); + pathStringBuilder.append("]"); + } + }); + return pathStringBuilder.toString(); + } +} diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java index 22b226b185..117b27e8a8 100644 --- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java +++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java @@ -23,17 +23,26 @@ import gnmi.Gnmi.Path; import gnmi.Gnmi.PathElem; import gnmi.Gnmi.SetRequest; import gnmi.Gnmi.SetResponse; +import gnmi.Gnmi.SubscribeRequest; +import gnmi.Gnmi.SubscribeResponse; import gnmi.gNMIGrpc; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.StreamObserver; import org.onosproject.gnmi.api.GnmiClient; import org.onosproject.gnmi.api.GnmiClientKey; +import org.onosproject.gnmi.api.GnmiEvent; +import org.onosproject.gnmi.api.GnmiUpdate; import org.onosproject.grpc.ctl.AbstractGrpcClient; import org.slf4j.Logger; +import java.net.ConnectException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import static java.lang.String.format; import static org.slf4j.LoggerFactory.getLogger; /** @@ -45,10 +54,14 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient { private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build(); private final Logger log = getLogger(getClass()); private final gNMIGrpc.gNMIBlockingStub blockingStub; + private StreamChannelManager streamChannelManager; + private GnmiControllerImpl controller; - GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) { + GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) { super(clientKey); this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel); + this.streamChannelManager = new StreamChannelManager(managedChannel); + this.controller = controller; } @Override @@ -66,11 +79,27 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient { return supplyInContext(() -> doSet(request), "set"); } + @Override + public boolean subscribe(SubscribeRequest request) { + return streamChannelManager.send(request); + } + + @Override + public void terminateSubscriptionChannel() { + streamChannelManager.complete(); + } + @Override public CompletableFuture isServiceAvailable() { return supplyInContext(this::doServiceAvailable, "isServiceAvailable"); } + @Override + protected Void doShutdown() { + streamChannelManager.complete(); + return super.doShutdown(); + } + private CapabilityResponse doCapability() { CapabilityRequest request = CapabilityRequest.newBuilder().build(); try { @@ -110,4 +139,121 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient { return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT); } } + + + + /** + * A manager for the gNMI stream channel that opportunistically creates + * new stream RCP stubs (e.g. when one fails because of errors) and posts + * subscribe events via the gNMI controller. + */ + private final class StreamChannelManager { + + private final ManagedChannel channel; + private final AtomicBoolean open; + private final StreamObserver responseObserver; + private ClientCallStreamObserver requestObserver; + + private StreamChannelManager(ManagedChannel channel) { + this.channel = channel; + this.responseObserver = new InternalStreamResponseObserver(this); + this.open = new AtomicBoolean(false); + } + + private void initIfRequired() { + if (requestObserver == null) { + log.debug("Creating new stream channel for {}...", deviceId); + requestObserver = (ClientCallStreamObserver) + gNMIGrpc.newStub(channel).subscribe(responseObserver); + open.set(false); + } + } + + public boolean send(SubscribeRequest value) { + synchronized (this) { + initIfRequired(); + try { + requestObserver.onNext(value); + return true; + } catch (Throwable ex) { + if (ex instanceof StatusRuntimeException) { + log.warn("Unable to send subscribe request to {}: {}", + deviceId, ex.getMessage()); + } else { + log.warn("Exception while sending subscribe request to {}", + deviceId, ex); + } + complete(); + return false; + } + } + } + + public void complete() { + synchronized (this) { + if (requestObserver != null) { + requestObserver.onCompleted(); + requestObserver.cancel("Terminated", null); + requestObserver = null; + } + } + } + } + + + /** + * Handles messages received from the device on the stream channel. + */ + private final class InternalStreamResponseObserver + implements StreamObserver { + + private final StreamChannelManager streamChannelManager; + + private InternalStreamResponseObserver( + StreamChannelManager streamChannelManager) { + this.streamChannelManager = streamChannelManager; + } + + @Override + public void onNext(SubscribeResponse message) { + executorService.submit(() -> doNext(message)); + } + + private void doNext(SubscribeResponse message) { + try { + log.debug("Received message on stream channel from {}: {}", + deviceId, message.toString()); + GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse()); + GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update); + controller.postEvent(event); + } catch (Throwable ex) { + log.error("Exception while processing stream message from {}", + deviceId, ex); + } + } + + @Override + public void onError(Throwable throwable) { + if (throwable instanceof StatusRuntimeException) { + StatusRuntimeException sre = (StatusRuntimeException) throwable; + if (sre.getStatus().getCause() instanceof ConnectException) { + log.warn("Device {} is unreachable ({})", + deviceId, sre.getCause().getMessage()); + } else { + log.warn("Received error on stream channel for {}: {}", + deviceId, throwable.getMessage()); + } + } else { + log.warn(format("Received exception on stream channel for %s", + deviceId), throwable); + } + streamChannelManager.complete(); + } + + @Override + public void onCompleted() { + log.warn("Stream channel for {} has completed", deviceId); + streamChannelManager.complete(); + } + } } diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java index 8392c4a0be..cf147128eb 100644 --- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java +++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java @@ -41,6 +41,7 @@ public class GnmiControllerImpl @Activate public void activate() { super.activate(); + eventDispatcher.addSink(GnmiEvent.class, listenerRegistry); log.info("Started"); } @@ -52,6 +53,15 @@ public class GnmiControllerImpl @Override protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) { - return new GnmiClientImpl(clientKey, channel); + return new GnmiClientImpl(clientKey, channel, this); + } + + /** + * Handles event from gNMI client. + * + * @param event the gNMI event + */ + void postEvent(GnmiEvent event) { + post(event); } } diff --git a/providers/general/BUILD b/providers/general/BUILD index 0f4654f94e..2ac73a0d31 100644 --- a/providers/general/BUILD +++ b/providers/general/BUILD @@ -7,7 +7,7 @@ onos_app( category = "Provider", description = "General device southbound providers.", included_bundles = BUNDLES, - required_apps = [], + required_apps = ["org.onosproject.protocols.gnmi"], title = "General Device Provider", url = "http://onosproject.org", ) diff --git a/providers/general/device/BUILD b/providers/general/device/BUILD index 2807457047..40957b14ff 100644 --- a/providers/general/device/BUILD +++ b/providers/general/device/BUILD @@ -1,4 +1,9 @@ -COMPILE_DEPS = CORE_DEPS + JACKSON +COMPILE_DEPS = CORE_DEPS + JACKSON + [ + "//protocols/gnmi/stub:onos-protocols-gnmi-stub", + "//protocols/gnmi/api:onos-protocols-gnmi-api", + "@com_google_protobuf//:protobuf_java", + "//protocols/grpc/api:onos-protocols-grpc-api", +] osgi_jar_with_tests( test_deps = TEST_ADAPTERS, diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java index 3ca56ee664..bbf71f28ac 100644 --- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java +++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java @@ -25,6 +25,7 @@ import org.onlab.util.ItemNotFoundException; import org.onlab.util.Tools; import org.onosproject.cfg.ComponentConfigService; import org.onosproject.core.CoreService; +import org.onosproject.gnmi.api.GnmiController; import org.onosproject.mastership.MastershipService; import org.onosproject.net.AnnotationKeys; import org.onosproject.net.DefaultAnnotations; @@ -169,6 +170,16 @@ public class GeneralDeviceProvider extends AbstractProvider @Reference(cardinality = ReferenceCardinality.MANDATORY) private PiPipeconfWatchdogService pipeconfWatchdogService; + // FIXME: no longer general if we add a dependency to a protocol-specific + // service. Possible solutions are: rename this provider to + // StratumDeviceProvider, find a way to allow this provider to register for + // protocol specific events (e.g. port events) via drivers (similar to + // DeviceAgentListener). + @Reference(cardinality = ReferenceCardinality.MANDATORY) + private GnmiController gnmiController; + + private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber; + /** * Configure poll frequency for port status and statistics; default is 10 sec. */ @@ -228,6 +239,9 @@ public class GeneralDeviceProvider extends AbstractProvider pipeconfWatchdogService.addListener(pipeconfWatchdogListener); rescheduleProbeTask(false); modified(context); + gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(gnmiController, + deviceService, mastershipService, providerService); + gnmiDeviceStateSubscriber.activate(); log.info("Started"); } @@ -316,6 +330,8 @@ public class GeneralDeviceProvider extends AbstractProvider providerRegistry.unregister(this); providerService = null; cfgService.unregisterConfigFactory(factory); + gnmiDeviceStateSubscriber.deactivate(); + gnmiDeviceStateSubscriber = null; log.info("Stopped"); } diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java new file mode 100644 index 0000000000..b253f3e7f1 --- /dev/null +++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java @@ -0,0 +1,281 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.provider.general.device.impl; + +import com.google.common.annotations.Beta; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Striped; +import gnmi.Gnmi.Notification; +import gnmi.Gnmi.Path; +import gnmi.Gnmi.PathElem; +import gnmi.Gnmi.SubscribeRequest; +import gnmi.Gnmi.Subscription; +import gnmi.Gnmi.SubscriptionList; +import gnmi.Gnmi.SubscriptionMode; +import gnmi.Gnmi.Update; +import org.onlab.util.SharedExecutors; +import org.onosproject.gnmi.api.GnmiClient; +import org.onosproject.gnmi.api.GnmiController; +import org.onosproject.gnmi.api.GnmiEvent; +import org.onosproject.gnmi.api.GnmiEventListener; +import org.onosproject.gnmi.api.GnmiUpdate; +import org.onosproject.gnmi.api.GnmiUtils; +import org.onosproject.mastership.MastershipEvent; +import org.onosproject.mastership.MastershipListener; +import org.onosproject.mastership.MastershipService; +import org.onosproject.net.DeviceId; +import org.onosproject.net.Port; +import org.onosproject.net.SparseAnnotations; +import org.onosproject.net.device.DefaultPortDescription; +import org.onosproject.net.device.DeviceEvent; +import org.onosproject.net.device.DeviceListener; +import org.onosproject.net.device.DeviceProviderService; +import org.onosproject.net.device.DeviceService; +import org.onosproject.net.device.PortDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Lock; + +/** + * Entity that manages gNMI subscription for devices using OpenConfig models and + * that reports relevant events to the core. + */ +@Beta +class GnmiDeviceStateSubscriber { + + private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class); + + private final GnmiController gnmiController; + private final DeviceService deviceService; + private final DeviceProviderService providerService; + private final MastershipService mastershipService; + + private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor(); + + private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener(); + private final InternalDeviceListener deviceEventListener = new InternalDeviceListener(); + private final InternalMastershipListener mastershipListener = new InternalMastershipListener(); + private final Collection deviceSubscribed = Sets.newHashSet(); + + private final Striped deviceLocks = Striped.lock(30); + + GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService, + MastershipService mastershipService, + DeviceProviderService providerService) { + this.gnmiController = gnmiController; + this.deviceService = deviceService; + this.mastershipService = mastershipService; + this.providerService = providerService; + } + + public void activate() { + deviceService.addListener(deviceEventListener); + mastershipService.addListener(mastershipListener); + gnmiController.addListener(gnmiEventListener); + // Subscribe to existing devices. + deviceService.getDevices().forEach(d -> executorService.execute( + () -> checkDeviceSubscription(d.id()))); + } + + public void deactivate() { + deviceSubscribed.forEach(this::unsubscribeIfNeeded); + deviceService.removeListener(deviceEventListener); + mastershipService.removeListener(mastershipListener); + gnmiController.removeListener(gnmiEventListener); + } + + private void checkDeviceSubscription(DeviceId deviceId) { + deviceLocks.get(deviceId).lock(); + try { + if (!deviceService.isAvailable(deviceId) + || deviceService.getDevice(deviceId) == null + || !mastershipService.isLocalMaster(deviceId)) { + // Device not available/removed or this instance is no longer + // master. + unsubscribeIfNeeded(deviceId); + } else { + subscribeIfNeeded(deviceId); + } + } finally { + deviceLocks.get(deviceId).unlock(); + } + } + + private Path interfaceOperStatusPath(String interfaceName) { + return Path.newBuilder() + .addElem(PathElem.newBuilder().setName("interfaces").build()) + .addElem(PathElem.newBuilder() + .setName("interface").putKey("name", interfaceName).build()) + .addElem(PathElem.newBuilder().setName("state").build()) + .addElem(PathElem.newBuilder().setName("oper-status").build()) + .build(); + } + + private void unsubscribeIfNeeded(DeviceId deviceId) { + if (!deviceSubscribed.contains(deviceId)) { + // Not subscribed. + return; + } + GnmiClient client = gnmiController.getClient(deviceId); + if (client == null) { + log.debug("Cannot find gNMI client for device {}", deviceId); + } else { + client.terminateSubscriptionChannel(); + } + deviceSubscribed.remove(deviceId); + } + + private void subscribeIfNeeded(DeviceId deviceId) { + if (deviceSubscribed.contains(deviceId)) { + // Already subscribed. + // FIXME: if a new port is added after the first subscription we are + // not subscribing to the new port. + return; + } + + GnmiClient client = gnmiController.getClient(deviceId); + if (client == null) { + log.warn("Cannot find gNMI client for device {}", deviceId); + return; + } + + List ports = deviceService.getPorts(deviceId); + SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder(); + subscriptionList.setMode(SubscriptionList.Mode.STREAM); + subscriptionList.setUpdatesOnly(true); + + ports.forEach(port -> { + String portName = port.number().name(); + // Subscribe /interface/interface[name=port-name]/state/oper-status + Path subscribePath = interfaceOperStatusPath(portName); + Subscription interfaceOperStatusSub = + Subscription.newBuilder() + .setPath(subscribePath) + .setMode(SubscriptionMode.ON_CHANGE) + .build(); + // TODO: more state subscription + subscriptionList.addSubscription(interfaceOperStatusSub); + }); + + SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder() + .setSubscribe(subscriptionList.build()) + .build(); + + client.subscribe(subscribeRequest); + + deviceSubscribed.add(deviceId); + } + + private void handleGnmiUpdate(GnmiUpdate eventSubject) { + Notification notification = eventSubject.update(); + if (notification == null) { + log.warn("Cannot handle gNMI event without update data, abort"); + log.debug("gNMI update:\n{}", eventSubject); + return; + } + + List updateList = notification.getUpdateList(); + updateList.forEach(update -> { + Path path = update.getPath(); + PathElem lastElem = path.getElem(path.getElemCount() - 1); + + // Use last element to identify which state updated + if ("oper-status".equals(lastElem.getName())) { + handleOperStatusUpdate(eventSubject.deviceId(), update); + } else { + log.debug("Unrecognized update {}", GnmiUtils.pathToString(path)); + } + }); + } + + private void handleOperStatusUpdate(DeviceId deviceId, Update update) { + Path path = update.getPath(); + // first element should be "interface" + String interfaceName = path.getElem(1).getKeyOrDefault("name", null); + if (interfaceName == null) { + log.error("No interface present in gNMI update, abort"); + log.debug("gNMI update:\n{}", update); + return; + } + + List portsFromDevice = deviceService.getPorts(deviceId); + portsFromDevice.forEach(port -> { + if (!port.number().name().equals(interfaceName)) { + return; + } + + // Port/Interface name is identical in OpenConfig model, but not in ONOS + // This might cause some problem if we use one name to different port + PortDescription portDescription = DefaultPortDescription.builder() + .portSpeed(port.portSpeed()) + .withPortNumber(port.number()) + .isEnabled(update.getVal().getStringVal().equals("UP")) + .type(port.type()) + .annotations((SparseAnnotations) port.annotations()) + .build(); + providerService.portStatusChanged(deviceId, portDescription); + }); + } + + class InternalGnmiEventListener implements GnmiEventListener { + + @Override + public void event(GnmiEvent event) { + if (!deviceSubscribed.contains(event.subject().deviceId())) { + log.warn("Received gNMI event from {}, but we are not subscribed to it", + event.subject().deviceId()); + } + log.debug("Received gNMI event {}", event.toString()); + if (event.type() == GnmiEvent.Type.UPDATE) { + executorService.execute( + () -> handleGnmiUpdate((GnmiUpdate) event.subject())); + } else { + log.debug("Unsupported gNMI event type: {}", event.type()); + } + } + } + + class InternalMastershipListener implements MastershipListener { + + @Override + public void event(MastershipEvent event) { + executorService.execute(() -> checkDeviceSubscription(event.subject())); + } + } + + class InternalDeviceListener implements DeviceListener { + + @Override + public void event(DeviceEvent event) { + switch (event.type()) { + case DEVICE_ADDED: + case DEVICE_AVAILABILITY_CHANGED: + case DEVICE_UPDATED: + case DEVICE_REMOVED: + executorService.execute( + () -> checkDeviceSubscription(event.subject().id())); + break; + default: + break; + } + } + } +}