Publish a statistics record to InfluxDB

Change-Id: I046207ab16b91c5ff65ae6df9e5929b9979907e1
This commit is contained in:
Boyoung Jeong 2018-07-20 17:09:20 +09:00 committed by Jian Li
parent 8f64feb640
commit 4d1c9d1bc8
12 changed files with 252 additions and 13 deletions

View File

@ -39,6 +39,7 @@ public final class Constants {
public static final String DEFAULT_INFLUXDB_USERNAME = "onos";
public static final String DEFAULT_INFLUXDB_PASSWORD = "onos";
public static final String DEFAULT_INFLUXDB_DATABASE = "onos";
public static final String DEFAULT_INFLUXDB_MEASUREMENT = "sonaflow";
public static final boolean DEFAULT_INFLUXDB_ENABLE_BATCH = true;
// default configuration variables for Kafka

View File

@ -15,6 +15,8 @@
*/
package org.onosproject.openstacktelemetry.api;
import java.util.Set;
/**
* Service API for publishing openstack telemetry through InfluxDB producer.
*/
@ -25,5 +27,5 @@ public interface InfluxDbTelemetryService extends TelemetryService {
*
* @param record a network metric to be published
*/
void publish(InfluxRecord<String, Object> record);
void publish(InfluxRecord<String, Set<FlowInfo>> record);
}

View File

@ -20,4 +20,18 @@ package org.onosproject.openstacktelemetry.api;
*/
public interface InfluxRecord<K, V> {
/**
* Gets measurement name in InfluxDB.
*
* @return measurement name
*/
K measurement();
/**
* Gets flow information and its statistics data.
*
* @return flow information and its statistics data
*/
V flowInfos();
}

View File

@ -57,6 +57,13 @@ public interface InfluxDbTelemetryConfig extends TelemetryConfig {
*/
String database();
/**
* Obtains InfluxDB measurement name.
*
* @return InfluxDB measurement name
*/
String measurement();
/**
* Obtains InfluxDB enable batch flag.
*
@ -108,6 +115,14 @@ public interface InfluxDbTelemetryConfig extends TelemetryConfig {
*/
Builder withPassword(String password);
/**
* Sets InfluxDB measurement.
*
* @param measurement InfluxDB measurement
* @return builder instance
*/
Builder withMeasurement(String measurement);
/**
* Sets InfluxDB database.
*

View File

@ -36,18 +36,21 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
private final String username;
private final String password;
private final String database;
private final String measurement;
private final boolean enableBatch;
private final Map<String, Object> configMap;
private DefaultInfluxDbTelemetryConfig(String address, int port,
String username, String password,
String database, boolean enableBatch,
String database, String measurement,
boolean enableBatch,
Map<String, Object> configMap) {
this.address = address;
this.port = port;
this.username = username;
this.password = password;
this.database = database;
this.measurement = measurement;
this.enableBatch = enableBatch;
this.configMap = configMap;
}
@ -77,6 +80,11 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
return database;
}
@Override
public String measurement() {
return measurement;
}
@Override
public boolean enableBatch() {
return enableBatch;
@ -104,6 +112,7 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
Objects.equals(this.username, other.username) &&
Objects.equals(this.password, other.password) &&
Objects.equals(this.database, other.database) &&
Objects.equals(this.measurement, other.measurement) &&
Objects.equals(this.enableBatch, other.enableBatch) &&
Objects.equals(this.configMap, other.configMap);
}
@ -112,7 +121,8 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
@Override
public int hashCode() {
return Objects.hash(address, port, username, password, database, enableBatch, configMap);
return Objects.hash(address, port, username, password, database,
measurement, enableBatch, configMap);
}
@Override
@ -123,6 +133,7 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
.add("username", username)
.add("password", password)
.add("database", database)
.add("measurement", measurement)
.add("enableBatch", enableBatch)
.add("configMap", configMap)
.toString();
@ -143,6 +154,7 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
private String username;
private String password;
private String database;
private String measurement;
private boolean enableBatch;
private Map<String, Object> configMap;
@ -176,6 +188,12 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
return this;
}
@Override
public Builder withMeasurement(String measurement) {
this.measurement = measurement;
return this;
}
@Override
public Builder withEnableBatch(boolean enableBatch) {
this.enableBatch = enableBatch;
@ -194,9 +212,10 @@ public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryCo
checkNotNull(username, "InfluxDB server username cannot be null");
checkNotNull(password, "InfluxDB server password cannot be null");
checkNotNull(database, "InfluxDB server database cannot be null");
checkNotNull(measurement, "InfluxDB server measurement cannot be null");
return new DefaultInfluxDbTelemetryConfig(address, port, username,
password, database, enableBatch, configMap);
password, database, measurement, enableBatch, configMap);
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.openstacktelemetry.impl;
import org.onosproject.openstacktelemetry.api.InfluxRecord;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
public final class DefaultInfluxRecord<K, V> implements InfluxRecord<K, V> {
public static final String MEASUREMENT_NAME = DEFAULT_INFLUXDB_MEASUREMENT;
private final K measurement;
private final V flowInfos;
protected DefaultInfluxRecord(K measurement, V flowInfos) {
if ((measurement == null) || (measurement.equals(""))) {
this.measurement = (K) MEASUREMENT_NAME;
} else {
this.measurement = measurement;
}
this.flowInfos = flowInfos;
}
@Override
public K measurement() {
return measurement;
}
@Override
public V flowInfos() {
return flowInfos;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof DefaultInfluxRecord) {
final DefaultInfluxRecord other = (DefaultInfluxRecord) obj;
return Objects.equals(this.measurement, other.measurement) &&
Objects.equals(this.flowInfos, other.flowInfos);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(measurement, flowInfos);
}
public String toString() {
return toStringHelper(this)
.add("measurement", measurement)
.add("flowInfos", flowInfos)
.toString();
}
}

View File

@ -26,7 +26,7 @@ public final class DefaultStatsFlowRule implements StatsFlowRule {
private final IpPrefix srcIpPrefix;
private final IpPrefix dstIpPrefix;
private final byte ipProtocol;
private final TpPort srcTpPort;
private final TpPort srcTpPort;
private final TpPort dstTpPort;
private static final String NOT_NULL_MSG = "Element % cannot be null";

View File

@ -38,6 +38,7 @@ import java.util.Dictionary;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DISABLE;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_DATABASE;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_ENABLE_BATCH;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_PASSWORD;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_IP;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_PORT;
@ -59,6 +60,7 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
private static final String DATABASE = "database";
private static final String MEASUREMENT = "measurement";
private static final String ENABLE_BATCH = "enableBatch";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@ -87,6 +89,10 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe
label = "Database of InfluxDB server")
protected String database = DEFAULT_INFLUXDB_DATABASE;
@Property(name = MEASUREMENT, value = DEFAULT_INFLUXDB_MEASUREMENT,
label = "Measurement of InfluxDB server")
protected String measurement = DEFAULT_INFLUXDB_MEASUREMENT;
@Property(name = ENABLE_BATCH, boolValue = DEFAULT_INFLUXDB_ENABLE_BATCH,
label = "Flag value of enabling batch mode of InfluxDB server")
protected Boolean enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH;
@ -141,6 +147,7 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe
.withUsername(username)
.withPassword(password)
.withDatabase(database)
.withMeasurement(measurement)
.withEnableBatch(enableBatch)
.build();
}
@ -178,6 +185,10 @@ public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigSe
database = databaseStr != null ? databaseStr : DEFAULT_INFLUXDB_DATABASE;
log.info("Configured. InfluxDB server database is {}", database);
String measurementStr = Tools.get(properties, MEASUREMENT);
measurement = measurementStr != null ? measurementStr : DEFAULT_INFLUXDB_MEASUREMENT;
log.info("Configured. InfluxDB server measurement is {}", measurement);
Boolean enableBatchConfigured = getBooleanProperty(properties, ENABLE_BATCH);
if (enableBatchConfigured == null) {
enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH;

View File

@ -23,6 +23,10 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.onlab.packet.TpPort;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.InfluxRecord;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
@ -31,6 +35,8 @@ import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
/**
* InfluxDB telemetry manager.
*/
@ -40,11 +46,38 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String FLOW_TYPE = "flowType";
private static final String DEVICE_ID = "deviceId";
private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
private static final String VLAN_ID = "vlanId";
private static final String VXLAN_ID = "vxlanId";
private static final String SRC_IP = "srcIp";
private static final String DST_IP = "dstIp";
private static final String SRC_PORT = "srcPort";
private static final String DST_PORT = "dstPort";
private static final String PROTOCOL = "protocol";
private static final String SRC_MAC = "srcMac";
private static final String DST_MAC = "dstMac";
private static final String STARTUP_TIME = "startupTime";
private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
private static final String LST_PKT_OFFSET = "lstPktOffset";
private static final String PREV_ACC_BYTES = "prevAccBytes";
private static final String PREV_ACC_PKTS = "prevAccPkts";
private static final String CURR_ACC_BYTES = "currAccBytes";
private static final String CURR_ACC_PKTS = "currAccPkts";
private static final String ERROR_PKTS = "errorPkts";
private static final String DROP_PKTS = "dropPkts";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService openstackTelemetryService;
private static final String PROTOCOL = "http";
private static final String INFLUX_PROTOCOL = "http";
private InfluxDB producer = null;
private String database = null;
private String measurement = null;
@Activate
protected void activate() {
@ -73,7 +106,7 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
StringBuilder influxDbServerBuilder = new StringBuilder();
influxDbServerBuilder.append(PROTOCOL);
influxDbServerBuilder.append(INFLUX_PROTOCOL);
influxDbServerBuilder.append(":");
influxDbServerBuilder.append("//");
influxDbServerBuilder.append(influxDbConfig.address());
@ -82,12 +115,18 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
influxDbConfig.username(), influxDbConfig.password());
database = influxDbConfig.database();
measurement = influxDbConfig.measurement();
log.info("InfluxDB producer has Started");
createDB();
}
@Override
public void stop() {
if (producer != null) {
producer.close();
producer = null;
}
@ -101,16 +140,70 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
}
@Override
public void publish(InfluxRecord<String, Object> record) {
// TODO: need to find a way to invoke InfluxDB endpoint using producer
public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
if (producer == null) {
log.warn("InfluxDB telemetry service has not been enabled!");
return;
}
if (record.flowInfos().size() == 0) {
log.warn("No record to publish");
return;
}
log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
BatchPoints batchPoints = BatchPoints.database(database).build();
for (FlowInfo flowInfo: record.flowInfos()) {
Point point = Point
.measurement((measurement == null) ? record.measurement() : measurement)
.tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
.tag(DEVICE_ID, flowInfo.deviceId().toString())
.tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
.tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
.tag(VLAN_ID, flowInfo.vlanId().toString())
.tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
.tag(SRC_IP, flowInfo.srcIp().toString())
.tag(DST_IP, flowInfo.dstIp().toString())
.tag(SRC_PORT, getTpPort(flowInfo.srcPort()))
.tag(DST_PORT, getTpPort(flowInfo.dstPort()))
.tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
.tag(SRC_MAC, flowInfo.srcMac().toString())
.tag(DST_MAC, flowInfo.dstMac().toString())
.addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
.addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
.addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
.addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
.addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
.addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
.addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
.addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
.addField(DROP_PKTS, flowInfo.statsInfo().dropPkts())
.build();
batchPoints.point(point);
}
producer.write(batchPoints);
}
@Override
public boolean isRunning() {
return producer != null;
}
private void createDB() {
if (producer.databaseExists(database)) {
log.debug("Database {} is already created", database);
} else {
producer.createDatabase(database);
log.debug("Database {} is created", database);
}
}
private String getTpPort(TpPort tpPort) {
if (tpPort == null) {
return "";
}
return tpPort.toString();
}
}

