Enhanced gRPC call logging

- Write calls to file, one file for each channel
- Log started, inbound msg, outbound msg, and closed events, for each RPC
- Distinguish between different RPCs by assigning an ID to each one

Also, removed redundant DeviceId attribute from GrpcChannelId, as all
channel IDs were already created using a client key that contains the
DeviceID. It seems a better approach to not restrict the definition of a
channel ID and have that defined simply as a string.

Change-Id: I9d88e528218a5689d6835c9b48022119976b6c5a
This commit is contained in:
Carmelo Cascone 2019-02-04 23:11:26 -08:00
parent 9b582b0b12
commit 73f4530c38
6 changed files with 249 additions and 141 deletions

View File

@ -20,9 +20,7 @@ import com.google.common.annotations.Beta;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import org.onosproject.net.DeviceId;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
@ -82,14 +80,6 @@ public interface GrpcChannelController {
*/
boolean isChannelOpen(GrpcChannelId channelId);
/**
* Returns all channels associated to the given device ID.
*
* @param deviceId device ID
* @return collection of channels
*/
Collection<ManagedChannel> getChannels(DeviceId deviceId);
/**
* If present, returns the channel associated with the given ID.
*

View File

@ -18,57 +18,24 @@ package org.onosproject.grpc.api;
import com.google.common.annotations.Beta;
import org.onlab.util.Identifier;
import org.onosproject.net.DeviceId;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* gRPC managed channel identifier, unique in the scope of a gRPC controller
* instance.
* gRPC channel identifier, unique in the scope of an ONOS node.
*/
@Beta
public final class GrpcChannelId extends Identifier<String> {
private final DeviceId deviceId;
private final String channelName;
private GrpcChannelId(DeviceId deviceId, String channelName) {
super(deviceId.toString() + ":" + channelName);
checkNotNull(deviceId, "device ID must not be null");
checkNotNull(channelName, "channel name must not be null");
checkArgument(!channelName.isEmpty(), "channel name must not be empty");
this.deviceId = deviceId;
this.channelName = channelName;
private GrpcChannelId(String channelName) {
super(channelName);
}
/**
* Returns the device part of this channel ID.
* Instantiates a new channel ID.
*
* @return device ID
*/
public DeviceId deviceId() {
return deviceId;
}
/**
* Returns the channel name part of this channel ID.
*
* @return channel name
*/
public String channelName() {
return channelName;
}
/**
* Instantiates a new channel ID for the given device ID and arbitrary
* channel name (e.g. the name of the gRPC service).
*
* @param deviceId device ID
* @param channelName name of the channel
* @return channel ID
*/
public static GrpcChannelId of(DeviceId deviceId, String channelName) {
return new GrpcChannelId(deviceId, channelName);
public static GrpcChannelId of(String channelName) {
return new GrpcChannelId(channelName);
}
}

View File

@ -3,6 +3,9 @@ COMPILE_DEPS = CORE_DEPS + [
"//protocols/grpc/proto:onos-protocols-grpc-proto",
"@io_grpc_grpc_java//core",
"@io_grpc_grpc_java//netty",
"@io_grpc_grpc_java//protobuf-lite",
"@com_google_protobuf//:protobuf_java",
"@com_google_api_grpc_proto_google_common_protos//jar",
]
osgi_jar(

View File

@ -115,7 +115,7 @@ public abstract class AbstractGrpcClientController
log.info("Creating client for {} (server={}:{})...",
deviceId, serverAddr, serverPort);
GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString());
GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString());
ManagedChannelBuilder channelBuilder = NettyChannelBuilder
.forAddress(serverAddr, serverPort)
.maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES)

View File

@ -17,18 +17,9 @@
package org.onosproject.grpc.ctl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Striped;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.onlab.util.Tools;
@ -37,7 +28,6 @@ import org.onosproject.grpc.api.GrpcChannelController;
import org.onosproject.grpc.api.GrpcChannelId;
import org.onosproject.grpc.proto.dummy.Dummy;
import org.onosproject.grpc.proto.dummy.DummyServiceGrpc;
import org.onosproject.net.DeviceId;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@ -48,14 +38,12 @@ import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import static com.google.common.base.Preconditions.checkNotNull;
@ -68,28 +56,31 @@ import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_
*/
@Component(immediate = true, service = GrpcChannelController.class,
property = {
ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT,
})
public class GrpcChannelControllerImpl implements GrpcChannelController {
// FIXME: Should use message size to determine whether it needs to log the message or not.
private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig";
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
/** Indicates whether to log gRPC messages. */
private static boolean enableMessageLog = ENABLE_MESSAGE_LOG_DEFAULT;
/**
* Indicates whether to log gRPC messages.
*/
private final AtomicBoolean enableMessageLog = new AtomicBoolean(
ENABLE_MESSAGE_LOG_DEFAULT);
private final Logger log = LoggerFactory.getLogger(getClass());
private Map<GrpcChannelId, ManagedChannel> channels;
private Map<GrpcChannelId, GrpcLoggingInterceptor> interceptors;
private final Striped<Lock> channelLocks = Striped.lock(30);
@Activate
public void activate() {
componentConfigService.registerProperties(getClass());
channels = new ConcurrentHashMap<>();
interceptors = new ConcurrentHashMap<>();
log.info("Started");
}
@ -97,10 +88,12 @@ public class GrpcChannelControllerImpl implements GrpcChannelController {
public void modified(ComponentContext context) {
if (context != null) {
Dictionary<?, ?> properties = context.getProperties();
enableMessageLog = Tools.isPropertyEnabled(
properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT);
log.info("Configured. Log of gRPC messages is {} for new channels",
enableMessageLog ? "enabled" : "disabled");
enableMessageLog.set(Tools.isPropertyEnabled(
properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT));
log.info("Configured. Logging of gRPC messages is {}",
enableMessageLog.get()
? "ENABLED for new channels"
: "DISABLED for new and existing channels");
}
}
@ -109,6 +102,10 @@ public class GrpcChannelControllerImpl implements GrpcChannelController {
componentConfigService.unregisterProperties(getClass(), false);
channels.values().forEach(ManagedChannel::shutdownNow);
channels.clear();
channels = null;
interceptors.values().forEach(GrpcLoggingInterceptor::close);
interceptors.clear();
interceptors = null;
log.info("Stopped");
}
@ -126,8 +123,11 @@ public class GrpcChannelControllerImpl implements GrpcChannelController {
throw new IllegalArgumentException(format(
"A channel with ID '%s' already exists", channelId));
}
if (enableMessageLog) {
channelBuilder.intercept(new InternalLogChannelInterceptor(channelId));
GrpcLoggingInterceptor interceptor = null;
if (enableMessageLog.get()) {
interceptor = new GrpcLoggingInterceptor(channelId, enableMessageLog);
channelBuilder.intercept(interceptor);
}
ManagedChannel channel = channelBuilder.build();
// Forced connection API is still experimental. Use workaround...
@ -135,11 +135,17 @@ public class GrpcChannelControllerImpl implements GrpcChannelController {
try {
doDummyMessage(channel);
} catch (StatusRuntimeException e) {
if (interceptor != null) {
interceptor.close();
}
shutdownNowAndWait(channel, channelId);
throw e;
}
// If here, channel is open.
channels.put(channelId, channel);
if (interceptor != null) {
interceptors.put(channelId, interceptor);
}
return channel;
} finally {
lock.unlock();
@ -200,6 +206,10 @@ public class GrpcChannelControllerImpl implements GrpcChannelController {
if (channel != null) {
shutdownNowAndWait(channel, channelId);
}
final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId);
if (interceptor != null) {
interceptor.close();
}
} finally {
lock.unlock();
}
@ -224,19 +234,6 @@ public class GrpcChannelControllerImpl implements GrpcChannelController {
return ImmutableMap.copyOf(channels);
}
@Override
public Collection<ManagedChannel> getChannels(final DeviceId deviceId) {
checkNotNull(deviceId);
final Set<ManagedChannel> deviceChannels = new HashSet<>();
channels.forEach((k, v) -> {
if (k.deviceId().equals(deviceId)) {
deviceChannels.add(v);
}
});
return ImmutableSet.copyOf(deviceChannels);
}
@Override
public Optional<ManagedChannel> getChannel(GrpcChannelId channelId) {
checkNotNull(channelId);
@ -251,57 +248,4 @@ public class GrpcChannelControllerImpl implements GrpcChannelController {
}
}
/**
* gRPC client interceptor that logs all messages sent and received.
*/
private final class InternalLogChannelInterceptor implements ClientInterceptor {
private final GrpcChannelId channelId;
private InternalLogChannelInterceptor(GrpcChannelId channelId) {
this.channelId = channelId;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
@Override
public void sendMessage(ReqT message) {
if (enableMessageLog && !methodDescriptor.getFullMethodName()
.startsWith(SET_FORWARDING_PIPELINE_CONFIG_METHOD)) {
log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}",
channelId, methodDescriptor.getFullMethodName(),
message.toString());
}
super.sendMessage(message);
}
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
@Override
protected Listener<RespT> delegate() {
return responseListener;
}
@Override
public void onMessage(RespT message) {
if (enableMessageLog) {
log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}",
channelId, methodDescriptor.getFullMethodName(),
message.toString());
}
super.onMessage(message);
}
};
super.start(listener, headers);
}
};
}
}
}

