mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-10-16 09:51:38 +02:00
Add gNMI device state subscriber
Change-Id: I20cb5e130f4e416bf8678aab2e5268faf24ad06b
This commit is contained in:
parent
d771648025
commit
e616d7553e
@ -22,6 +22,7 @@ import gnmi.Gnmi.GetRequest;
|
||||
import gnmi.Gnmi.GetResponse;
|
||||
import gnmi.Gnmi.SetRequest;
|
||||
import gnmi.Gnmi.SetResponse;
|
||||
import gnmi.Gnmi.SubscribeRequest;
|
||||
import org.onosproject.grpc.api.GrpcClient;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
@ -56,12 +57,23 @@ public interface GnmiClient extends GrpcClient {
|
||||
CompletableFuture<SetResponse> set(SetRequest request);
|
||||
|
||||
/**
|
||||
* Check weather the gNMI service is available or not by sending a
|
||||
* dummy get request message.
|
||||
* Subscribes to a given specific gNMI path.
|
||||
*
|
||||
* @param request the subscribe request
|
||||
* @return true if subscribe successfully; false otherwise
|
||||
*/
|
||||
boolean subscribe(SubscribeRequest request);
|
||||
|
||||
/**
|
||||
* Terminates the subscription channel of this device.
|
||||
*/
|
||||
void terminateSubscriptionChannel();
|
||||
|
||||
/**
|
||||
* Check weather the gNMI service is available or not by sending a dummy get
|
||||
* request message.
|
||||
*
|
||||
* @return true if gNMI service available; false otherwise
|
||||
*/
|
||||
CompletableFuture<Boolean> isServiceAvailable();
|
||||
|
||||
// TODO: Support gNMI subscription
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package org.onosproject.gnmi.api;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import org.onosproject.event.ListenerService;
|
||||
import org.onosproject.grpc.api.GrpcClientController;
|
||||
|
||||
/**
|
||||
@ -24,5 +25,6 @@ import org.onosproject.grpc.api.GrpcClientController;
|
||||
*/
|
||||
@Beta
|
||||
public interface GnmiController
|
||||
extends GrpcClientController<GnmiClientKey, GnmiClient> {
|
||||
extends GrpcClientController<GnmiClientKey, GnmiClient>,
|
||||
ListenerService<GnmiEvent, GnmiEventListener> {
|
||||
}
|
||||
|
@ -32,15 +32,10 @@ public final class GnmiEvent extends AbstractEvent<GnmiEvent.Type, GnmiEventSubj
|
||||
/**
|
||||
* Update.
|
||||
*/
|
||||
UPDATE,
|
||||
|
||||
/**
|
||||
* Sync response.
|
||||
*/
|
||||
SYNC_RESPONSE
|
||||
UPDATE
|
||||
}
|
||||
|
||||
protected GnmiEvent(Type type, GnmiEventSubject subject) {
|
||||
public GnmiEvent(Type type, GnmiEventSubject subject) {
|
||||
super(type, subject);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Copyright 2018-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.gnmi.api;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import gnmi.Gnmi.Notification;
|
||||
import org.onosproject.net.DeviceId;
|
||||
|
||||
/**
|
||||
* Event class for gNMI update.
|
||||
*/
|
||||
public class GnmiUpdate implements GnmiEventSubject {
|
||||
private DeviceId deviceId;
|
||||
private Notification update;
|
||||
private boolean syncResponse;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*
|
||||
* @param deviceId the device id for this event
|
||||
* @param update the update for this event
|
||||
* @param syncResponse indicate target has sent all values associated with
|
||||
* the subscription at least once.
|
||||
*/
|
||||
public GnmiUpdate(DeviceId deviceId, Notification update, boolean syncResponse) {
|
||||
this.deviceId = deviceId;
|
||||
this.update = update;
|
||||
this.syncResponse = syncResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the update data.
|
||||
*
|
||||
* @return the update data
|
||||
*/
|
||||
public Notification update() {
|
||||
return update;
|
||||
}
|
||||
|
||||
/**
|
||||
* indicate target has sent all values associated with the subscription at
|
||||
* least once.
|
||||
*
|
||||
* @return true if all value from target has sent
|
||||
*/
|
||||
public boolean syncResponse() {
|
||||
return syncResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceId deviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(getClass())
|
||||
.add("deviceId", deviceId)
|
||||
.add("syncResponse", syncResponse)
|
||||
.add("update", update)
|
||||
.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright 2018-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.gnmi.api;
|
||||
|
||||
import gnmi.Gnmi.Path;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Utilities for gNMI protocol.
|
||||
*/
|
||||
public final class GnmiUtils {
|
||||
|
||||
private GnmiUtils() {
|
||||
// Hide default constructor
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert gNMI path to human readable string.
|
||||
*
|
||||
* @param path the gNMI path
|
||||
* @return readable string of the path
|
||||
*/
|
||||
public static String pathToString(Path path) {
|
||||
StringBuilder pathStringBuilder = new StringBuilder();
|
||||
|
||||
path.getElemList().forEach(elem -> {
|
||||
pathStringBuilder.append("/").append(elem.getName());
|
||||
if (elem.getKeyCount() > 0) {
|
||||
pathStringBuilder.append("[");
|
||||
List<String> keys = elem.getKeyMap().entrySet().stream()
|
||||
.map(entry -> entry.getKey() + "=" + entry.getValue())
|
||||
.collect(Collectors.toList());
|
||||
pathStringBuilder.append(String.join(", ", keys));
|
||||
pathStringBuilder.append("]");
|
||||
}
|
||||
});
|
||||
return pathStringBuilder.toString();
|
||||
}
|
||||
}
|
@ -23,17 +23,26 @@ import gnmi.Gnmi.Path;
|
||||
import gnmi.Gnmi.PathElem;
|
||||
import gnmi.Gnmi.SetRequest;
|
||||
import gnmi.Gnmi.SetResponse;
|
||||
import gnmi.Gnmi.SubscribeRequest;
|
||||
import gnmi.Gnmi.SubscribeResponse;
|
||||
import gnmi.gNMIGrpc;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.grpc.stub.ClientCallStreamObserver;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.onosproject.gnmi.api.GnmiClient;
|
||||
import org.onosproject.gnmi.api.GnmiClientKey;
|
||||
import org.onosproject.gnmi.api.GnmiEvent;
|
||||
import org.onosproject.gnmi.api.GnmiUpdate;
|
||||
import org.onosproject.grpc.ctl.AbstractGrpcClient;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
/**
|
||||
@ -45,10 +54,14 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
|
||||
private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
|
||||
private final Logger log = getLogger(getClass());
|
||||
private final gNMIGrpc.gNMIBlockingStub blockingStub;
|
||||
private StreamChannelManager streamChannelManager;
|
||||
private GnmiControllerImpl controller;
|
||||
|
||||
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) {
|
||||
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
|
||||
super(clientKey);
|
||||
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
|
||||
this.streamChannelManager = new StreamChannelManager(managedChannel);
|
||||
this.controller = controller;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -66,11 +79,27 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
|
||||
return supplyInContext(() -> doSet(request), "set");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean subscribe(SubscribeRequest request) {
|
||||
return streamChannelManager.send(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void terminateSubscriptionChannel() {
|
||||
streamChannelManager.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isServiceAvailable() {
|
||||
return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Void doShutdown() {
|
||||
streamChannelManager.complete();
|
||||
return super.doShutdown();
|
||||
}
|
||||
|
||||
private CapabilityResponse doCapability() {
|
||||
CapabilityRequest request = CapabilityRequest.newBuilder().build();
|
||||
try {
|
||||
@ -110,4 +139,121 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
|
||||
return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* A manager for the gNMI stream channel that opportunistically creates
|
||||
* new stream RCP stubs (e.g. when one fails because of errors) and posts
|
||||
* subscribe events via the gNMI controller.
|
||||
*/
|
||||
private final class StreamChannelManager {
|
||||
|
||||
private final ManagedChannel channel;
|
||||
private final AtomicBoolean open;
|
||||
private final StreamObserver<SubscribeResponse> responseObserver;
|
||||
private ClientCallStreamObserver<SubscribeRequest> requestObserver;
|
||||
|
||||
private StreamChannelManager(ManagedChannel channel) {
|
||||
this.channel = channel;
|
||||
this.responseObserver = new InternalStreamResponseObserver(this);
|
||||
this.open = new AtomicBoolean(false);
|
||||
}
|
||||
|
||||
private void initIfRequired() {
|
||||
if (requestObserver == null) {
|
||||
log.debug("Creating new stream channel for {}...", deviceId);
|
||||
requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
|
||||
gNMIGrpc.newStub(channel).subscribe(responseObserver);
|
||||
open.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean send(SubscribeRequest value) {
|
||||
synchronized (this) {
|
||||
initIfRequired();
|
||||
try {
|
||||
requestObserver.onNext(value);
|
||||
return true;
|
||||
} catch (Throwable ex) {
|
||||
if (ex instanceof StatusRuntimeException) {
|
||||
log.warn("Unable to send subscribe request to {}: {}",
|
||||
deviceId, ex.getMessage());
|
||||
} else {
|
||||
log.warn("Exception while sending subscribe request to {}",
|
||||
deviceId, ex);
|
||||
}
|
||||
complete();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void complete() {
|
||||
synchronized (this) {
|
||||
if (requestObserver != null) {
|
||||
requestObserver.onCompleted();
|
||||
requestObserver.cancel("Terminated", null);
|
||||
requestObserver = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Handles messages received from the device on the stream channel.
|
||||
*/
|
||||
private final class InternalStreamResponseObserver
|
||||
implements StreamObserver<SubscribeResponse> {
|
||||
|
||||
private final StreamChannelManager streamChannelManager;
|
||||
|
||||
private InternalStreamResponseObserver(
|
||||
StreamChannelManager streamChannelManager) {
|
||||
this.streamChannelManager = streamChannelManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(SubscribeResponse message) {
|
||||
executorService.submit(() -> doNext(message));
|
||||
}
|
||||
|
||||
private void doNext(SubscribeResponse message) {
|
||||
try {
|
||||
log.debug("Received message on stream channel from {}: {}",
|
||||
deviceId, message.toString());
|
||||
GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
|
||||
GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
|
||||
controller.postEvent(event);
|
||||
} catch (Throwable ex) {
|
||||
log.error("Exception while processing stream message from {}",
|
||||
deviceId, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
if (throwable instanceof StatusRuntimeException) {
|
||||
StatusRuntimeException sre = (StatusRuntimeException) throwable;
|
||||
if (sre.getStatus().getCause() instanceof ConnectException) {
|
||||
log.warn("Device {} is unreachable ({})",
|
||||
deviceId, sre.getCause().getMessage());
|
||||
} else {
|
||||
log.warn("Received error on stream channel for {}: {}",
|
||||
deviceId, throwable.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.warn(format("Received exception on stream channel for %s",
|
||||
deviceId), throwable);
|
||||
}
|
||||
streamChannelManager.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
log.warn("Stream channel for {} has completed", deviceId);
|
||||
streamChannelManager.complete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ public class GnmiControllerImpl
|
||||
@Activate
|
||||
public void activate() {
|
||||
super.activate();
|
||||
eventDispatcher.addSink(GnmiEvent.class, listenerRegistry);
|
||||
log.info("Started");
|
||||
}
|
||||
|
||||
@ -52,6 +53,15 @@ public class GnmiControllerImpl
|
||||
|
||||
@Override
|
||||
protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
|
||||
return new GnmiClientImpl(clientKey, channel);
|
||||
return new GnmiClientImpl(clientKey, channel, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles event from gNMI client.
|
||||
*
|
||||
* @param event the gNMI event
|
||||
*/
|
||||
void postEvent(GnmiEvent event) {
|
||||
post(event);
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ onos_app(
|
||||
category = "Provider",
|
||||
description = "General device southbound providers.",
|
||||
included_bundles = BUNDLES,
|
||||
required_apps = [],
|
||||
required_apps = ["org.onosproject.protocols.gnmi"],
|
||||
title = "General Device Provider",
|
||||
url = "http://onosproject.org",
|
||||
)
|
||||
|
@ -1,4 +1,9 @@
|
||||
COMPILE_DEPS = CORE_DEPS + JACKSON
|
||||
COMPILE_DEPS = CORE_DEPS + JACKSON + [
|
||||
"//protocols/gnmi/stub:onos-protocols-gnmi-stub",
|
||||
"//protocols/gnmi/api:onos-protocols-gnmi-api",
|
||||
"@com_google_protobuf//:protobuf_java",
|
||||
"//protocols/grpc/api:onos-protocols-grpc-api",
|
||||
]
|
||||
|
||||
osgi_jar_with_tests(
|
||||
test_deps = TEST_ADAPTERS,
|
||||
|
@ -25,6 +25,7 @@ import org.onlab.util.ItemNotFoundException;
|
||||
import org.onlab.util.Tools;
|
||||
import org.onosproject.cfg.ComponentConfigService;
|
||||
import org.onosproject.core.CoreService;
|
||||
import org.onosproject.gnmi.api.GnmiController;
|
||||
import org.onosproject.mastership.MastershipService;
|
||||
import org.onosproject.net.AnnotationKeys;
|
||||
import org.onosproject.net.DefaultAnnotations;
|
||||
@ -169,6 +170,16 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY)
|
||||
private PiPipeconfWatchdogService pipeconfWatchdogService;
|
||||
|
||||
// FIXME: no longer general if we add a dependency to a protocol-specific
|
||||
// service. Possible solutions are: rename this provider to
|
||||
// StratumDeviceProvider, find a way to allow this provider to register for
|
||||
// protocol specific events (e.g. port events) via drivers (similar to
|
||||
// DeviceAgentListener).
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY)
|
||||
private GnmiController gnmiController;
|
||||
|
||||
private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber;
|
||||
|
||||
/**
|
||||
* Configure poll frequency for port status and statistics; default is 10 sec.
|
||||
*/
|
||||
@ -228,6 +239,9 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
|
||||
rescheduleProbeTask(false);
|
||||
modified(context);
|
||||
gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(gnmiController,
|
||||
deviceService, mastershipService, providerService);
|
||||
gnmiDeviceStateSubscriber.activate();
|
||||
log.info("Started");
|
||||
}
|
||||
|
||||
@ -316,6 +330,8 @@ public class GeneralDeviceProvider extends AbstractProvider
|
||||
providerRegistry.unregister(this);
|
||||
providerService = null;
|
||||
cfgService.unregisterConfigFactory(factory);
|
||||
gnmiDeviceStateSubscriber.deactivate();
|
||||
gnmiDeviceStateSubscriber = null;
|
||||
log.info("Stopped");
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,281 @@
|
||||
/*
|
||||
* Copyright 2018-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.onosproject.provider.general.device.impl;
|
||||
|
||||
import com.google.common.annotations.Beta;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.Striped;
|
||||
import gnmi.Gnmi.Notification;
|
||||
import gnmi.Gnmi.Path;
|
||||
import gnmi.Gnmi.PathElem;
|
||||
import gnmi.Gnmi.SubscribeRequest;
|
||||
import gnmi.Gnmi.Subscription;
|
||||
import gnmi.Gnmi.SubscriptionList;
|
||||
import gnmi.Gnmi.SubscriptionMode;
|
||||
import gnmi.Gnmi.Update;
|
||||
import org.onlab.util.SharedExecutors;
|
||||
import org.onosproject.gnmi.api.GnmiClient;
|
||||
import org.onosproject.gnmi.api.GnmiController;
|
||||
import org.onosproject.gnmi.api.GnmiEvent;
|
||||
import org.onosproject.gnmi.api.GnmiEventListener;
|
||||
import org.onosproject.gnmi.api.GnmiUpdate;
|
||||
import org.onosproject.gnmi.api.GnmiUtils;
|
||||
import org.onosproject.mastership.MastershipEvent;
|
||||
import org.onosproject.mastership.MastershipListener;
|
||||
import org.onosproject.mastership.MastershipService;
|
||||
import org.onosproject.net.DeviceId;
|
||||
import org.onosproject.net.Port;
|
||||
import org.onosproject.net.SparseAnnotations;
|
||||
import org.onosproject.net.device.DefaultPortDescription;
|
||||
import org.onosproject.net.device.DeviceEvent;
|
||||
import org.onosproject.net.device.DeviceListener;
|
||||
import org.onosproject.net.device.DeviceProviderService;
|
||||
import org.onosproject.net.device.DeviceService;
|
||||
import org.onosproject.net.device.PortDescription;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
/**
|
||||
* Entity that manages gNMI subscription for devices using OpenConfig models and
|
||||
* that reports relevant events to the core.
|
||||
*/
|
||||
@Beta
|
||||
class GnmiDeviceStateSubscriber {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
|
||||
|
||||
private final GnmiController gnmiController;
|
||||
private final DeviceService deviceService;
|
||||
private final DeviceProviderService providerService;
|
||||
private final MastershipService mastershipService;
|
||||
|
||||
private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
|
||||
|
||||
private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
|
||||
private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
|
||||
private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
|
||||
private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
|
||||
|
||||
private final Striped<Lock> deviceLocks = Striped.lock(30);
|
||||
|
||||
GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
|
||||
MastershipService mastershipService,
|
||||
DeviceProviderService providerService) {
|
||||
this.gnmiController = gnmiController;
|
||||
this.deviceService = deviceService;
|
||||
this.mastershipService = mastershipService;
|
||||
this.providerService = providerService;
|
||||
}
|
||||
|
||||
public void activate() {
|
||||
deviceService.addListener(deviceEventListener);
|
||||
mastershipService.addListener(mastershipListener);
|
||||
gnmiController.addListener(gnmiEventListener);
|
||||
// Subscribe to existing devices.
|
||||
deviceService.getDevices().forEach(d -> executorService.execute(
|
||||
() -> checkDeviceSubscription(d.id())));
|
||||
}
|
||||
|
||||
public void deactivate() {
|
||||
deviceSubscribed.forEach(this::unsubscribeIfNeeded);
|
||||
deviceService.removeListener(deviceEventListener);
|
||||
mastershipService.removeListener(mastershipListener);
|
||||
gnmiController.removeListener(gnmiEventListener);
|
||||
}
|
||||
|
||||
private void checkDeviceSubscription(DeviceId deviceId) {
|
||||
deviceLocks.get(deviceId).lock();
|
||||
try {
|
||||
if (!deviceService.isAvailable(deviceId)
|
||||
|| deviceService.getDevice(deviceId) == null
|
||||
|| !mastershipService.isLocalMaster(deviceId)) {
|
||||
// Device not available/removed or this instance is no longer
|
||||
// master.
|
||||
unsubscribeIfNeeded(deviceId);
|
||||
} else {
|
||||
subscribeIfNeeded(deviceId);
|
||||
}
|
||||
} finally {
|
||||
deviceLocks.get(deviceId).unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private Path interfaceOperStatusPath(String interfaceName) {
|
||||
return Path.newBuilder()
|
||||
.addElem(PathElem.newBuilder().setName("interfaces").build())
|
||||
.addElem(PathElem.newBuilder()
|
||||
.setName("interface").putKey("name", interfaceName).build())
|
||||
.addElem(PathElem.newBuilder().setName("state").build())
|
||||
.addElem(PathElem.newBuilder().setName("oper-status").build())
|
||||
.build();
|
||||
}
|
||||
|
||||
private void unsubscribeIfNeeded(DeviceId deviceId) {
|
||||
if (!deviceSubscribed.contains(deviceId)) {
|
||||
// Not subscribed.
|
||||
return;
|
||||
}
|
||||
GnmiClient client = gnmiController.getClient(deviceId);
|
||||
if (client == null) {
|
||||
log.debug("Cannot find gNMI client for device {}", deviceId);
|
||||
} else {
|
||||
client.terminateSubscriptionChannel();
|
||||
}
|
||||
deviceSubscribed.remove(deviceId);
|
||||
}
|
||||
|
||||
private void subscribeIfNeeded(DeviceId deviceId) {
|
||||
if (deviceSubscribed.contains(deviceId)) {
|
||||
// Already subscribed.
|
||||
// FIXME: if a new port is added after the first subscription we are
|
||||
// not subscribing to the new port.
|
||||
return;
|
||||
}
|
||||
|
||||
GnmiClient client = gnmiController.getClient(deviceId);
|
||||
if (client == null) {
|
||||
log.warn("Cannot find gNMI client for device {}", deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
List<Port> ports = deviceService.getPorts(deviceId);
|
||||
SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
|
||||
subscriptionList.setMode(SubscriptionList.Mode.STREAM);
|
||||
subscriptionList.setUpdatesOnly(true);
|
||||
|
||||
ports.forEach(port -> {
|
||||
String portName = port.number().name();
|
||||
// Subscribe /interface/interface[name=port-name]/state/oper-status
|
||||
Path subscribePath = interfaceOperStatusPath(portName);
|
||||
Subscription interfaceOperStatusSub =
|
||||
Subscription.newBuilder()
|
||||
.setPath(subscribePath)
|
||||
.setMode(SubscriptionMode.ON_CHANGE)
|
||||
.build();
|
||||
// TODO: more state subscription
|
||||
subscriptionList.addSubscription(interfaceOperStatusSub);
|
||||
});
|
||||
|
||||
SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder()
|
||||
.setSubscribe(subscriptionList.build())
|
||||
.build();
|
||||
|
||||
client.subscribe(subscribeRequest);
|
||||
|
||||
deviceSubscribed.add(deviceId);
|
||||
}
|
||||
|
||||
private void handleGnmiUpdate(GnmiUpdate eventSubject) {
|
||||
Notification notification = eventSubject.update();
|
||||
if (notification == null) {
|
||||
log.warn("Cannot handle gNMI event without update data, abort");
|
||||
log.debug("gNMI update:\n{}", eventSubject);
|
||||
return;
|
||||
}
|
||||
|
||||
List<Update> updateList = notification.getUpdateList();
|
||||
updateList.forEach(update -> {
|
||||
Path path = update.getPath();
|
||||
PathElem lastElem = path.getElem(path.getElemCount() - 1);
|
||||
|
||||
// Use last element to identify which state updated
|
||||
if ("oper-status".equals(lastElem.getName())) {
|
||||
handleOperStatusUpdate(eventSubject.deviceId(), update);
|
||||
} else {
|
||||
log.debug("Unrecognized update {}", GnmiUtils.pathToString(path));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void handleOperStatusUpdate(DeviceId deviceId, Update update) {
|
||||
Path path = update.getPath();
|
||||
// first element should be "interface"
|
||||
String interfaceName = path.getElem(1).getKeyOrDefault("name", null);
|
||||
if (interfaceName == null) {
|
||||
log.error("No interface present in gNMI update, abort");
|
||||
log.debug("gNMI update:\n{}", update);
|
||||
return;
|
||||
}
|
||||
|
||||
List<Port> portsFromDevice = deviceService.getPorts(deviceId);
|
||||
portsFromDevice.forEach(port -> {
|
||||
if (!port.number().name().equals(interfaceName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Port/Interface name is identical in OpenConfig model, but not in ONOS
|
||||
// This might cause some problem if we use one name to different port
|
||||
PortDescription portDescription = DefaultPortDescription.builder()
|
||||
.portSpeed(port.portSpeed())
|
||||
.withPortNumber(port.number())
|
||||
.isEnabled(update.getVal().getStringVal().equals("UP"))
|
||||
.type(port.type())
|
||||
.annotations((SparseAnnotations) port.annotations())
|
||||
.build();
|
||||
providerService.portStatusChanged(deviceId, portDescription);
|
||||
});
|
||||
}
|
||||
|
||||
class InternalGnmiEventListener implements GnmiEventListener {
|
||||
|
||||
@Override
|
||||
public void event(GnmiEvent event) {
|
||||
if (!deviceSubscribed.contains(event.subject().deviceId())) {
|
||||
log.warn("Received gNMI event from {}, but we are not subscribed to it",
|
||||
event.subject().deviceId());
|
||||
}
|
||||
log.debug("Received gNMI event {}", event.toString());
|
||||
if (event.type() == GnmiEvent.Type.UPDATE) {
|
||||
executorService.execute(
|
||||
() -> handleGnmiUpdate((GnmiUpdate) event.subject()));
|
||||
} else {
|
||||
log.debug("Unsupported gNMI event type: {}", event.type());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class InternalMastershipListener implements MastershipListener {
|
||||
|
||||
@Override
|
||||
public void event(MastershipEvent event) {
|
||||
executorService.execute(() -> checkDeviceSubscription(event.subject()));
|
||||
}
|
||||
}
|
||||
|
||||
class InternalDeviceListener implements DeviceListener {
|
||||
|
||||
@Override
|
||||
public void event(DeviceEvent event) {
|
||||
switch (event.type()) {
|
||||
case DEVICE_ADDED:
|
||||
case DEVICE_AVAILABILITY_CHANGED:
|
||||
case DEVICE_UPDATED:
|
||||
case DEVICE_REMOVED:
|
||||
executorService.execute(
|
||||
() -> checkDeviceSubscription(event.subject().id()));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user