From 4d1c9d1bc813cc97408d4525755ea2672c95fa2c Mon Sep 17 00:00:00 2001 From: Boyoung Jeong Date: Fri, 20 Jul 2018 17:09:20 +0900 Subject: [PATCH] Publish a statistics record to InfluxDB Change-Id: I046207ab16b91c5ff65ae6df9e5929b9979907e1 --- .../openstacktelemetry/api/Constants.java | 1 + .../api/InfluxDbTelemetryService.java | 4 +- .../openstacktelemetry/api/InfluxRecord.java | 14 +++ .../api/config/InfluxDbTelemetryConfig.java | 15 +++ .../DefaultInfluxDbTelemetryConfig.java | 25 ++++- .../impl/DefaultInfluxRecord.java | 75 +++++++++++++ .../impl/DefaultStatsFlowRule.java | 2 +- .../impl/InfluxDbTelemetryConfigManager.java | 11 ++ .../impl/InfluxDbTelemetryManager.java | 103 +++++++++++++++++- .../impl/OpenstackTelemetryManager.java | 6 +- .../DefaultInfluxDbTelemetryConfigTest.java | 7 ++ apps/openstacktelemetry/pom.xml | 2 +- 12 files changed, 252 insertions(+), 13 deletions(-) create mode 100644 apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java index 20a4afa378..9621467d87 100644 --- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java +++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java @@ -39,6 +39,7 @@ public final class Constants { public static final String DEFAULT_INFLUXDB_USERNAME = "onos"; public static final String DEFAULT_INFLUXDB_PASSWORD = "onos"; public static final String DEFAULT_INFLUXDB_DATABASE = "onos"; + public static final String DEFAULT_INFLUXDB_MEASUREMENT = "sonaflow"; public static final boolean DEFAULT_INFLUXDB_ENABLE_BATCH = true; // default configuration variables for Kafka diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java index 1cfd8310b3..4a1bd5c9b9 100644 --- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java +++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java @@ -15,6 +15,8 @@ */ package org.onosproject.openstacktelemetry.api; +import java.util.Set; + /** * Service API for publishing openstack telemetry through InfluxDB producer. */ @@ -25,5 +27,5 @@ public interface InfluxDbTelemetryService extends TelemetryService { * * @param record a network metric to be published */ - void publish(InfluxRecord record); + void publish(InfluxRecord> record); } diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java index 57f2285b59..241e6e1b83 100644 --- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java +++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java @@ -20,4 +20,18 @@ package org.onosproject.openstacktelemetry.api; */ public interface InfluxRecord { + /** + * Gets measurement name in InfluxDB. + * + * @return measurement name + */ + K measurement(); + + /** + * Gets flow information and its statistics data. + * + * @return flow information and its statistics data + */ + V flowInfos(); + } diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java index de8a36f17b..32c150f38b 100644 --- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java +++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java @@ -57,6 +57,13 @@ public interface InfluxDbTelemetryConfig extends TelemetryConfig { */ String database(); + /** + * Obtains InfluxDB measurement name. + * + * @return InfluxDB measurement name + */ + String measurement(); + /** * Obtains InfluxDB enable batch flag. * @@ -108,6 +115,14 @@ public interface InfluxDbTelemetryConfig extends TelemetryConfig { */ Builder withPassword(String password); + /** + * Sets InfluxDB measurement. + * + * @param measurement InfluxDB measurement + * @return builder instance + */ + Builder withMeasurement(String measurement); + /** * Sets InfluxDB database. * diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java index 461fee5143..c63f2bcf22 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java @@ -36,18 +36,21 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo private final String username; private final String password; private final String database; + private final String measurement; private final boolean enableBatch; private final Map configMap; private DefaultInfluxDbTelemetryConfig(String address, int port, String username, String password, - String database, boolean enableBatch, + String database, String measurement, + boolean enableBatch, Map configMap) { this.address = address; this.port = port; this.username = username; this.password = password; this.database = database; + this.measurement = measurement; this.enableBatch = enableBatch; this.configMap = configMap; } @@ -77,6 +80,11 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo return database; } + @Override + public String measurement() { + return measurement; + } + @Override public boolean enableBatch() { return enableBatch; @@ -104,6 +112,7 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo Objects.equals(this.username, other.username) && Objects.equals(this.password, other.password) && Objects.equals(this.database, other.database) && + Objects.equals(this.measurement, other.measurement) && Objects.equals(this.enableBatch, other.enableBatch) && Objects.equals(this.configMap, other.configMap); } @@ -112,7 +121,8 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo @Override public int hashCode() { - return Objects.hash(address, port, username, password, database, enableBatch, configMap); + return Objects.hash(address, port, username, password, database, + measurement, enableBatch, configMap); } @Override @@ -123,6 +133,7 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo .add("username", username) .add("password", password) .add("database", database) + .add("measurement", measurement) .add("enableBatch", enableBatch) .add("configMap", configMap) .toString(); @@ -143,6 +154,7 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo private String username; private String password; private String database; + private String measurement; private boolean enableBatch; private Map configMap; @@ -176,6 +188,12 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo return this; } + @Override + public Builder withMeasurement(String measurement) { + this.measurement = measurement; + return this; + } + @Override public Builder withEnableBatch(boolean enableBatch) { this.enableBatch = enableBatch; @@ -194,9 +212,10 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo checkNotNull(username, "InfluxDB server username cannot be null"); checkNotNull(password, "InfluxDB server password cannot be null"); checkNotNull(database, "InfluxDB server database cannot be null"); + checkNotNull(measurement, "InfluxDB server measurement cannot be null"); return new DefaultInfluxDbTelemetryConfig(address, port, username, - password, database, enableBatch, configMap); + password, database, measurement, enableBatch, configMap); } } } diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java new file mode 100644 index 0000000000..8c45a60262 --- /dev/null +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java @@ -0,0 +1,75 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.openstacktelemetry.impl; + +import org.onosproject.openstacktelemetry.api.InfluxRecord; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT; + + +public final class DefaultInfluxRecord implements InfluxRecord { + public static final String MEASUREMENT_NAME = DEFAULT_INFLUXDB_MEASUREMENT; + private final K measurement; + private final V flowInfos; + + protected DefaultInfluxRecord(K measurement, V flowInfos) { + if ((measurement == null) || (measurement.equals(""))) { + this.measurement = (K) MEASUREMENT_NAME; + } else { + this.measurement = measurement; + } + this.flowInfos = flowInfos; + } + + @Override + public K measurement() { + return measurement; + } + + @Override + public V flowInfos() { + return flowInfos; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj instanceof DefaultInfluxRecord) { + final DefaultInfluxRecord other = (DefaultInfluxRecord) obj; + return Objects.equals(this.measurement, other.measurement) && + Objects.equals(this.flowInfos, other.flowInfos); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(measurement, flowInfos); + } + + public String toString() { + return toStringHelper(this) + .add("measurement", measurement) + .add("flowInfos", flowInfos) + .toString(); + } +} diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java index 6328e64a03..83c7519e70 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java @@ -26,7 +26,7 @@ public final class DefaultStatsFlowRule implements StatsFlowRule { private final IpPrefix srcIpPrefix; private final IpPrefix dstIpPrefix; private final byte ipProtocol; - private final TpPort srcTpPort; + private final TpPort srcTpPort; private final TpPort dstTpPort; private static final String NOT_NULL_MSG = "Element % cannot be null"; diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java index 069c374881..4ce3e528b7 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java @@ -38,6 +38,7 @@ import java.util.Dictionary; import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DISABLE; import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_DATABASE; import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_ENABLE_BATCH; +import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT; import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_PASSWORD; import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_IP; import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_PORT; @@ -59,6 +60,7 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe private static final String USERNAME = "username"; private static final String PASSWORD = "password"; private static final String DATABASE = "database"; + private static final String MEASUREMENT = "measurement"; private static final String ENABLE_BATCH = "enableBatch"; @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @@ -87,6 +89,10 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe label = "Database of InfluxDB server") protected String database = DEFAULT_INFLUXDB_DATABASE; + @Property(name = MEASUREMENT, value = DEFAULT_INFLUXDB_MEASUREMENT, + label = "Measurement of InfluxDB server") + protected String measurement = DEFAULT_INFLUXDB_MEASUREMENT; + @Property(name = ENABLE_BATCH, boolValue = DEFAULT_INFLUXDB_ENABLE_BATCH, label = "Flag value of enabling batch mode of InfluxDB server") protected Boolean enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH; @@ -141,6 +147,7 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe .withUsername(username) .withPassword(password) .withDatabase(database) + .withMeasurement(measurement) .withEnableBatch(enableBatch) .build(); } @@ -178,6 +185,10 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe database = databaseStr != null ? databaseStr : DEFAULT_INFLUXDB_DATABASE; log.info("Configured. InfluxDB server database is {}", database); + String measurementStr = Tools.get(properties, MEASUREMENT); + measurement = measurementStr != null ? measurementStr : DEFAULT_INFLUXDB_MEASUREMENT; + log.info("Configured. InfluxDB server measurement is {}", measurement); + Boolean enableBatchConfigured = getBooleanProperty(properties, ENABLE_BATCH); if (enableBatchConfigured == null) { enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH; diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java index 7e8a1022f9..19acaaa9e9 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java @@ -23,6 +23,10 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.onlab.packet.TpPort; +import org.onosproject.openstacktelemetry.api.FlowInfo; import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService; import org.onosproject.openstacktelemetry.api.InfluxRecord; import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService; @@ -31,6 +35,8 @@ import org.onosproject.openstacktelemetry.api.config.TelemetryConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; + /** * InfluxDB telemetry manager. */ @@ -40,11 +46,38 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService { private final Logger log = LoggerFactory.getLogger(getClass()); + private static final String FLOW_TYPE = "flowType"; + private static final String DEVICE_ID = "deviceId"; + private static final String INPUT_INTERFACE_ID = "inputInterfaceId"; + private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId"; + + private static final String VLAN_ID = "vlanId"; + private static final String VXLAN_ID = "vxlanId"; + private static final String SRC_IP = "srcIp"; + private static final String DST_IP = "dstIp"; + private static final String SRC_PORT = "srcPort"; + private static final String DST_PORT = "dstPort"; + private static final String PROTOCOL = "protocol"; + private static final String SRC_MAC = "srcMac"; + private static final String DST_MAC = "dstMac"; + + private static final String STARTUP_TIME = "startupTime"; + private static final String FST_PKT_ARR_TIME = "fstPktArrTime"; + private static final String LST_PKT_OFFSET = "lstPktOffset"; + private static final String PREV_ACC_BYTES = "prevAccBytes"; + private static final String PREV_ACC_PKTS = "prevAccPkts"; + private static final String CURR_ACC_BYTES = "currAccBytes"; + private static final String CURR_ACC_PKTS = "currAccPkts"; + private static final String ERROR_PKTS = "errorPkts"; + private static final String DROP_PKTS = "dropPkts"; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected OpenstackTelemetryService openstackTelemetryService; - private static final String PROTOCOL = "http"; + private static final String INFLUX_PROTOCOL = "http"; private InfluxDB producer = null; + private String database = null; + private String measurement = null; @Activate protected void activate() { @@ -73,7 +106,7 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService { InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config; StringBuilder influxDbServerBuilder = new StringBuilder(); - influxDbServerBuilder.append(PROTOCOL); + influxDbServerBuilder.append(INFLUX_PROTOCOL); influxDbServerBuilder.append(":"); influxDbServerBuilder.append("//"); influxDbServerBuilder.append(influxDbConfig.address()); @@ -82,12 +115,18 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService { producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(), influxDbConfig.username(), influxDbConfig.password()); + database = influxDbConfig.database(); + measurement = influxDbConfig.measurement(); + log.info("InfluxDB producer has Started"); + + createDB(); } @Override public void stop() { if (producer != null) { + producer.close(); producer = null; } @@ -101,16 +140,70 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService { } @Override - public void publish(InfluxRecord record) { - // TODO: need to find a way to invoke InfluxDB endpoint using producer - + public void publish(InfluxRecord> record) { if (producer == null) { log.warn("InfluxDB telemetry service has not been enabled!"); + return; } + + if (record.flowInfos().size() == 0) { + log.warn("No record to publish"); + return; + } + + log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size()); + + BatchPoints batchPoints = BatchPoints.database(database).build(); + + for (FlowInfo flowInfo: record.flowInfos()) { + Point point = Point + .measurement((measurement == null) ? record.measurement() : measurement) + .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType())) + .tag(DEVICE_ID, flowInfo.deviceId().toString()) + .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId())) + .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId())) + .tag(VLAN_ID, flowInfo.vlanId().toString()) + .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId())) + .tag(SRC_IP, flowInfo.srcIp().toString()) + .tag(DST_IP, flowInfo.dstIp().toString()) + .tag(SRC_PORT, getTpPort(flowInfo.srcPort())) + .tag(DST_PORT, getTpPort(flowInfo.dstPort())) + .tag(PROTOCOL, String.valueOf(flowInfo.protocol())) + .tag(SRC_MAC, flowInfo.srcMac().toString()) + .tag(DST_MAC, flowInfo.dstMac().toString()) + .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime()) + .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime()) + .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset()) + .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes()) + .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts()) + .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes()) + .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts()) + .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts()) + .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts()) + .build(); + batchPoints.point(point); + } + producer.write(batchPoints); } @Override public boolean isRunning() { return producer != null; } + + private void createDB() { + if (producer.databaseExists(database)) { + log.debug("Database {} is already created", database); + } else { + producer.createDatabase(database); + log.debug("Database {} is created", database); + } + } + + private String getTpPort(TpPort tpPort) { + if (tpPort == null) { + return ""; + } + return tpPort.toString(); + } } diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java index f3f386f5bb..e19a579568 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java @@ -36,6 +36,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT; import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY; import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC; @@ -73,7 +74,6 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService { @Override public void publish(Set flowInfos) { telemetryServices.forEach(service -> { - if (service instanceof GrpcTelemetryManager) { invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos); } @@ -99,7 +99,9 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService { } private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set flowInfos) { - // TODO: need provide implementation + DefaultInfluxRecord> influxRecord + = new DefaultInfluxRecord<>(DEFAULT_INFLUXDB_MEASUREMENT, flowInfos); + service.publish(influxRecord); } private void invokeKafkaPublisher(KafkaTelemetryService service, Set flowInfos) { diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java index 3c95f6d1d7..4af5b4ab8b 100644 --- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java +++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java @@ -40,6 +40,9 @@ public final class DefaultInfluxDbTelemetryConfigTest { private static final String DATABASE_1 = "database1"; private static final String DATABASE_2 = "database2"; + private static final String MEASUREMENT_1 = "measurement1"; + private static final String MEASUREMENT_2 = "measurement2"; + private static final String USERNAME_1 = "username1"; private static final String USERNAME_2 = "username2"; @@ -72,6 +75,7 @@ public final class DefaultInfluxDbTelemetryConfigTest { .withAddress(IP_ADDRESS_1) .withPort(PORT_1) .withDatabase(DATABASE_1) + .withMeasurement(MEASUREMENT_1) .withUsername(USERNAME_1) .withPassword(PASSWORD_1) .withEnableBatch(ENABLE_BATCH_1) @@ -82,6 +86,7 @@ public final class DefaultInfluxDbTelemetryConfigTest { .withAddress(IP_ADDRESS_1) .withPort(PORT_1) .withDatabase(DATABASE_1) + .withMeasurement(MEASUREMENT_1) .withUsername(USERNAME_1) .withPassword(PASSWORD_1) .withEnableBatch(ENABLE_BATCH_1) @@ -92,6 +97,7 @@ public final class DefaultInfluxDbTelemetryConfigTest { .withAddress(IP_ADDRESS_2) .withPort(PORT_2) .withDatabase(DATABASE_2) + .withMeasurement(MEASUREMENT_2) .withUsername(USERNAME_2) .withPassword(PASSWORD_2) .withEnableBatch(ENABLE_BATCH_2) @@ -113,6 +119,7 @@ public final class DefaultInfluxDbTelemetryConfigTest { assertThat(config.address(), is(IP_ADDRESS_1)); assertThat(config.port(), is(PORT_1)); assertThat(config.database(), is(DATABASE_1)); + assertThat(config.measurement(), is(MEASUREMENT_1)); assertThat(config.username(), is(USERNAME_1)); assertThat(config.password(), is(PASSWORD_1)); assertThat(config.enableBatch(), is(ENABLE_BATCH_1)); diff --git a/apps/openstacktelemetry/pom.xml b/apps/openstacktelemetry/pom.xml index 0bf85906cc..4d40139487 100644 --- a/apps/openstacktelemetry/pom.xml +++ b/apps/openstacktelemetry/pom.xml @@ -31,7 +31,7 @@ SONA Openstack Telemetry Application - 2.2 + 2.9 0.8.2.2_1 3.2.0 1.3.1