View File

@ -0,0 +1,204 @@
/*
* Copyright 2019-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.grpc.ctl;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.protobuf.lite.ProtoLiteUtils;
import org.onosproject.grpc.api.GrpcChannelId;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
* gRPC client interceptor that logs to file all messages sent and received.
*/
final class GrpcLoggingInterceptor implements ClientInterceptor {
private static final Metadata.Key<com.google.rpc.Status> GRPC_STATUS_KEY =
Metadata.Key.of(
"grpc-status-details-bin",
ProtoLiteUtils.metadataMarshaller(
com.google.rpc.Status.getDefaultInstance()));
private static final Logger log = getLogger(GrpcLoggingInterceptor.class);
private final AtomicLong callIdGenerator = new AtomicLong();
private final GrpcChannelId channelId;
private final AtomicBoolean enabled;
private FileWriter writer;
GrpcLoggingInterceptor(GrpcChannelId channelId, AtomicBoolean enabled) {
this.channelId = channelId;
this.enabled = enabled;
try {
writer = initWriter();
write(format("GRPC CALL LOG - %s\n\n", channelId));
} catch (IOException e) {
log.error("Unable to initialize gRPC call log writer", e);
}
}
private FileWriter initWriter() throws IOException {
final String safeChName = channelId.id()
.replaceAll("[^A-Za-z0-9]", "_");
final String fileName = format("grpc_%s_", safeChName).toLowerCase();
final File tmpFile = File.createTempFile(fileName, ".log");
log.info("Created gRPC call log file for channel {}: {}",
channelId, tmpFile.getAbsolutePath());
return new FileWriter(tmpFile);
}
void close() {
synchronized (this) {
if (writer == null) {
return;
}
try {
log.info("Closing log writer for {}...", channelId);
writer.close();
} catch (IOException e) {
log.error("Unable to close gRPC call log writer", e);
}
writer = null;
}
}
private void write(String message) {
synchronized (this) {
if (writer != null) {
if (message.length() > 4096) {
message = message.substring(0, 256) + "... TRUNCATED!\n\n";
}
try {
writer.write(format(
"*** %s - %s",
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S")
.format(new Date()),
message));
} catch (IOException e) {
log.error("Unable to write gRPC call log", e);
}
}
}
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) {
private final long callId = callIdGenerator.getAndIncrement();
@Override
public void sendMessage(ReqT message) {
if (enabled.get()) {
write(format(
"%s >> OUTBOUND >> [callId=%s]\n%s\n",
methodDescriptor.getFullMethodName(),
callId,
message.toString()));
}
super.sendMessage(message);
}
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
if (enabled.get()) {
write(format(
"%s STARTED [callId=%s]\n%s\n\n",
methodDescriptor.getFullMethodName(),
callId,
headers.toString()));
}
Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
@Override
protected Listener<RespT> delegate() {
return responseListener;
}
@Override
public void onMessage(RespT message) {
if (enabled.get()) {
write(format(
"%s << INBOUND << [callId=%s]\n%s\n",
methodDescriptor.getFullMethodName(),
callId,
message.toString()));
}
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
if (enabled.get()) {
write(format(
"%s CLOSED [callId=%s]\n%s\n%s\n\n",
methodDescriptor.getFullMethodName(),
callId,
status.toString(),
parseTrailers(trailers)));
}
super.onClose(status, trailers);
}
private String parseTrailers(Metadata trailers) {
StringJoiner joiner = new StringJoiner("\n");
joiner.add(trailers.toString());
// If Google's RPC Status trailers are found, parse them.
final Iterable<com.google.rpc.Status> statuses = trailers.getAll(
GRPC_STATUS_KEY);
if (statuses == null) {
return joiner.toString();
}
statuses.forEach(s -> joiner.add(s.toString()));
return joiner.toString();
}
@Override
public void onHeaders(Metadata headers) {
super.onHeaders(headers);
}
};
super.start(listener, headers);
}
};
}
}