diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java index fdd59805f7..5b0725468e 100644 --- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java +++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelController.java @@ -20,9 +20,7 @@ import com.google.common.annotations.Beta; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; -import org.onosproject.net.DeviceId; -import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -82,14 +80,6 @@ public interface GrpcChannelController { */ boolean isChannelOpen(GrpcChannelId channelId); - /** - * Returns all channels associated to the given device ID. - * - * @param deviceId device ID - * @return collection of channels - */ - Collection getChannels(DeviceId deviceId); - /** * If present, returns the channel associated with the given ID. * diff --git a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java index 87609f491c..6db331ab2d 100644 --- a/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java +++ b/protocols/grpc/api/src/main/java/org/onosproject/grpc/api/GrpcChannelId.java @@ -18,57 +18,24 @@ package org.onosproject.grpc.api; import com.google.common.annotations.Beta; import org.onlab.util.Identifier; -import org.onosproject.net.DeviceId; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; /** - * gRPC managed channel identifier, unique in the scope of a gRPC controller - * instance. + * gRPC channel identifier, unique in the scope of an ONOS node. */ @Beta public final class GrpcChannelId extends Identifier { - private final DeviceId deviceId; - private final String channelName; - - private GrpcChannelId(DeviceId deviceId, String channelName) { - super(deviceId.toString() + ":" + channelName); - checkNotNull(deviceId, "device ID must not be null"); - checkNotNull(channelName, "channel name must not be null"); - checkArgument(!channelName.isEmpty(), "channel name must not be empty"); - this.deviceId = deviceId; - this.channelName = channelName; + private GrpcChannelId(String channelName) { + super(channelName); } /** - * Returns the device part of this channel ID. + * Instantiates a new channel ID. * - * @return device ID - */ - public DeviceId deviceId() { - return deviceId; - } - - /** - * Returns the channel name part of this channel ID. - * - * @return channel name - */ - public String channelName() { - return channelName; - } - - /** - * Instantiates a new channel ID for the given device ID and arbitrary - * channel name (e.g. the name of the gRPC service). - * - * @param deviceId device ID * @param channelName name of the channel * @return channel ID */ - public static GrpcChannelId of(DeviceId deviceId, String channelName) { - return new GrpcChannelId(deviceId, channelName); + public static GrpcChannelId of(String channelName) { + return new GrpcChannelId(channelName); } } diff --git a/protocols/grpc/ctl/BUILD b/protocols/grpc/ctl/BUILD index ac0703d1ec..11f98c0a26 100644 --- a/protocols/grpc/ctl/BUILD +++ b/protocols/grpc/ctl/BUILD @@ -3,6 +3,9 @@ COMPILE_DEPS = CORE_DEPS + [ "//protocols/grpc/proto:onos-protocols-grpc-proto", "@io_grpc_grpc_java//core", "@io_grpc_grpc_java//netty", + "@io_grpc_grpc_java//protobuf-lite", + "@com_google_protobuf//:protobuf_java", + "@com_google_api_grpc_proto_google_common_protos//jar", ] osgi_jar( diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java index e5f4884a5d..dc7ae9db39 100644 --- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java +++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/AbstractGrpcClientController.java @@ -115,7 +115,7 @@ public abstract class AbstractGrpcClientController log.info("Creating client for {} (server={}:{})...", deviceId, serverAddr, serverPort); - GrpcChannelId channelId = GrpcChannelId.of(clientKey.deviceId(), clientKey.toString()); + GrpcChannelId channelId = GrpcChannelId.of(clientKey.toString()); ManagedChannelBuilder channelBuilder = NettyChannelBuilder .forAddress(serverAddr, serverPort) .maxInboundMessageSize(DEFAULT_MAX_INBOUND_MSG_SIZE * MEGABYTES) diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java index 4ad3166e16..ca0c8c37b5 100644 --- a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java +++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcChannelControllerImpl.java @@ -17,18 +17,9 @@ package org.onosproject.grpc.ctl; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Striped; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall; -import io.grpc.ForwardingClientCallListener; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.StatusRuntimeException; import org.onlab.util.Tools; @@ -37,7 +28,6 @@ import org.onosproject.grpc.api.GrpcChannelController; import org.onosproject.grpc.api.GrpcChannelId; import org.onosproject.grpc.proto.dummy.Dummy; import org.onosproject.grpc.proto.dummy.DummyServiceGrpc; -import org.onosproject.net.DeviceId; import org.osgi.service.component.ComponentContext; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -48,14 +38,12 @@ import org.osgi.service.component.annotations.ReferenceCardinality; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; import java.util.Dictionary; -import java.util.HashSet; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import static com.google.common.base.Preconditions.checkNotNull; @@ -68,28 +56,31 @@ import static org.onosproject.grpc.ctl.OsgiPropertyConstants.ENABLE_MESSAGE_LOG_ */ @Component(immediate = true, service = GrpcChannelController.class, property = { - ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT, + ENABLE_MESSAGE_LOG + ":Boolean=" + ENABLE_MESSAGE_LOG_DEFAULT, }) public class GrpcChannelControllerImpl implements GrpcChannelController { - // FIXME: Should use message size to determine whether it needs to log the message or not. - private static final String SET_FORWARDING_PIPELINE_CONFIG_METHOD = "p4.P4Runtime/SetForwardingPipelineConfig"; - @Reference(cardinality = ReferenceCardinality.MANDATORY) protected ComponentConfigService componentConfigService; - /** Indicates whether to log gRPC messages. */ - private static boolean enableMessageLog = ENABLE_MESSAGE_LOG_DEFAULT; + /** + * Indicates whether to log gRPC messages. + */ + private final AtomicBoolean enableMessageLog = new AtomicBoolean( + ENABLE_MESSAGE_LOG_DEFAULT); private final Logger log = LoggerFactory.getLogger(getClass()); private Map channels; + private Map interceptors; + private final Striped channelLocks = Striped.lock(30); @Activate public void activate() { componentConfigService.registerProperties(getClass()); channels = new ConcurrentHashMap<>(); + interceptors = new ConcurrentHashMap<>(); log.info("Started"); } @@ -97,10 +88,12 @@ public class GrpcChannelControllerImpl implements GrpcChannelController { public void modified(ComponentContext context) { if (context != null) { Dictionary properties = context.getProperties(); - enableMessageLog = Tools.isPropertyEnabled( - properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT); - log.info("Configured. Log of gRPC messages is {} for new channels", - enableMessageLog ? "enabled" : "disabled"); + enableMessageLog.set(Tools.isPropertyEnabled( + properties, ENABLE_MESSAGE_LOG, ENABLE_MESSAGE_LOG_DEFAULT)); + log.info("Configured. Logging of gRPC messages is {}", + enableMessageLog.get() + ? "ENABLED for new channels" + : "DISABLED for new and existing channels"); } } @@ -109,6 +102,10 @@ public class GrpcChannelControllerImpl implements GrpcChannelController { componentConfigService.unregisterProperties(getClass(), false); channels.values().forEach(ManagedChannel::shutdownNow); channels.clear(); + channels = null; + interceptors.values().forEach(GrpcLoggingInterceptor::close); + interceptors.clear(); + interceptors = null; log.info("Stopped"); } @@ -126,8 +123,11 @@ public class GrpcChannelControllerImpl implements GrpcChannelController { throw new IllegalArgumentException(format( "A channel with ID '%s' already exists", channelId)); } - if (enableMessageLog) { - channelBuilder.intercept(new InternalLogChannelInterceptor(channelId)); + + GrpcLoggingInterceptor interceptor = null; + if (enableMessageLog.get()) { + interceptor = new GrpcLoggingInterceptor(channelId, enableMessageLog); + channelBuilder.intercept(interceptor); } ManagedChannel channel = channelBuilder.build(); // Forced connection API is still experimental. Use workaround... @@ -135,11 +135,17 @@ public class GrpcChannelControllerImpl implements GrpcChannelController { try { doDummyMessage(channel); } catch (StatusRuntimeException e) { + if (interceptor != null) { + interceptor.close(); + } shutdownNowAndWait(channel, channelId); throw e; } // If here, channel is open. channels.put(channelId, channel); + if (interceptor != null) { + interceptors.put(channelId, interceptor); + } return channel; } finally { lock.unlock(); @@ -200,6 +206,10 @@ public class GrpcChannelControllerImpl implements GrpcChannelController { if (channel != null) { shutdownNowAndWait(channel, channelId); } + final GrpcLoggingInterceptor interceptor = interceptors.remove(channelId); + if (interceptor != null) { + interceptor.close(); + } } finally { lock.unlock(); } @@ -224,19 +234,6 @@ public class GrpcChannelControllerImpl implements GrpcChannelController { return ImmutableMap.copyOf(channels); } - @Override - public Collection getChannels(final DeviceId deviceId) { - checkNotNull(deviceId); - final Set deviceChannels = new HashSet<>(); - channels.forEach((k, v) -> { - if (k.deviceId().equals(deviceId)) { - deviceChannels.add(v); - } - }); - - return ImmutableSet.copyOf(deviceChannels); - } - @Override public Optional getChannel(GrpcChannelId channelId) { checkNotNull(channelId); @@ -251,57 +248,4 @@ public class GrpcChannelControllerImpl implements GrpcChannelController { } } - /** - * gRPC client interceptor that logs all messages sent and received. - */ - private final class InternalLogChannelInterceptor implements ClientInterceptor { - - private final GrpcChannelId channelId; - - private InternalLogChannelInterceptor(GrpcChannelId channelId) { - this.channelId = channelId; - } - - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions, Channel channel) { - return new ForwardingClientCall.SimpleForwardingClientCall( - channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) { - - @Override - public void sendMessage(ReqT message) { - if (enableMessageLog && !methodDescriptor.getFullMethodName() - .startsWith(SET_FORWARDING_PIPELINE_CONFIG_METHOD)) { - log.info("*** SENDING GRPC MESSAGE [{}]\n{}:\n{}", - channelId, methodDescriptor.getFullMethodName(), - message.toString()); - } - super.sendMessage(message); - } - - @Override - public void start(Listener responseListener, Metadata headers) { - - ClientCall.Listener listener = new ForwardingClientCallListener() { - @Override - protected Listener delegate() { - return responseListener; - } - - @Override - public void onMessage(RespT message) { - if (enableMessageLog) { - log.info("*** RECEIVED GRPC MESSAGE [{}]\n{}:\n{}", - channelId, methodDescriptor.getFullMethodName(), - message.toString()); - } - super.onMessage(message); - } - }; - super.start(listener, headers); - } - }; - } - } } diff --git a/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java new file mode 100644 index 0000000000..774152e5fb --- /dev/null +++ b/protocols/grpc/ctl/src/main/java/org/onosproject/grpc/ctl/GrpcLoggingInterceptor.java @@ -0,0 +1,204 @@ +/* + * Copyright 2019-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.grpc.ctl; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.protobuf.lite.ProtoLiteUtils; +import org.onosproject.grpc.api.GrpcChannelId; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.StringJoiner; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static java.lang.String.format; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * gRPC client interceptor that logs to file all messages sent and received. + */ +final class GrpcLoggingInterceptor implements ClientInterceptor { + + private static final Metadata.Key GRPC_STATUS_KEY = + Metadata.Key.of( + "grpc-status-details-bin", + ProtoLiteUtils.metadataMarshaller( + com.google.rpc.Status.getDefaultInstance())); + + private static final Logger log = getLogger(GrpcLoggingInterceptor.class); + + private final AtomicLong callIdGenerator = new AtomicLong(); + private final GrpcChannelId channelId; + private final AtomicBoolean enabled; + + private FileWriter writer; + + GrpcLoggingInterceptor(GrpcChannelId channelId, AtomicBoolean enabled) { + this.channelId = channelId; + this.enabled = enabled; + try { + writer = initWriter(); + write(format("GRPC CALL LOG - %s\n\n", channelId)); + } catch (IOException e) { + log.error("Unable to initialize gRPC call log writer", e); + } + } + + private FileWriter initWriter() throws IOException { + final String safeChName = channelId.id() + .replaceAll("[^A-Za-z0-9]", "_"); + final String fileName = format("grpc_%s_", safeChName).toLowerCase(); + final File tmpFile = File.createTempFile(fileName, ".log"); + log.info("Created gRPC call log file for channel {}: {}", + channelId, tmpFile.getAbsolutePath()); + return new FileWriter(tmpFile); + } + + void close() { + synchronized (this) { + if (writer == null) { + return; + } + try { + log.info("Closing log writer for {}...", channelId); + writer.close(); + } catch (IOException e) { + log.error("Unable to close gRPC call log writer", e); + } + writer = null; + } + } + + private void write(String message) { + synchronized (this) { + if (writer != null) { + if (message.length() > 4096) { + message = message.substring(0, 256) + "... TRUNCATED!\n\n"; + } + try { + writer.write(format( + "*** %s - %s", + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S") + .format(new Date()), + message)); + } catch (IOException e) { + log.error("Unable to write gRPC call log", e); + } + } + } + } + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions, Channel channel) { + return new ForwardingClientCall.SimpleForwardingClientCall( + channel.newCall(methodDescriptor, callOptions.withoutWaitForReady())) { + + private final long callId = callIdGenerator.getAndIncrement(); + + @Override + public void sendMessage(ReqT message) { + if (enabled.get()) { + write(format( + "%s >> OUTBOUND >> [callId=%s]\n%s\n", + methodDescriptor.getFullMethodName(), + callId, + message.toString())); + } + super.sendMessage(message); + } + + @Override + public void start(Listener responseListener, Metadata headers) { + + if (enabled.get()) { + write(format( + "%s STARTED [callId=%s]\n%s\n\n", + methodDescriptor.getFullMethodName(), + callId, + headers.toString())); + } + + Listener listener = new ForwardingClientCallListener() { + @Override + protected Listener delegate() { + return responseListener; + } + + @Override + public void onMessage(RespT message) { + if (enabled.get()) { + write(format( + "%s << INBOUND << [callId=%s]\n%s\n", + methodDescriptor.getFullMethodName(), + callId, + message.toString())); + } + super.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + if (enabled.get()) { + write(format( + "%s CLOSED [callId=%s]\n%s\n%s\n\n", + methodDescriptor.getFullMethodName(), + callId, + status.toString(), + parseTrailers(trailers))); + } + super.onClose(status, trailers); + } + + private String parseTrailers(Metadata trailers) { + StringJoiner joiner = new StringJoiner("\n"); + joiner.add(trailers.toString()); + // If Google's RPC Status trailers are found, parse them. + final Iterable statuses = trailers.getAll( + GRPC_STATUS_KEY); + if (statuses == null) { + return joiner.toString(); + } + statuses.forEach(s -> joiner.add(s.toString())); + return joiner.toString(); + } + + @Override + public void onHeaders(Metadata headers) { + super.onHeaders(headers); + } + }; + + super.start(listener, headers); + } + }; + } +}