View File

@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
@ -73,7 +74,6 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService {
@Override
public void publish(Set<FlowInfo> flowInfos) {
telemetryServices.forEach(service -> {
if (service instanceof GrpcTelemetryManager) {
invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
}
@ -99,7 +99,9 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService {
}
private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
DefaultInfluxRecord<String, Set<FlowInfo>> influxRecord
= new DefaultInfluxRecord<>(DEFAULT_INFLUXDB_MEASUREMENT, flowInfos);
service.publish(influxRecord);
}
private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {

View File

@ -40,6 +40,9 @@ public final class DefaultInfluxDbTelemetryConfigTest {
private static final String DATABASE_1 = "database1";
private static final String DATABASE_2 = "database2";
private static final String MEASUREMENT_1 = "measurement1";
private static final String MEASUREMENT_2 = "measurement2";
private static final String USERNAME_1 = "username1";
private static final String USERNAME_2 = "username2";
@ -72,6 +75,7 @@ public final class DefaultInfluxDbTelemetryConfigTest {
.withAddress(IP_ADDRESS_1)
.withPort(PORT_1)
.withDatabase(DATABASE_1)
.withMeasurement(MEASUREMENT_1)
.withUsername(USERNAME_1)
.withPassword(PASSWORD_1)
.withEnableBatch(ENABLE_BATCH_1)
@ -82,6 +86,7 @@ public final class DefaultInfluxDbTelemetryConfigTest {
.withAddress(IP_ADDRESS_1)
.withPort(PORT_1)
.withDatabase(DATABASE_1)
.withMeasurement(MEASUREMENT_1)
.withUsername(USERNAME_1)
.withPassword(PASSWORD_1)
.withEnableBatch(ENABLE_BATCH_1)
@ -92,6 +97,7 @@ public final class DefaultInfluxDbTelemetryConfigTest {
.withAddress(IP_ADDRESS_2)
.withPort(PORT_2)
.withDatabase(DATABASE_2)
.withMeasurement(MEASUREMENT_2)
.withUsername(USERNAME_2)
.withPassword(PASSWORD_2)
.withEnableBatch(ENABLE_BATCH_2)
@ -113,6 +119,7 @@ public final class DefaultInfluxDbTelemetryConfigTest {
assertThat(config.address(), is(IP_ADDRESS_1));
assertThat(config.port(), is(PORT_1));
assertThat(config.database(), is(DATABASE_1));
assertThat(config.measurement(), is(MEASUREMENT_1));
assertThat(config.username(), is(USERNAME_1));
assertThat(config.password(), is(PASSWORD_1));
assertThat(config.enableBatch(), is(ENABLE_BATCH_1));

View File

@ -31,7 +31,7 @@
<description>SONA Openstack Telemetry Application</description>
<properties>
<influxdb-java.version>2.2</influxdb-java.version>
<influxdb-java.version>2.9</influxdb-java.version>
<kafka-client.version>0.8.2.2_1</kafka-client.version>
<protobuf-java.version>3.2.0</protobuf-java.version>
<io-grpc.version>1.3.1</io-grpc.version>