[ONOS-7873] Add retry mechanism to gNMI stream channel manager

Change-Id: Ifdd5b1c3fe9d3588913697aace9b77b27fb442f5
(cherry picked from commit 07d9b842f6b3bc8be3d33b2a666f1213231fd2c6)
This commit is contained in:
Yi Tseng 2018-12-14 14:19:18 -08:00 committed by Carmelo Cascone
parent a71b849708
commit a7f76c17e9
2 changed files with 217 additions and 132 deletions

View File

@ -24,25 +24,17 @@ import gnmi.Gnmi.PathElem;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
import gnmi.Gnmi.SubscribeRequest;
import gnmi.Gnmi.SubscribeResponse;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.slf4j.Logger;
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@ -54,14 +46,13 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
private final Logger log = getLogger(getClass());
private final gNMIGrpc.gNMIBlockingStub blockingStub;
private StreamChannelManager streamChannelManager;
private GnmiControllerImpl controller;
private GnmiSubscriptionManager gnmiSubscriptionManager;
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey);
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
this.streamChannelManager = new StreamChannelManager(managedChannel);
this.controller = controller;
this.gnmiSubscriptionManager =
new GnmiSubscriptionManager(managedChannel, deviceId, controller);
}
@Override
@ -81,12 +72,12 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
@Override
public boolean subscribe(SubscribeRequest request) {
return streamChannelManager.send(request);
return gnmiSubscriptionManager.subscribe(request);
}
@Override
public void terminateSubscriptionChannel() {
streamChannelManager.complete();
gnmiSubscriptionManager.complete();
}
@Override
@ -96,7 +87,7 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
@Override
protected Void doShutdown() {
streamChannelManager.complete();
gnmiSubscriptionManager.shutdown();
return super.doShutdown();
}
@ -139,121 +130,4 @@ public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
}
}
/**
* A manager for the gNMI stream channel that opportunistically creates
* new stream RCP stubs (e.g. when one fails because of errors) and posts
* subscribe events via the gNMI controller.
*/
private final class StreamChannelManager {
private final ManagedChannel channel;
private final AtomicBoolean open;
private final StreamObserver<SubscribeResponse> responseObserver;
private ClientCallStreamObserver<SubscribeRequest> requestObserver;
private StreamChannelManager(ManagedChannel channel) {
this.channel = channel;
this.responseObserver = new InternalStreamResponseObserver(this);
this.open = new AtomicBoolean(false);
}
private void initIfRequired() {
if (requestObserver == null) {
log.debug("Creating new stream channel for {}...", deviceId);
requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
gNMIGrpc.newStub(channel).subscribe(responseObserver);
open.set(false);
}
}
public boolean send(SubscribeRequest value) {
synchronized (this) {
initIfRequired();
try {
requestObserver.onNext(value);
return true;
} catch (Throwable ex) {
if (ex instanceof StatusRuntimeException) {
log.warn("Unable to send subscribe request to {}: {}",
deviceId, ex.getMessage());
} else {
log.warn("Exception while sending subscribe request to {}",
deviceId, ex);
}
complete();
return false;
}
}
}
public void complete() {
synchronized (this) {
if (requestObserver != null) {
requestObserver.onCompleted();
requestObserver.cancel("Terminated", null);
requestObserver = null;
}
}
}
}
/**
* Handles messages received from the device on the stream channel.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<SubscribeResponse> {
private final StreamChannelManager streamChannelManager;
private InternalStreamResponseObserver(
StreamChannelManager streamChannelManager) {
this.streamChannelManager = streamChannelManager;
}
@Override
public void onNext(SubscribeResponse message) {
executorService.submit(() -> doNext(message));
}
private void doNext(SubscribeResponse message) {
try {
log.debug("Received message on stream channel from {}: {}",
deviceId, message.toString());
GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
controller.postEvent(event);
} catch (Throwable ex) {
log.error("Exception while processing stream message from {}",
deviceId, ex);
}
}
@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) throwable;
if (sre.getStatus().getCause() instanceof ConnectException) {
log.warn("Device {} is unreachable ({})",
deviceId, sre.getCause().getMessage());
} else {
log.warn("Received error on stream channel for {}: {}",
deviceId, throwable.getMessage());
}
} else {
log.warn(format("Received exception on stream channel for %s",
deviceId), throwable);
}
streamChannelManager.complete();
}
@Override
public void onCompleted() {
log.warn("Stream channel for {} has completed", deviceId);
streamChannelManager.complete();
}
}
}

View File

@ -0,0 +1,211 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
import gnmi.Gnmi;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
import java.net.ConnectException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* A manager for the gNMI stream channel that opportunistically creates
* new stream RCP stubs (e.g. when one fails because of errors) and posts
* subscribe events via the gNMI controller.
*/
final class GnmiSubscriptionManager {
/**
* The state of the subscription manager.
*/
enum State {
/**
* Subscription not exists.
*/
INIT,
/**
* Exists a subscription and channel opened.
*/
SUBSCRIBED,
/**
* Exists a subscription, but the channel does not open.
*/
RETRYING,
}
// FIXME: make this configurable
private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
private static final Logger log = getLogger(GnmiSubscriptionManager.class);
private final ManagedChannel channel;
private final DeviceId deviceId;
private final GnmiControllerImpl controller;
private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
private Gnmi.SubscribeRequest existingSubscription;
private final ScheduledExecutorService streamCheckerExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
GnmiControllerImpl controller) {
this.channel = channel;
this.deviceId = deviceId;
this.controller = controller;
this.responseObserver = new InternalStreamResponseObserver();
streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
DEFAULT_RECONNECT_DELAY,
TimeUnit.SECONDS);
}
public void shutdown() {
log.info("gNMI subscription manager for device {} shutdown", deviceId);
streamCheckerExecutor.shutdown();
complete();
}
private void initIfRequired() {
if (requestObserver == null) {
log.debug("Creating new stream channel for {}...", deviceId);
requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
gNMIGrpc.newStub(channel).subscribe(responseObserver);
}
}
boolean subscribe(Gnmi.SubscribeRequest request) {
synchronized (state) {
if (state.get() == State.SUBSCRIBED) {
// Cancel subscription when we need to subscribe new thing
complete();
}
existingSubscription = request;
return send(request);
}
}
private boolean send(Gnmi.SubscribeRequest value) {
initIfRequired();
try {
requestObserver.onNext(value);
state.set(State.SUBSCRIBED);
return true;
} catch (Throwable ex) {
if (ex instanceof StatusRuntimeException) {
log.warn("Unable to send subscribe request to {}: {}",
deviceId, ex.getMessage());
} else {
log.warn("Exception while sending subscribe request to {}",
deviceId, ex);
}
state.set(State.RETRYING);
return false;
}
}
public void complete() {
synchronized (state) {
state.set(State.INIT);
if (requestObserver != null) {
requestObserver.onCompleted();
requestObserver.cancel("Terminated", null);
requestObserver = null;
}
}
}
private void checkGnmiStream() {
synchronized (state) {
if (state.get() != State.RETRYING) {
// No need to retry if the state is not RETRYING
return;
}
log.info("Try reconnecting gNMI stream to device {}", deviceId);
complete();
send(existingSubscription);
}
}
/**
* Handles messages received from the device on the stream channel.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<Gnmi.SubscribeResponse> {
@Override
public void onNext(Gnmi.SubscribeResponse message) {
try {
log.debug("Received message on stream channel from {}: {}",
deviceId, message.toString());
GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
controller.postEvent(event);
} catch (Throwable ex) {
log.error("Exception while processing stream message from {}",
deviceId, ex);
}
}
@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) throwable;
if (sre.getStatus().getCause() instanceof ConnectException) {
log.warn("Device {} is unreachable ({})",
deviceId, sre.getCause().getMessage());
} else {
log.warn("Received error on stream channel for {}: {}",
deviceId, throwable.getMessage());
}
} else {
log.warn(format("Received exception on stream channel for %s",
deviceId), throwable);
}
state.set(State.RETRYING);
}
@Override
public void onCompleted() {
log.warn("Stream channel for {} has completed", deviceId);
state.set(State.RETRYING);
}
}
}