diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK index f702a2a24c..063e928d8a 100644 --- a/apps/openstacktelemetry/app/BUCK +++ b/apps/openstacktelemetry/app/BUCK @@ -1,3 +1,5 @@ +GRPC_VER = '1.3.1' + COMPILE_DEPS = [ '//lib:CORE_DEPS', '//lib:JACKSON', @@ -8,6 +10,11 @@ COMPILE_DEPS = [ '//cli:onos-cli', '//lib:org.apache.karaf.shell.console', '//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api', + '//lib:kafka-clients', + '//lib:influxdb-java', + '//lib:GRPC_1.3', + '//incubator/grpc-dependencies:grpc-core-repkg-' + GRPC_VER, + '//lib:grpc-protobuf-lite-' + GRPC_VER, ] TEST_DEPS = [ diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java new file mode 100644 index 0000000000..20ded3e6eb --- /dev/null +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java @@ -0,0 +1,90 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.openstacktelemetry.impl; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService; +import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig; +import org.onosproject.openstacktelemetry.api.config.TelemetryConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * gRPC telemetry manager. + */ +@Component(immediate = true) +@Service +public class GrpcTelemetryManager implements GrpcTelemetryAdminService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private ManagedChannel channel = null; + + @Activate + protected void activate() { + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + stop(); + log.info("Stopped"); + } + + @Override + public void start(TelemetryConfig config) { + if (channel != null) { + log.info("gRPC producer has already been started"); + return; + } + + GrpcTelemetryConfig grpcConfig = (GrpcTelemetryConfig) config; + channel = ManagedChannelBuilder + .forAddress(grpcConfig.address(), grpcConfig.port()) + .maxInboundMessageSize(grpcConfig.maxInboundMsgSize()) + .usePlaintext(grpcConfig.usePlaintext()) + .build(); + + log.info("gRPC producer has Started"); + } + + @Override + public void stop() { + if (channel != null) { + channel.shutdown(); + channel = null; + } + + log.info("gRPC producer has Stopped"); + } + + @Override + public void restart(TelemetryConfig config) { + stop(); + start(config); + } + + @Override + public Object publish(Object record) { + // TODO: need to find a way to invoke gRPC endpoint using channel + return null; + } +} diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java new file mode 100644 index 0000000000..b79d7037ee --- /dev/null +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java @@ -0,0 +1,95 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.openstacktelemetry.impl; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService; +import org.onosproject.openstacktelemetry.api.InfluxRecord; +import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig; +import org.onosproject.openstacktelemetry.api.config.TelemetryConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * InfluxDB telemetry manager. + */ +@Component(immediate = true) +@Service +public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final String PROTOCOL = "http"; + private InfluxDB producer = null; + + @Activate + protected void activate() { + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + stop(); + log.info("Stopped"); + } + + @Override + public void start(TelemetryConfig config) { + if (producer != null) { + log.info("InfluxDB producer has already been started"); + return; + } + + InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config; + + StringBuilder influxDbServerBuilder = new StringBuilder(); + influxDbServerBuilder.append(PROTOCOL); + influxDbServerBuilder.append(":"); + influxDbServerBuilder.append("//"); + influxDbServerBuilder.append(influxDbConfig.address()); + influxDbServerBuilder.append(":"); + influxDbServerBuilder.append(influxDbConfig.port()); + + producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(), + influxDbConfig.username(), influxDbConfig.password()); + log.info("InfluxDB producer has Started"); + } + + @Override + public void stop() { + if (producer != null) { + producer = null; + } + + log.info("Kafka producer has Stopped"); + } + + @Override + public void restart(TelemetryConfig config) { + stop(); + start(config); + } + + @Override + public void publish(InfluxRecord record) { + // TODO: need to find a way to invoke InfluxDB endpoint using producer + } +} diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java new file mode 100644 index 0000000000..e935111fcf --- /dev/null +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java @@ -0,0 +1,115 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.openstacktelemetry.impl; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService; +import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig; +import org.onosproject.openstacktelemetry.api.config.TelemetryConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.concurrent.Future; + +/** + * Kafka telemetry manager. + */ +@Component(immediate = true) +@Service +public class KafkaTelemetryManager implements KafkaTelemetryAdminService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; + private static final String RETRIES = "retries"; + private static final String ACKS = "acks"; + private static final String BATCH_SIZE = "batch.size"; + private static final String LINGER_MS = "linger.ms"; + private static final String MEMORY_BUFFER = "buffer.memory"; + private static final String KEY_SERIALIZER = "key.serializer"; + private static final String VALUE_SERIALIZER = "value.serializer"; + + private Producer producer = null; + + @Activate + protected void activate() { + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + stop(); + log.info("Stopped"); + } + + @Override + public void start(TelemetryConfig config) { + if (producer != null) { + log.info("Kafka producer has already been started"); + return; + } + + KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config; + + StringBuilder kafkaServerBuilder = new StringBuilder(); + kafkaServerBuilder.append(kafkaConfig.address()); + kafkaServerBuilder.append(":"); + kafkaServerBuilder.append(kafkaConfig.port()); + + // Configure Kafka server properties + Properties prop = new Properties(); + prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString()); + prop.put(RETRIES, kafkaConfig.retries()); + prop.put(ACKS, kafkaConfig.requiredAcks()); + prop.put(BATCH_SIZE, kafkaConfig.batchSize()); + prop.put(LINGER_MS, kafkaConfig.lingerMs()); + prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer()); + prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer()); + prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer()); + + producer = new KafkaProducer<>(prop); + log.info("Kafka producer has Started"); + } + + @Override + public void stop() { + if (producer != null) { + producer.close(); + producer = null; + } + + log.info("Kafka producer has Stopped"); + } + + @Override + public void restart(TelemetryConfig config) { + stop(); + start(config); + } + + @Override + public Future publish(ProducerRecord record) { + return producer.send(record); + } +} diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java new file mode 100644 index 0000000000..6f5710b409 --- /dev/null +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java @@ -0,0 +1,132 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.openstacktelemetry.impl; + +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Service; +import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService; +import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig; +import org.onosproject.openstacktelemetry.api.config.TelemetryConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; + +/** + * REST telemetry manager. + */ +@Component(immediate = true) +@Service +public class RestTelemetryManager implements RestTelemetryAdminService { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private static final String PROTOCOL = "http"; + private static final String POST_METHOD = "POST"; + private static final String GET_METHOD = "GET"; + + private WebTarget target = null; + private RestTelemetryConfig restConfig = null; + + @Activate + protected void activate() { + log.info("Started"); + } + + @Deactivate + protected void deactivate() { + stop(); + log.info("Stopped"); + } + + @Override + public void start(TelemetryConfig config) { + if (target != null) { + log.info("REST producer has already been started"); + return; + } + + restConfig = (RestTelemetryConfig) config; + + StringBuilder restServerBuilder = new StringBuilder(); + restServerBuilder.append(PROTOCOL); + restServerBuilder.append(":"); + restServerBuilder.append("//"); + restServerBuilder.append(restConfig.address()); + restServerBuilder.append(":"); + restServerBuilder.append(restConfig.port()); + restServerBuilder.append("/"); + + Client client = ClientBuilder.newBuilder().build(); + + target = client.target(restServerBuilder.toString()).path(restConfig.endpoint()); + + log.info("REST producer has Started"); + } + + @Override + public void stop() { + if (target != null) { + target = null; + } + + log.info("REST producer has Stopped"); + } + + @Override + public void restart(TelemetryConfig config) { + stop(); + start(config); + } + + @Override + public Response publish(String endpoint, String method, String record) { + // TODO: need to find a way to invoke REST endpoint using target + return null; + } + + @Override + public Response publish(String method, String record) { + switch (method) { + case POST_METHOD: + return target.request(restConfig.requestMediaType()) + .post(Entity.json(record)); + case GET_METHOD: + return target.request(restConfig.requestMediaType()).get(); + default: + return null; + } + } + + @Override + public Response publish(String record) { + switch (restConfig.method()) { + case POST_METHOD: + return target.request(restConfig.requestMediaType()) + .post(Entity.json(record)); + case GET_METHOD: + return target.request(restConfig.requestMediaType()).get(); + default: + return null; + } + } +}