From a7f76c17e9f29a66fd7d32a59d9a56f382dedd80 Mon Sep 17 00:00:00 2001 From: Yi Tseng Date: Fri, 14 Dec 2018 14:19:18 -0800 Subject: [PATCH] [ONOS-7873] Add retry mechanism to gNMI stream channel manager Change-Id: Ifdd5b1c3fe9d3588913697aace9b77b27fb442f5 (cherry picked from commit 07d9b842f6b3bc8be3d33b2a666f1213231fd2c6) --- .../onosproject/gnmi/ctl/GnmiClientImpl.java | 138 +----------- .../gnmi/ctl/GnmiSubscriptionManager.java | 211 ++++++++++++++++++ 2 files changed, 217 insertions(+), 132 deletions(-) create mode 100644 protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java index ce9c5f840e..9936ff6810 100644 --- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java +++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java @@ -24,25 +24,17 @@ import gnmi.Gnmi.PathElem; import gnmi.Gnmi.SetRequest; import gnmi.Gnmi.SetResponse; import gnmi.Gnmi.SubscribeRequest; -import gnmi.Gnmi.SubscribeResponse; import gnmi.gNMIGrpc; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.StreamObserver; import org.onosproject.gnmi.api.GnmiClient; import org.onosproject.gnmi.api.GnmiClientKey; -import org.onosproject.gnmi.api.GnmiEvent; -import org.onosproject.gnmi.api.GnmiUpdate; import org.onosproject.grpc.ctl.AbstractGrpcClient; import org.slf4j.Logger; -import java.net.ConnectException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import static java.lang.String.format; import static org.slf4j.LoggerFactory.getLogger; /** @@ -54,14 +46,13 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient { private static final GetRequest DUMMY_REQUEST 
= GetRequest.newBuilder().addPath(DUMMY_PATH).build(); private final Logger log = getLogger(getClass()); private final gNMIGrpc.gNMIBlockingStub blockingStub; - private StreamChannelManager streamChannelManager; - private GnmiControllerImpl controller; + private GnmiSubscriptionManager gnmiSubscriptionManager; GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) { super(clientKey); this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel); - this.streamChannelManager = new StreamChannelManager(managedChannel); - this.controller = controller; + this.gnmiSubscriptionManager = + new GnmiSubscriptionManager(managedChannel, deviceId, controller); } @Override @@ -81,12 +72,12 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient { @Override public boolean subscribe(SubscribeRequest request) { - return streamChannelManager.send(request); + return gnmiSubscriptionManager.subscribe(request); } @Override public void terminateSubscriptionChannel() { - streamChannelManager.complete(); + gnmiSubscriptionManager.complete(); } @Override @@ -96,7 +87,7 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient { @Override protected Void doShutdown() { - streamChannelManager.complete(); + gnmiSubscriptionManager.shutdown(); return super.doShutdown(); } @@ -139,121 +130,4 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient { return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT); } } - - - - /** - * A manager for the gNMI stream channel that opportunistically creates - * new stream RCP stubs (e.g. when one fails because of errors) and posts - * subscribe events via the gNMI controller. 
- */ - private final class StreamChannelManager { - - private final ManagedChannel channel; - private final AtomicBoolean open; - private final StreamObserver responseObserver; - private ClientCallStreamObserver requestObserver; - - private StreamChannelManager(ManagedChannel channel) { - this.channel = channel; - this.responseObserver = new InternalStreamResponseObserver(this); - this.open = new AtomicBoolean(false); - } - - private void initIfRequired() { - if (requestObserver == null) { - log.debug("Creating new stream channel for {}...", deviceId); - requestObserver = (ClientCallStreamObserver) - gNMIGrpc.newStub(channel).subscribe(responseObserver); - open.set(false); - } - } - - public boolean send(SubscribeRequest value) { - synchronized (this) { - initIfRequired(); - try { - requestObserver.onNext(value); - return true; - } catch (Throwable ex) { - if (ex instanceof StatusRuntimeException) { - log.warn("Unable to send subscribe request to {}: {}", - deviceId, ex.getMessage()); - } else { - log.warn("Exception while sending subscribe request to {}", - deviceId, ex); - } - complete(); - return false; - } - } - } - - public void complete() { - synchronized (this) { - if (requestObserver != null) { - requestObserver.onCompleted(); - requestObserver.cancel("Terminated", null); - requestObserver = null; - } - } - } - } - - - /** - * Handles messages received from the device on the stream channel. 
- */ - private final class InternalStreamResponseObserver - implements StreamObserver { - - private final StreamChannelManager streamChannelManager; - - private InternalStreamResponseObserver( - StreamChannelManager streamChannelManager) { - this.streamChannelManager = streamChannelManager; - } - - @Override - public void onNext(SubscribeResponse message) { - executorService.submit(() -> doNext(message)); - } - - private void doNext(SubscribeResponse message) { - try { - log.debug("Received message on stream channel from {}: {}", - deviceId, message.toString()); - GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse()); - GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update); - controller.postEvent(event); - } catch (Throwable ex) { - log.error("Exception while processing stream message from {}", - deviceId, ex); - } - } - - @Override - public void onError(Throwable throwable) { - if (throwable instanceof StatusRuntimeException) { - StatusRuntimeException sre = (StatusRuntimeException) throwable; - if (sre.getStatus().getCause() instanceof ConnectException) { - log.warn("Device {} is unreachable ({})", - deviceId, sre.getCause().getMessage()); - } else { - log.warn("Received error on stream channel for {}: {}", - deviceId, throwable.getMessage()); - } - } else { - log.warn(format("Received exception on stream channel for %s", - deviceId), throwable); - } - streamChannelManager.complete(); - } - - @Override - public void onCompleted() { - log.warn("Stream channel for {} has completed", deviceId); - streamChannelManager.complete(); - } - } } diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java new file mode 100644 index 0000000000..050925f624 --- /dev/null +++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java @@ -0,0 +1,211 @@ +/* + * Copyright 
2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.ctl;
+
+import gnmi.Gnmi;
+import gnmi.gNMIGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.net.ConnectException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.String.format;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A manager for the gNMI stream channel that opportunistically creates
+ * new stream RPC stubs (e.g. when one fails because of errors) and posts
+ * subscribe events via the gNMI controller.
+ * <p>
+ * The last subscribe request is remembered; a periodic task re-opens the
+ * stream and re-sends that request while the manager is in the
+ * {@link State#RETRYING} state.
+ */
+final class GnmiSubscriptionManager {
+
+    /**
+     * The state of the subscription manager.
+     */
+    enum State {
+
+        /**
+         * No subscription exists.
+         */
+        INIT,
+
+        /**
+         * A subscription exists and the stream channel is open.
+         */
+        SUBSCRIBED,
+
+        /**
+         * A subscription exists, but the stream channel is not open.
+         */
+        RETRYING,
+    }
+
+    // FIXME: make this configurable
+    private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
+    private static final Logger log = getLogger(GnmiSubscriptionManager.class);
+    private final ManagedChannel channel;
+    private final DeviceId deviceId;
+    private final GnmiControllerImpl controller;
+
+    // Also used as the monitor that guards the mutable fields below.
+    private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
+
+    // Observer of the stream currently in use, null when there is none. A new
+    // one is created for every stream so that a late callback from a stream
+    // already closed by complete() can be told apart and ignored.
+    private StreamObserver<Gnmi.SubscribeResponse> responseObserver;
+    private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
+    private Gnmi.SubscribeRequest existingSubscription;
+    private final ScheduledExecutorService streamCheckerExecutor =
+            newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
+
+    /**
+     * Creates a subscription manager and schedules the periodic stream check,
+     * which runs every {@code DEFAULT_RECONNECT_DELAY} seconds.
+     *
+     * @param channel    gRPC channel on which subscribe streams are opened
+     * @param deviceId   identifier of the device, used in logs and updates
+     * @param controller controller through which update events are posted
+     */
+    GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
+                            GnmiControllerImpl controller) {
+        this.channel = channel;
+        this.deviceId = deviceId;
+        this.controller = controller;
+        streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
+                                                  DEFAULT_RECONNECT_DELAY,
+                                                  TimeUnit.SECONDS);
+    }
+
+    /**
+     * Stops the periodic stream check and closes the current stream, if any.
+     */
+    public void shutdown() {
+        log.info("gNMI subscription manager for device {} shutdown", deviceId);
+        streamCheckerExecutor.shutdown();
+        complete();
+    }
+
+    /**
+     * Opens a new stream, with its own response observer, if none is open.
+     * Callers must hold the {@code state} monitor.
+     */
+    private void initIfRequired() {
+        if (requestObserver == null) {
+            log.debug("Creating new stream channel for {}...", deviceId);
+            responseObserver = new InternalStreamResponseObserver();
+            requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
+                    gNMIGrpc.newStub(channel).subscribe(responseObserver);
+        }
+    }
+
+    /**
+     * Sends the given subscribe request, first closing the current stream if
+     * a subscription is already active. The request is remembered so that it
+     * can be re-sent by the periodic stream check.
+     *
+     * @param request subscribe request to send
+     * @return true if the request was passed to the stream without error,
+     * false otherwise
+     */
+    boolean subscribe(Gnmi.SubscribeRequest request) {
+        synchronized (state) {
+            if (state.get() == State.SUBSCRIBED) {
+                // Cancel subscription when we need to subscribe new thing
+                complete();
+            }
+
+            existingSubscription = request;
+            return send(request);
+        }
+    }
+
+    /**
+     * Writes the request to the stream, opening one if required, and updates
+     * the state to SUBSCRIBED on success or RETRYING on failure.
+     * Callers must hold the {@code state} monitor.
+     *
+     * @param value subscribe request to send
+     * @return true if the request was passed to the stream without error
+     */
+    private boolean send(Gnmi.SubscribeRequest value) {
+        initIfRequired();
+        try {
+            requestObserver.onNext(value);
+            state.set(State.SUBSCRIBED);
+            return true;
+        } catch (Throwable ex) {
+            if (ex instanceof StatusRuntimeException) {
+                log.warn("Unable to send subscribe request to {}: {}",
+                         deviceId, ex.getMessage());
+            } else {
+                log.warn("Exception while sending subscribe request to {}",
+                         deviceId, ex);
+            }
+            state.set(State.RETRYING);
+            return false;
+        }
+    }
+
+    /**
+     * Closes the current stream, if any, and moves back to the INIT state.
+     */
+    public void complete() {
+        synchronized (state) {
+            state.set(State.INIT);
+            // Forget the stream before closing it: the callback triggered by
+            // the cancellation must not be taken for a failure of a live
+            // stream, and a failing close must not leave the stream in place.
+            final ClientCallStreamObserver<Gnmi.SubscribeRequest> closing = requestObserver;
+            requestObserver = null;
+            responseObserver = null;
+            if (closing != null) {
+                closing.onCompleted();
+                closing.cancel("Terminated", null);
+            }
+        }
+    }
+
+    /**
+     * Periodic task: while in the RETRYING state, closes the old stream and
+     * re-sends the remembered subscribe request on a new one.
+     */
+    private void checkGnmiStream() {
+        synchronized (state) {
+            if (state.get() != State.RETRYING) {
+                // No need to retry if the state is not RETRYING
+                return;
+            }
+            log.info("Try reconnecting gNMI stream to device {}", deviceId);
+
+            try {
+                complete();
+                send(existingSubscription);
+            } catch (RuntimeException ex) {
+                // An exception escaping from a task scheduled at fixed rate
+                // suppresses all its future runs; log and retry next time.
+                log.warn("Exception while reconnecting gNMI stream to device {}",
+                         deviceId, ex);
+                state.set(State.RETRYING);
+            }
+        }
+    }
+
+    /**
+     * Moves to the RETRYING state when the stream served by the given
+     * observer has ended, unless that stream is no longer the current one
+     * (i.e. it was closed or replaced through {@link #complete()}).
+     *
+     * @param observer response observer of the stream that ended
+     */
+    private void streamTerminated(StreamObserver<Gnmi.SubscribeResponse> observer) {
+        synchronized (state) {
+            if (observer == responseObserver) {
+                state.set(State.RETRYING);
+            }
+        }
+    }
+
+    /**
+     * Handles messages received from the device on the stream channel.
+     */
+    private final class InternalStreamResponseObserver
+            implements StreamObserver<Gnmi.SubscribeResponse> {
+
+        @Override
+        public void onNext(Gnmi.SubscribeResponse message) {
+            try {
+                log.debug("Received message on stream channel from {}: {}",
+                          deviceId, message.toString());
+                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+                controller.postEvent(event);
+            } catch (Throwable ex) {
+                log.error("Exception while processing stream message from {}",
+                          deviceId, ex);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (throwable instanceof StatusRuntimeException) {
+                StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})",
+                             deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}",
+                             deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s",
+                                deviceId), throwable);
+            }
+            streamTerminated(this);
+        }
+
+        @Override
+        public void onCompleted() {
+            log.warn("Stream channel for {} has completed", deviceId);
+            streamTerminated(this);
+        }
+    }
+}