mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-12-26 11:41:30 +01:00
Fix: check the remote server connectivity before adding to map
1. replace boolean typed enable flag into enumeration status type 2. put the service in pending state, if the activation was failed Change-Id: I15110f3d837d9a3ecf048c2777ec6fae9bf264ff
This commit is contained in:
parent
7fe7eaf528
commit
667c6eb819
@ -41,7 +41,7 @@ public final class DefaultTelemetryConfig implements TelemetryConfig {
|
||||
|
||||
private final String manufacturer;
|
||||
private final String swVersion;
|
||||
private final boolean enabled;
|
||||
private final Status status;
|
||||
|
||||
private final Map<String, String> properties;
|
||||
|
||||
@ -53,20 +53,20 @@ public final class DefaultTelemetryConfig implements TelemetryConfig {
|
||||
* @param parents optional parent configurations
|
||||
* @param manufacturer off-platform application manufacturer
|
||||
* @param swVersion off-platform application software version
|
||||
* @param enabled service enable flag
|
||||
* @param status service status
|
||||
* @param properties properties for telemetry configuration
|
||||
*/
|
||||
public DefaultTelemetryConfig(String name, ConfigType type,
|
||||
List<TelemetryConfig> parents,
|
||||
String manufacturer, String swVersion,
|
||||
boolean enabled, Map<String, String> properties) {
|
||||
Status status, Map<String, String> properties) {
|
||||
this.name = checkNotNull(name, "Name cannot be null");
|
||||
this.type = checkNotNull(type, "type cannot be null");
|
||||
this.parents = parents == null ? ImmutableList.of() : ImmutableList.copyOf(parents);
|
||||
this.manufacturer = checkNotNull(manufacturer, "Manufacturer cannot be null");
|
||||
this.swVersion = checkNotNull(swVersion, "SW version cannot be null");
|
||||
this.properties = copyOf(checkNotNull(properties, "Properties cannot be null"));
|
||||
this.enabled = enabled;
|
||||
this.status = checkNotNull(status, "status cannot be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -99,8 +99,8 @@ public final class DefaultTelemetryConfig implements TelemetryConfig {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean enabled() {
|
||||
return enabled;
|
||||
public Status status() {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -155,20 +155,20 @@ public final class DefaultTelemetryConfig implements TelemetryConfig {
|
||||
|
||||
return new DefaultTelemetryConfig(name, type,
|
||||
!completeParents.isEmpty() ? completeParents : other.parents(),
|
||||
manufacturer, swVersion, enabled, properties.build());
|
||||
manufacturer, swVersion, status, properties.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TelemetryConfig updateProperties(Map<String, String> properties) {
|
||||
|
||||
return new DefaultTelemetryConfig(name, type, parents, manufacturer,
|
||||
swVersion, enabled, properties);
|
||||
swVersion, status, properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TelemetryConfig updateEnabled(boolean enabled) {
|
||||
public TelemetryConfig updateStatus(Status status) {
|
||||
return new DefaultTelemetryConfig(name, type, parents, manufacturer,
|
||||
swVersion, enabled, properties);
|
||||
swVersion, status, properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -189,7 +189,7 @@ public final class DefaultTelemetryConfig implements TelemetryConfig {
|
||||
.add("parents", parents)
|
||||
.add("manufacturer", manufacturer)
|
||||
.add("swVersion", swVersion)
|
||||
.add("enabled", enabled)
|
||||
.add("status", status)
|
||||
.add("properties", properties)
|
||||
.toString();
|
||||
}
|
||||
|
||||
@ -24,8 +24,9 @@ public interface TelemetryAdminService extends TelemetryService {
|
||||
* Prepares and launches the telemetry producer.
|
||||
*
|
||||
* @param name telemetry service name
|
||||
* @return true if the service is successfully started, false otherwise
|
||||
*/
|
||||
void start(String name);
|
||||
boolean start(String name);
|
||||
|
||||
/**
|
||||
* Terminates the telemetry producer.
|
||||
@ -38,8 +39,9 @@ public interface TelemetryAdminService extends TelemetryService {
|
||||
* Restarts the telemetry producer.
|
||||
*
|
||||
* @param name telemetry service name
|
||||
* @return true if the service is successfully restarted, false otherwise
|
||||
*/
|
||||
void restart(String name);
|
||||
boolean restart(String name);
|
||||
|
||||
/**
|
||||
* Launches all telemetry services.
|
||||
|
||||
@ -47,7 +47,11 @@ public class TelemetryConfigEvent
|
||||
/**
|
||||
* Signifies that a telemetry service is disabled.
|
||||
*/
|
||||
SERVICE_DISABLED
|
||||
SERVICE_DISABLED,
|
||||
/**
|
||||
* Signifies that a telemetry service in a pending status due to previous error.
|
||||
*/
|
||||
SERVICE_PENDING
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -61,6 +61,28 @@ public interface TelemetryConfig extends Annotations {
|
||||
UNKNOWN
|
||||
}
|
||||
|
||||
enum Status {
|
||||
/**
|
||||
* Signifies that the service is in enable status.
|
||||
*/
|
||||
ENABLED,
|
||||
|
||||
/**
|
||||
* Signifies that the service is in disable status.
|
||||
*/
|
||||
DISABLED,
|
||||
|
||||
/**
|
||||
* Signifies that the service is in pending status.
|
||||
*/
|
||||
PENDING,
|
||||
|
||||
/**
|
||||
* Signifies that the service is in unknown status.
|
||||
*/
|
||||
UNKNOWN,
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the telemetry configuration name.
|
||||
*
|
||||
@ -98,11 +120,11 @@ public interface TelemetryConfig extends Annotations {
|
||||
String swVersion();
|
||||
|
||||
/**
|
||||
* Returns the service enable flag.
|
||||
* Returns the service status.
|
||||
*
|
||||
* @return enable flag
|
||||
* @return service status
|
||||
*/
|
||||
boolean enabled();
|
||||
Status status();
|
||||
|
||||
/**
|
||||
* Returns the set of annotations as map of key/value properties.
|
||||
@ -153,10 +175,10 @@ public interface TelemetryConfig extends Annotations {
|
||||
TelemetryConfig updateProperties(Map<String, String> properties);
|
||||
|
||||
/**
|
||||
* Obtains the cloned instance with updated enabled value.
|
||||
* Obtains the cloned instance with updated status.
|
||||
*
|
||||
* @param enabled service flag
|
||||
* @param status service status
|
||||
* @return a cloned instance
|
||||
*/
|
||||
TelemetryConfig updateEnabled(boolean enabled);
|
||||
TelemetryConfig updateStatus(Status status);
|
||||
}
|
||||
|
||||
@ -26,6 +26,8 @@ import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
|
||||
/**
|
||||
* Unit tests for DefaultTelemetryConfig class.
|
||||
@ -56,8 +58,8 @@ public final class DefaultTelemetryConfigTest {
|
||||
private static final String PROP_2_VALUE_1 = "value21";
|
||||
private static final String PROP_2_VALUE_2 = "value22";
|
||||
|
||||
private static final boolean ENABLED_1 = true;
|
||||
private static final boolean ENABLED_2 = false;
|
||||
private static final TelemetryConfig.Status STATUS_1 = ENABLED;
|
||||
private static final TelemetryConfig.Status STATUS_2 = DISABLED;
|
||||
|
||||
private TelemetryConfig config1;
|
||||
private TelemetryConfig sameAsConfig1;
|
||||
@ -74,11 +76,11 @@ public final class DefaultTelemetryConfigTest {
|
||||
PROP_2.put(PROP_2_KEY_2, PROP_2_VALUE_2);
|
||||
|
||||
config1 = new DefaultTelemetryConfig(NAME_1, TYPE_1, null,
|
||||
MANUFACTURER_1, SW_VERSION_1, ENABLED_1, PROP_1);
|
||||
MANUFACTURER_1, SW_VERSION_1, STATUS_1, PROP_1);
|
||||
sameAsConfig1 = new DefaultTelemetryConfig(NAME_1, TYPE_1, null,
|
||||
MANUFACTURER_1, SW_VERSION_1, ENABLED_1, PROP_1);
|
||||
MANUFACTURER_1, SW_VERSION_1, STATUS_1, PROP_1);
|
||||
config2 = new DefaultTelemetryConfig(NAME_2, TYPE_2, null,
|
||||
MANUFACTURER_2, SW_VERSION_2, ENABLED_2, PROP_2);
|
||||
MANUFACTURER_2, SW_VERSION_2, STATUS_2, PROP_2);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -111,6 +113,6 @@ public final class DefaultTelemetryConfigTest {
|
||||
assertEquals(config.manufacturer(), MANUFACTURER_1);
|
||||
assertEquals(config.swVersion(), SW_VERSION_1);
|
||||
assertEquals(config.properties(), PROP_1);
|
||||
assertEquals(config.enabled(), ENABLED_1);
|
||||
assertEquals(config.status(), STATUS_1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ public class TelemetryConfigListCommand extends AbstractShellCommand {
|
||||
for (TelemetryConfig config : configs) {
|
||||
print(FORMAT, config.name(),
|
||||
config.type(),
|
||||
config.enabled() ? "ENABLED" : "DISABLED",
|
||||
config.status().name(),
|
||||
config.manufacturer(),
|
||||
config.swVersion());
|
||||
}
|
||||
|
||||
@ -23,6 +23,9 @@ import org.onosproject.cli.AbstractShellCommand;
|
||||
import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
|
||||
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
|
||||
/**
|
||||
* Disables a telemetry service.
|
||||
*/
|
||||
@ -36,7 +39,9 @@ public class TelemetryServiceDisableCommand extends AbstractShellCommand {
|
||||
@Completion(TelemetryConfigNameCompleter.class)
|
||||
private String configName = null;
|
||||
|
||||
private static final String FORMAT = "Successfully disabled telemetry service %s!";
|
||||
private static final long SLEEP_MS = 2000; // wait 2s for checking status
|
||||
private static final String SUCCESS_FORMAT = "Successfully disabled telemetry service %s!";
|
||||
private static final String FAIL_FORMAT = "Failed to disable telemetry service %s!";
|
||||
private static final String NO_ELEMENT =
|
||||
"No telemetry config is found with the given name";
|
||||
|
||||
@ -50,9 +55,22 @@ public class TelemetryServiceDisableCommand extends AbstractShellCommand {
|
||||
return;
|
||||
}
|
||||
|
||||
TelemetryConfig updatedConfig = config.updateEnabled(false);
|
||||
TelemetryConfig updatedConfig = config.updateStatus(DISABLED);
|
||||
|
||||
service.updateTelemetryConfig(updatedConfig);
|
||||
print(FORMAT, config.name());
|
||||
|
||||
try {
|
||||
sleep(SLEEP_MS);
|
||||
} catch (InterruptedException e) {
|
||||
error("Exception caused during status checking...");
|
||||
}
|
||||
|
||||
TelemetryConfig finalConfig = service.getConfig(configName);
|
||||
|
||||
if (finalConfig.status() == DISABLED) {
|
||||
print(SUCCESS_FORMAT, config.name());
|
||||
} else {
|
||||
print(FAIL_FORMAT, config.name());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,6 +23,9 @@ import org.onosproject.cli.AbstractShellCommand;
|
||||
import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
|
||||
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
|
||||
/**
|
||||
* Enables a telemetry service.
|
||||
*/
|
||||
@ -36,7 +39,9 @@ public class TelemetryServiceEnableCommand extends AbstractShellCommand {
|
||||
@Completion(TelemetryConfigNameCompleter.class)
|
||||
private String configName = null;
|
||||
|
||||
private static final String FORMAT = "Successfully enabled telemetry service %s!";
|
||||
private static final long SLEEP_MS = 2000; // wait 2s for checking status
|
||||
private static final String SUCCESS_FORMAT = "Successfully enabled telemetry service %s!";
|
||||
private static final String FAIL_FORMAT = "Failed to enable telemetry service %s!";
|
||||
private static final String NO_ELEMENT =
|
||||
"No telemetry config is found with the given name";
|
||||
|
||||
@ -50,9 +55,22 @@ public class TelemetryServiceEnableCommand extends AbstractShellCommand {
|
||||
return;
|
||||
}
|
||||
|
||||
TelemetryConfig updatedConfig = config.updateEnabled(true);
|
||||
TelemetryConfig updatedConfig = config.updateStatus(ENABLED);
|
||||
|
||||
service.updateTelemetryConfig(updatedConfig);
|
||||
print(FORMAT, config.name());
|
||||
|
||||
try {
|
||||
sleep(SLEEP_MS);
|
||||
} catch (InterruptedException e) {
|
||||
error("Exception caused during status checking...");
|
||||
}
|
||||
|
||||
TelemetryConfig finalConfig = service.getConfig(configName);
|
||||
|
||||
if (finalConfig.status() == ENABLED) {
|
||||
print(SUCCESS_FORMAT, config.name());
|
||||
} else {
|
||||
print(FAIL_FORMAT, config.name());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,6 +54,10 @@ import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.C
|
||||
import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_UPDATED;
|
||||
import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.SERVICE_DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.SERVICE_ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.SERVICE_PENDING;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.PENDING;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
/**
|
||||
@ -76,6 +80,7 @@ public class DistributedTelemetryConfigStore
|
||||
.register(DefaultTelemetryConfigProvider.class)
|
||||
.register(TelemetryConfig.class)
|
||||
.register(TelemetryConfig.ConfigType.class)
|
||||
.register(TelemetryConfig.Status.class)
|
||||
.register(DefaultTelemetryConfig.class)
|
||||
.build();
|
||||
|
||||
@ -210,15 +215,20 @@ public class DistributedTelemetryConfigStore
|
||||
TelemetryConfig oldValue = event.oldValue().value();
|
||||
TelemetryConfig newValue = event.newValue().value();
|
||||
|
||||
if (oldValue.enabled() && !newValue.enabled()) {
|
||||
if (oldValue.status() != DISABLED && newValue.status() == DISABLED) {
|
||||
log.debug("Telemetry service {} has been disabled!", newValue.name());
|
||||
notifyDelegate(new TelemetryConfigEvent(SERVICE_DISABLED, newValue));
|
||||
}
|
||||
|
||||
if (!oldValue.enabled() && newValue.enabled()) {
|
||||
if (oldValue.status() != ENABLED && newValue.status() == ENABLED) {
|
||||
log.debug("Telemetry service {} has been enabled!", newValue.name());
|
||||
notifyDelegate(new TelemetryConfigEvent(SERVICE_ENABLED, newValue));
|
||||
}
|
||||
|
||||
if (oldValue.status() != PENDING && newValue.status() == PENDING) {
|
||||
log.debug("Telemetry service {} was pended!", newValue.name());
|
||||
notifyDelegate(new TelemetryConfigEvent(SERVICE_PENDING, newValue));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,7 +35,9 @@ import java.util.Map;
|
||||
|
||||
import static org.onosproject.openstacktelemetry.api.Constants.GRPC_SCHEME;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.fromTelemetryConfig;
|
||||
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
|
||||
|
||||
/**
|
||||
* gRPC telemetry manager.
|
||||
@ -71,19 +73,29 @@ public class GrpcTelemetryManager implements GrpcTelemetryAdminService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String name) {
|
||||
public boolean start(String name) {
|
||||
boolean success = false;
|
||||
TelemetryConfig config = telemetryConfigService.getConfig(name);
|
||||
GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(config);
|
||||
|
||||
if (grpcConfig != null && !config.name().equals(GRPC_SCHEME) && config.enabled()) {
|
||||
if (grpcConfig != null && !config.name().equals(GRPC_SCHEME) &&
|
||||
config.status() == ENABLED) {
|
||||
ManagedChannel channel = ManagedChannelBuilder
|
||||
.forAddress(grpcConfig.address(), grpcConfig.port())
|
||||
.maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
|
||||
.usePlaintext(grpcConfig.usePlaintext())
|
||||
.build();
|
||||
|
||||
channels.put(name, channel);
|
||||
if (testConnectivity(grpcConfig.address(), grpcConfig.port())) {
|
||||
channels.put(name, channel);
|
||||
success = true;
|
||||
} else {
|
||||
log.warn("Unable to connect to {}:{}, " +
|
||||
"please check the connectivity manually",
|
||||
grpcConfig.address(), grpcConfig.port());
|
||||
}
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -97,9 +109,9 @@ public class GrpcTelemetryManager implements GrpcTelemetryAdminService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart(String name) {
|
||||
public boolean restart(String name) {
|
||||
stop(name);
|
||||
start(name);
|
||||
return start(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -41,7 +41,9 @@ import java.util.Set;
|
||||
|
||||
import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
|
||||
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
|
||||
|
||||
/**
|
||||
* InfluxDB telemetry manager.
|
||||
@ -168,29 +170,14 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
|
||||
return !producers.isEmpty();
|
||||
}
|
||||
|
||||
private void createDB(InfluxDB producer, String database) {
|
||||
if (producer.databaseExists(database)) {
|
||||
log.debug("Database {} is already created", database);
|
||||
} else {
|
||||
producer.createDatabase(database);
|
||||
log.debug("Database {} is created", database);
|
||||
}
|
||||
}
|
||||
|
||||
private String getTpPort(TpPort tpPort) {
|
||||
if (tpPort == null) {
|
||||
return "";
|
||||
}
|
||||
return tpPort.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String name) {
|
||||
public boolean start(String name) {
|
||||
boolean success = false;
|
||||
TelemetryConfig config = telemetryConfigService.getConfig(name);
|
||||
InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
|
||||
|
||||
if (influxDbConfig != null &&
|
||||
!config.name().equals(INFLUXDB_SCHEME) && config.enabled()) {
|
||||
if (influxDbConfig != null && !config.name().equals(INFLUXDB_SCHEME) &&
|
||||
config.status() == ENABLED) {
|
||||
StringBuilder influxDbServerBuilder = new StringBuilder();
|
||||
influxDbServerBuilder.append(INFLUX_PROTOCOL);
|
||||
influxDbServerBuilder.append(":");
|
||||
@ -199,12 +186,20 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
|
||||
influxDbServerBuilder.append(":");
|
||||
influxDbServerBuilder.append(influxDbConfig.port());
|
||||
|
||||
InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
|
||||
influxDbConfig.username(), influxDbConfig.password());
|
||||
producers.put(name, producer);
|
||||
|
||||
createDB(producer, influxDbConfig.database());
|
||||
if (testConnectivity(influxDbConfig.address(), influxDbConfig.port())) {
|
||||
InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
|
||||
influxDbConfig.username(), influxDbConfig.password());
|
||||
producers.put(name, producer);
|
||||
createDB(producer, influxDbConfig.database());
|
||||
success = true;
|
||||
} else {
|
||||
log.warn("Unable to connect to {}:{}, " +
|
||||
"please check the connectivity manually",
|
||||
influxDbConfig.address(), influxDbConfig.port());
|
||||
}
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -218,15 +213,16 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart(String name) {
|
||||
public boolean restart(String name) {
|
||||
stop(name);
|
||||
start(name);
|
||||
return start(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startAll() {
|
||||
telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> start(c.name()));
|
||||
log.info("InfluxDB producer has Started"); }
|
||||
log.info("InfluxDB producer has Started");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopAll() {
|
||||
@ -242,4 +238,21 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
|
||||
stopAll();
|
||||
startAll();
|
||||
}
|
||||
|
||||
|
||||
private void createDB(InfluxDB producer, String database) {
|
||||
if (producer.databaseExists(database)) {
|
||||
log.debug("Database {} is already created", database);
|
||||
} else {
|
||||
producer.createDatabase(database);
|
||||
log.debug("Database {} is created", database);
|
||||
}
|
||||
}
|
||||
|
||||
private String getTpPort(TpPort tpPort) {
|
||||
if (tpPort == null) {
|
||||
return "";
|
||||
}
|
||||
return tpPort.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,7 +44,9 @@ import java.util.concurrent.Future;
|
||||
|
||||
import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
|
||||
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
|
||||
|
||||
/**
|
||||
* Kafka telemetry manager.
|
||||
@ -123,12 +125,13 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String name) {
|
||||
public boolean start(String name) {
|
||||
boolean success = false;
|
||||
TelemetryConfig config = telemetryConfigService.getConfig(name);
|
||||
KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
|
||||
|
||||
if (kafkaConfig != null &&
|
||||
!config.name().equals(KAFKA_SCHEME) && config.enabled()) {
|
||||
if (kafkaConfig != null && !config.name().equals(KAFKA_SCHEME) &&
|
||||
config.status() == ENABLED) {
|
||||
StringBuilder kafkaServerBuilder = new StringBuilder();
|
||||
kafkaServerBuilder.append(kafkaConfig.address());
|
||||
kafkaServerBuilder.append(":");
|
||||
@ -145,8 +148,17 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
|
||||
prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
|
||||
prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
|
||||
|
||||
producers.put(name, new KafkaProducer<>(prop));
|
||||
if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
|
||||
producers.put(name, new KafkaProducer<>(prop));
|
||||
success = true;
|
||||
} else {
|
||||
log.warn("Unable to connect to {}:{}, " +
|
||||
"please check the connectivity manually",
|
||||
kafkaConfig.address(), kafkaConfig.port());
|
||||
}
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -160,9 +172,9 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart(String name) {
|
||||
public boolean restart(String name) {
|
||||
stop(name);
|
||||
start(name);
|
||||
return start(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -25,9 +25,9 @@ import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
|
||||
import org.onosproject.openstacktelemetry.api.PrometheusTelemetryService;
|
||||
import org.onosproject.openstacktelemetry.api.RestTelemetryService;
|
||||
import org.onosproject.openstacktelemetry.api.TelemetryAdminService;
|
||||
import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
|
||||
import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
|
||||
import org.onosproject.openstacktelemetry.api.TelemetryConfigListener;
|
||||
import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
|
||||
import org.osgi.service.component.annotations.Activate;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
import org.osgi.service.component.annotations.Deactivate;
|
||||
@ -40,6 +40,7 @@ import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.PENDING;
|
||||
|
||||
/**
|
||||
* Openstack telemetry manager.
|
||||
@ -50,7 +51,7 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService {
|
||||
private final Logger log = LoggerFactory.getLogger(getClass());
|
||||
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY)
|
||||
protected TelemetryConfigService telemetryConfigService;
|
||||
protected TelemetryConfigAdminService telemetryConfigService;
|
||||
|
||||
private List<TelemetryAdminService> telemetryServices = Lists.newArrayList();
|
||||
private InternalTelemetryConfigListener
|
||||
@ -156,7 +157,12 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService {
|
||||
|
||||
switch (event.type()) {
|
||||
case SERVICE_ENABLED:
|
||||
service.start(event.subject().name());
|
||||
if (!service.start(event.subject().name())) {
|
||||
// we enforce to make the service in PENDING status,
|
||||
// if we encountered a failure during service start
|
||||
telemetryConfigService.updateTelemetryConfig(
|
||||
event.subject().updateStatus(PENDING));
|
||||
}
|
||||
break;
|
||||
case SERVICE_DISABLED:
|
||||
service.stop(event.subject().name());
|
||||
|
||||
@ -43,6 +43,7 @@ import java.util.Set;
|
||||
|
||||
import static org.onosproject.openstacktelemetry.api.Constants.PROMETHEUS_SCHEME;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.fromTelemetryConfig;
|
||||
|
||||
/**
|
||||
@ -207,55 +208,19 @@ public class PrometheusTelemetryManager implements PrometheusTelemetryAdminServi
|
||||
}
|
||||
}
|
||||
|
||||
private String[] getLabelValues(FlowInfo flowInfo) {
|
||||
String[] labelValues = new String[LABEL_TAGS.length];
|
||||
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(FLOW_TYPE)]
|
||||
= String.valueOf(flowInfo.flowType());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DEVICE_ID)]
|
||||
= flowInfo.deviceId().toString();
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(INPUT_INTERFACE_ID)]
|
||||
= String.valueOf(flowInfo.inputInterfaceId());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(OUTPUT_INTERFACE_ID)]
|
||||
= String.valueOf(flowInfo.outputInterfaceId());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(VXLAN_ID)]
|
||||
= String.valueOf(flowInfo.vxlanId());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(SRC_IP)]
|
||||
= flowInfo.srcIp().toString();
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DST_IP)]
|
||||
= flowInfo.dstIp().toString();
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(SRC_PORT)]
|
||||
= getTpPort(flowInfo.srcPort());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DST_PORT)]
|
||||
= getTpPort(flowInfo.dstPort());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(PROTOCOL)]
|
||||
= String.valueOf(flowInfo.protocol());
|
||||
if (flowInfo.vlanId() != null) {
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(VLAN_ID)]
|
||||
= flowInfo.vlanId().toString();
|
||||
}
|
||||
return labelValues;
|
||||
}
|
||||
|
||||
private String getTpPort(TpPort tpPort) {
|
||||
if (tpPort == null) {
|
||||
return "";
|
||||
}
|
||||
return tpPort.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return !prometheusExporters.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String name) {
|
||||
public boolean start(String name) {
|
||||
boolean success = false;
|
||||
TelemetryConfig config = telemetryConfigService.getConfig(name);
|
||||
PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(config);
|
||||
|
||||
if (prometheusConfig != null &&
|
||||
!config.name().equals(PROMETHEUS_SCHEME) && config.enabled()) {
|
||||
if (prometheusConfig != null && !config.name().equals(PROMETHEUS_SCHEME) &&
|
||||
config.status() == ENABLED) {
|
||||
try {
|
||||
// TODO Offer a 'Authentication'
|
||||
Server prometheusExporter = new Server(prometheusConfig.port());
|
||||
@ -270,10 +235,14 @@ public class PrometheusTelemetryManager implements PrometheusTelemetryAdminServi
|
||||
|
||||
prometheusExporters.put(name, prometheusExporter);
|
||||
|
||||
success = true;
|
||||
|
||||
} catch (Exception ex) {
|
||||
log.warn("Exception: {}", ex);
|
||||
log.warn("Failed to start prometheus server due to {}", ex);
|
||||
}
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -290,8 +259,45 @@ public class PrometheusTelemetryManager implements PrometheusTelemetryAdminServi
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart(String name) {
|
||||
public boolean restart(String name) {
|
||||
stop(name);
|
||||
start(name);
|
||||
return start(name);
|
||||
}
|
||||
|
||||
private String[] getLabelValues(FlowInfo flowInfo) {
|
||||
String[] labelValues = new String[LABEL_TAGS.length];
|
||||
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(FLOW_TYPE)]
|
||||
= String.valueOf(flowInfo.flowType());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DEVICE_ID)]
|
||||
= flowInfo.deviceId().toString();
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(INPUT_INTERFACE_ID)]
|
||||
= String.valueOf(flowInfo.inputInterfaceId());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(OUTPUT_INTERFACE_ID)]
|
||||
= String.valueOf(flowInfo.outputInterfaceId());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(VXLAN_ID)]
|
||||
= String.valueOf(flowInfo.vxlanId());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(SRC_IP)]
|
||||
= flowInfo.srcIp().toString();
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DST_IP)]
|
||||
= flowInfo.dstIp().toString();
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(SRC_PORT)]
|
||||
= getTpPort(flowInfo.srcPort());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DST_PORT)]
|
||||
= getTpPort(flowInfo.dstPort());
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(PROTOCOL)]
|
||||
= String.valueOf(flowInfo.protocol());
|
||||
if (flowInfo.vlanId() != null) {
|
||||
labelValues[Arrays.asList(LABEL_TAGS).indexOf(VLAN_ID)]
|
||||
= flowInfo.vlanId().toString();
|
||||
}
|
||||
return labelValues;
|
||||
}
|
||||
|
||||
private String getTpPort(TpPort tpPort) {
|
||||
if (tpPort == null) {
|
||||
return "";
|
||||
}
|
||||
return tpPort.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,7 +40,9 @@ import java.util.Set;
|
||||
|
||||
import static org.onosproject.openstacktelemetry.api.Constants.REST_SCHEME;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.fromTelemetryConfig;
|
||||
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
|
||||
|
||||
/**
|
||||
* REST telemetry manager.
|
||||
@ -130,12 +132,13 @@ public class RestTelemetryManager implements RestTelemetryAdminService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String name) {
|
||||
public boolean start(String name) {
|
||||
boolean success = false;
|
||||
TelemetryConfig config = telemetryConfigService.getConfig(name);
|
||||
RestTelemetryConfig restConfig = fromTelemetryConfig(config);
|
||||
|
||||
if (restConfig != null &&
|
||||
!config.name().equals(REST_SCHEME) && config.enabled()) {
|
||||
if (restConfig != null && !config.name().equals(REST_SCHEME) &&
|
||||
config.status() == ENABLED) {
|
||||
StringBuilder restServerBuilder = new StringBuilder();
|
||||
restServerBuilder.append(PROTOCOL);
|
||||
restServerBuilder.append(":");
|
||||
@ -145,13 +148,23 @@ public class RestTelemetryManager implements RestTelemetryAdminService {
|
||||
restServerBuilder.append(restConfig.port());
|
||||
restServerBuilder.append("/");
|
||||
|
||||
Client client = ClientBuilder.newBuilder().build();
|
||||
if (testConnectivity(restConfig.address(), restConfig.port())) {
|
||||
Client client = ClientBuilder.newBuilder().build();
|
||||
|
||||
WebTarget target = client.target(
|
||||
restServerBuilder.toString()).path(restConfig.endpoint());
|
||||
WebTarget target = client.target(
|
||||
restServerBuilder.toString()).path(restConfig.endpoint());
|
||||
|
||||
targets.put(config.name(), target);
|
||||
targets.put(config.name(), target);
|
||||
|
||||
success = true;
|
||||
} else {
|
||||
log.warn("Unable to connect to {}:{}, " +
|
||||
"please check the connectivity manually",
|
||||
restConfig.address(), restConfig.port());
|
||||
}
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -165,8 +178,8 @@ public class RestTelemetryManager implements RestTelemetryAdminService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart(String name) {
|
||||
public boolean restart(String name) {
|
||||
stop(name);
|
||||
start(name);
|
||||
return start(name);
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,6 +84,7 @@ public class TelemetryConfigManager
|
||||
.register(DefaultTelemetryConfigProvider.class)
|
||||
.register(TelemetryConfig.class)
|
||||
.register(TelemetryConfig.ConfigType.class)
|
||||
.register(TelemetryConfig.Status.class)
|
||||
.register(DefaultTelemetryConfig.class)
|
||||
.build();
|
||||
|
||||
|
||||
@ -36,7 +36,6 @@ import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Conf
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.UNKNOWN;
|
||||
|
||||
/**
|
||||
* Utility capable of reading telemetry configuration XML resources and producing
|
||||
@ -61,14 +60,12 @@ public class XmlTelemetryConfigLoader {
|
||||
|
||||
private static final String PROPERTY = "property";
|
||||
|
||||
private static final String TRUE = "true";
|
||||
|
||||
private static final String NAME = "[@name]";
|
||||
private static final String TYPE = "[@type]";
|
||||
private static final String EXTENDS = "[@extends]";
|
||||
private static final String MFG = "[@manufacturer]";
|
||||
private static final String SW = "[@swVersion]";
|
||||
private static final String ENABLED = "[@enabled]";
|
||||
private static final String STATUS = "[@status]";
|
||||
|
||||
private Map<String, TelemetryConfig> configs = Maps.newHashMap();
|
||||
|
||||
@ -148,9 +145,9 @@ public class XmlTelemetryConfigLoader {
|
||||
String swVersion = telemetryCfg.getString(SW, getParentAttribute(parents, SW));
|
||||
|
||||
// note that we do not inherits enabled property from parent
|
||||
String enabledStr = telemetryCfg.getString(ENABLED);
|
||||
|
||||
boolean enabled = enabledStr != null && enabledStr.equalsIgnoreCase(TRUE);
|
||||
String statusStr = telemetryCfg.getString(STATUS);
|
||||
TelemetryConfig.Status status =
|
||||
statusStr == null ? TelemetryConfig.Status.UNKNOWN : status(statusStr);
|
||||
|
||||
TelemetryConfig.ConfigType type = type(typeStr);
|
||||
|
||||
@ -159,7 +156,7 @@ public class XmlTelemetryConfigLoader {
|
||||
}
|
||||
|
||||
return new DefaultTelemetryConfig(name, type, parents, manufacturer,
|
||||
swVersion, enabled, parseProperties(parents, telemetryCfg));
|
||||
swVersion, status, parseProperties(parents, telemetryCfg));
|
||||
}
|
||||
|
||||
private TelemetryConfig.ConfigType type(String typeStr) {
|
||||
@ -176,7 +173,7 @@ public class XmlTelemetryConfigLoader {
|
||||
return PROMETHEUS;
|
||||
case "UNKNOWN":
|
||||
default:
|
||||
return UNKNOWN;
|
||||
return TelemetryConfig.ConfigType.UNKNOWN;
|
||||
}
|
||||
}
|
||||
|
||||
@ -224,4 +221,17 @@ public class XmlTelemetryConfigLoader {
|
||||
|
||||
return properties.build();
|
||||
}
|
||||
|
||||
private TelemetryConfig.Status status(String status) {
|
||||
switch (status.toUpperCase()) {
|
||||
case "ENABLED" :
|
||||
return TelemetryConfig.Status.ENABLED;
|
||||
case "DISABLED" :
|
||||
return TelemetryConfig.Status.DISABLED;
|
||||
case "PENDING" :
|
||||
return TelemetryConfig.Status.PENDING;
|
||||
default:
|
||||
return TelemetryConfig.Status.UNKNOWN;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,10 @@ import com.google.common.base.Strings;
|
||||
import org.onlab.packet.IPv4;
|
||||
import org.onosproject.cfg.ConfigProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Dictionary;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
@ -34,6 +38,7 @@ public final class OpenstackTelemetryUtil {
|
||||
private static final String PROTOCOL_NAME_UDP = "udp";
|
||||
private static final String PROTOCOL_NAME_ANY = "any";
|
||||
private static final int ARBITRARY_PROTOCOL = 0x0;
|
||||
private static final int TIMEOUT = 2000;
|
||||
|
||||
/**
|
||||
* Prevents object instantiation from external.
|
||||
@ -110,4 +115,26 @@ public final class OpenstackTelemetryUtil {
|
||||
return PROTOCOL_NAME_ANY;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the connectivity with the given address and port.
|
||||
*
|
||||
* @param address address
|
||||
* @param port port number
|
||||
* @return true if the given address and port is accessible, false otherwise
|
||||
*/
|
||||
public static boolean testConnectivity(String address, int port) {
|
||||
|
||||
boolean isConnected = false;
|
||||
SocketAddress socketAddress = new InetSocketAddress(address, port);
|
||||
Socket socket = new Socket();
|
||||
try {
|
||||
socket.connect(socketAddress, TIMEOUT);
|
||||
socket.close();
|
||||
isConnected = true;
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
|
||||
return isConnected;
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,13 +22,13 @@
|
||||
<property name="password">onos</property>
|
||||
</config>
|
||||
<config name="sona-influxdb-connector-1" manufacturer="SK Telecom"
|
||||
swVersion="1.0" extends="influxdb" enabled="false">
|
||||
swVersion="1.0" extends="influxdb" status="disabled">
|
||||
<property name="database">ost</property>
|
||||
<property name="measurement">sonaflow</property>
|
||||
<property name="enableBatch">true</property>
|
||||
</config>
|
||||
<config name="sona-influxdb-connector-2" manufacturer="SK Telecom"
|
||||
swVersion="1.0" extends="influxdb" enabled="false">
|
||||
swVersion="1.0" extends="influxdb" status="disabled">
|
||||
<property name="database">ost2</property>
|
||||
<property name="measurement">sonaflow</property>
|
||||
<property name="enableBatch">true</property>
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
<property name="retries">0</property>
|
||||
</config>
|
||||
<config name="tina-kafka-exporter" manufacturer="SK Telecom"
|
||||
swVersion="1.0" extends="kafka" enabled="false">
|
||||
swVersion="1.0" extends="kafka" status="disabled">
|
||||
<property name="batchSize">16384</property>
|
||||
<property name="lingerMs">1</property>
|
||||
<property name="memoryBuffer">33554432</property>
|
||||
|
||||
@ -21,6 +21,6 @@
|
||||
</config>
|
||||
|
||||
<config name="sona-prometheus-exporter" manufacturer="SK Telecom"
|
||||
swVersion="1.0" extends="prometheus" enabled="false">
|
||||
swVersion="1.0" extends="prometheus" status="disabled">
|
||||
</config>
|
||||
</configs>
|
||||
@ -21,7 +21,7 @@
|
||||
<property name="requestMediaType">application/json</property>
|
||||
<property name="responseMediaType">application/json</property>
|
||||
</config>
|
||||
<config name="rest-connector" swVersion="1.0" extends="rest" enabled="false">
|
||||
<config name="rest-connector" swVersion="1.0" extends="rest" status="disabled">
|
||||
<property name="endpoint">telemetry</property>
|
||||
<property name="method">POST</property>
|
||||
</config>
|
||||
|
||||
@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.ADDRESS;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.MAX_INBOUND_MSG_SIZE;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.PORT;
|
||||
@ -146,7 +147,7 @@ public final class DefaultGrpcTelemetryConfigTest {
|
||||
props.put(MAX_INBOUND_MSG_SIZE, String.valueOf(MSG_SIZE_1));
|
||||
props.put(USE_PLAINTEXT, String.valueOf(USE_PLAIN_TEXT_1));
|
||||
TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, GRPC,
|
||||
ImmutableList.of(), DUMMY, DUMMY, false, props);
|
||||
ImmutableList.of(), DUMMY, DUMMY, ENABLED, props);
|
||||
|
||||
GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(config);
|
||||
assertThat(grpcConfig.address(), is(IP_ADDRESS_1));
|
||||
|
||||
@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.ADDRESS;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.DATABASE;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.ENABLE_BATCH;
|
||||
@ -174,7 +175,7 @@ public final class DefaultInfluxDbTelemetryConfigTest {
|
||||
props.put(MEASUREMENT, MEASUREMENT_1);
|
||||
|
||||
TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, INFLUXDB,
|
||||
ImmutableList.of(), DUMMY, DUMMY, false, props);
|
||||
ImmutableList.of(), DUMMY, DUMMY, DISABLED, props);
|
||||
|
||||
InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
|
||||
assertThat(influxDbConfig.address(), is(IP_ADDRESS_1));
|
||||
|
||||
@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.ADDRESS;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.BATCH_SIZE;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.CODEC;
|
||||
@ -215,7 +216,7 @@ public final class DefaultKafkaTelemetryConfigTest {
|
||||
props.put(CODEC, CODEC_1);
|
||||
|
||||
TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, KAFKA,
|
||||
ImmutableList.of(), DUMMY, DUMMY, false, props);
|
||||
ImmutableList.of(), DUMMY, DUMMY, DISABLED, props);
|
||||
|
||||
KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
|
||||
assertThat(kafkaConfig.address(), is(IP_ADDRESS_1));
|
||||
|
||||
@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.ADDRESS;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.PORT;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.fromTelemetryConfig;
|
||||
@ -128,7 +129,7 @@ public class DefaultPrometheusTelemetryConfigTest {
|
||||
props.put(PORT, String.valueOf(PORT_1));
|
||||
|
||||
TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, PROMETHEUS,
|
||||
ImmutableList.of(), DUMMY, DUMMY, false, props);
|
||||
ImmutableList.of(), DUMMY, DUMMY, DISABLED, props);
|
||||
|
||||
PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(config);
|
||||
assertThat(prometheusConfig.address(), is(IP_ADDRESS_1));
|
||||
|
||||
@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.ADDRESS;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.ENDPOINT;
|
||||
import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.METHOD;
|
||||
@ -161,7 +162,7 @@ public final class DefaultRestTelemetryConfigTest {
|
||||
props.put(RESPONSE_MEDIA_TYPE, RESPONSE_MEDIA_TYPE_1);
|
||||
|
||||
TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, REST,
|
||||
ImmutableList.of(), DUMMY, DUMMY, false, props);
|
||||
ImmutableList.of(), DUMMY, DUMMY, DISABLED, props);
|
||||
RestTelemetryConfig restConfig = DefaultRestTelemetryConfig.fromTelemetryConfig(config);
|
||||
assertThat(restConfig.address(), is(IP_ADDRESS_1));
|
||||
assertThat(restConfig.port(), is(PORT_1));
|
||||
|
||||
@ -32,6 +32,8 @@ import java.util.Map;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
|
||||
/**
|
||||
* Distributed TelemetryConfig store test suite.
|
||||
@ -62,8 +64,8 @@ public class DistributedTelemetryConfigStoreTest {
|
||||
private static final String PROP_2_VALUE_1 = "value21";
|
||||
private static final String PROP_2_VALUE_2 = "value22";
|
||||
|
||||
private static final boolean ENABLED_1 = true;
|
||||
private static final boolean ENABLED_2 = false;
|
||||
private static final TelemetryConfig.Status STATUS_1 = ENABLED;
|
||||
private static final TelemetryConfig.Status STATUS_2 = DISABLED;
|
||||
|
||||
private TelemetryConfig config1;
|
||||
private TelemetryConfig config2;
|
||||
@ -132,8 +134,8 @@ public class DistributedTelemetryConfigStoreTest {
|
||||
PROP_2.put(PROP_2_KEY_2, PROP_2_VALUE_2);
|
||||
|
||||
config1 = new DefaultTelemetryConfig(NAME_1, TYPE_1, null,
|
||||
MANUFACTURER_1, SW_VERSION_1, ENABLED_1, PROP_1);
|
||||
MANUFACTURER_1, SW_VERSION_1, STATUS_1, PROP_1);
|
||||
config2 = new DefaultTelemetryConfig(NAME_2, TYPE_2, null,
|
||||
MANUFACTURER_2, SW_VERSION_2, ENABLED_2, PROP_2);
|
||||
MANUFACTURER_2, SW_VERSION_2, STATUS_2, PROP_2);
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ public final class TelemetryConfigJsonCodec extends JsonCodec<TelemetryConfig> {
|
||||
private static final String TYPE = "type";
|
||||
private static final String MANUFACTURER = "manufacturer";
|
||||
private static final String SW_VERSION = "swVersion";
|
||||
private static final String ENABLED = "enabled";
|
||||
private static final String STATUS = "status";
|
||||
private static final String PROPS = "props";
|
||||
private static final String KEY = "key";
|
||||
private static final String VALUE = "value";
|
||||
@ -56,7 +56,7 @@ public final class TelemetryConfigJsonCodec extends JsonCodec<TelemetryConfig> {
|
||||
.put(TYPE, config.type().name())
|
||||
.put(MANUFACTURER, config.manufacturer())
|
||||
.put(SW_VERSION, config.swVersion())
|
||||
.put(ENABLED, config.enabled());
|
||||
.put(STATUS, config.status().name());
|
||||
|
||||
Map<String, String> props = config.properties();
|
||||
ArrayNode propsJson = context.mapper().createArrayNode();
|
||||
@ -94,9 +94,9 @@ public final class TelemetryConfigJsonCodec extends JsonCodec<TelemetryConfig> {
|
||||
String swVersion = nullIsIllegal(json.get(SW_VERSION),
|
||||
SW_VERSION + MISSING_MESSAGE).asText();
|
||||
|
||||
// parse enabled flag
|
||||
boolean enabled = nullIsIllegal(json.get(ENABLED),
|
||||
ENABLED + MISSING_MESSAGE).asBoolean();
|
||||
// parse status
|
||||
TelemetryConfig.Status status = status(nullIsIllegal(json.get(STATUS),
|
||||
STATUS + MISSING_MESSAGE).asText());
|
||||
|
||||
JsonNode propertiesJson = json.get(PROPS);
|
||||
Map<String, String> properties = Maps.newConcurrentMap();
|
||||
@ -109,7 +109,20 @@ public final class TelemetryConfigJsonCodec extends JsonCodec<TelemetryConfig> {
|
||||
}
|
||||
|
||||
return new DefaultTelemetryConfig(name, configType,
|
||||
ImmutableList.of(), manufacturer, swVersion, enabled, properties);
|
||||
ImmutableList.of(), manufacturer, swVersion, status, properties);
|
||||
}
|
||||
|
||||
private TelemetryConfig.Status status(String status) {
|
||||
switch (status.toUpperCase()) {
|
||||
case "ENABLED" :
|
||||
return TelemetryConfig.Status.ENABLED;
|
||||
case "DISABLED" :
|
||||
return TelemetryConfig.Status.DISABLED;
|
||||
case "PENDING" :
|
||||
return TelemetryConfig.Status.PENDING;
|
||||
default:
|
||||
return TelemetryConfig.Status.UNKNOWN;
|
||||
}
|
||||
}
|
||||
|
||||
private TelemetryConfig.ConfigType configType(String type) {
|
||||
|
||||
@ -38,6 +38,8 @@ import java.util.Map;
|
||||
|
||||
import static org.onlab.util.Tools.nullIsIllegal;
|
||||
import static org.onlab.util.Tools.nullIsNotFound;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.DISABLED;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
|
||||
/**
|
||||
* Handles REST API call of openstack telemetry configuration.
|
||||
@ -175,7 +177,7 @@ public class OpenstackTelemetryConfigWebResource extends AbstractWebResource {
|
||||
log.warn("There is no config found to enable for {}", configName);
|
||||
return Response.notModified().build();
|
||||
} else {
|
||||
TelemetryConfig updatedConfig = config.updateEnabled(true);
|
||||
TelemetryConfig updatedConfig = config.updateStatus(ENABLED);
|
||||
configService.updateTelemetryConfig(updatedConfig);
|
||||
return Response.ok().build();
|
||||
}
|
||||
@ -204,7 +206,7 @@ public class OpenstackTelemetryConfigWebResource extends AbstractWebResource {
|
||||
log.warn("There is no config found to disable for {}", configName);
|
||||
return Response.notModified().build();
|
||||
} else {
|
||||
TelemetryConfig updatedConfig = config.updateEnabled(false);
|
||||
TelemetryConfig updatedConfig = config.updateStatus(DISABLED);
|
||||
configService.updateTelemetryConfig(updatedConfig);
|
||||
return Response.ok().build();
|
||||
}
|
||||
|
||||
@ -36,13 +36,13 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.onosproject.net.NetTestTools.APP_ID;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
import static org.onosproject.openstacktelemetry.codec.rest.TelemetryConfigJsonMatcher.matchesTelemetryConfig;
|
||||
|
||||
/**
|
||||
@ -80,14 +80,14 @@ public class TelemetryConfigCodecTest {
|
||||
TelemetryConfig.ConfigType type = TelemetryConfig.ConfigType.GRPC;
|
||||
String manufacturer = "grpc.io";
|
||||
String swVersion = "1.0";
|
||||
boolean enabled = true;
|
||||
TelemetryConfig.Status status = ENABLED;
|
||||
|
||||
Map<String, String> properties = Maps.newConcurrentMap();
|
||||
properties.put("key1", "value1");
|
||||
properties.put("key2", "value2");
|
||||
|
||||
TelemetryConfig config = new DefaultTelemetryConfig(name, type,
|
||||
ImmutableList.of(), manufacturer, swVersion, enabled, properties);
|
||||
ImmutableList.of(), manufacturer, swVersion, status, properties);
|
||||
|
||||
ObjectNode configJson = telemetryConfigCodec.encode(config, context);
|
||||
assertThat(configJson, matchesTelemetryConfig(config));
|
||||
@ -104,7 +104,7 @@ public class TelemetryConfigCodecTest {
|
||||
assertEquals(config.type().name(), "GRPC");
|
||||
assertEquals(config.manufacturer(), "grpc.io");
|
||||
assertEquals(config.swVersion(), "1.0");
|
||||
assertTrue(config.enabled());
|
||||
assertEquals(config.status().name(), "ENABLED");
|
||||
|
||||
config.properties().forEach((k, v) -> {
|
||||
if (k.equals("address")) {
|
||||
|
||||
@ -32,7 +32,7 @@ public final class TelemetryConfigJsonMatcher extends TypeSafeDiagnosingMatcher<
|
||||
private static final String TYPE = "type";
|
||||
private static final String MANUFACTURER = "manufacturer";
|
||||
private static final String SW_VERSION = "swVersion";
|
||||
private static final String ENABLED = "enabled";
|
||||
private static final String STATUS = "status";
|
||||
private static final String PROPS = "props";
|
||||
private static final String KEY = "key";
|
||||
private static final String VALUE = "value";
|
||||
@ -76,11 +76,11 @@ public final class TelemetryConfigJsonMatcher extends TypeSafeDiagnosingMatcher<
|
||||
return false;
|
||||
}
|
||||
|
||||
// check enabled
|
||||
JsonNode jsonEnabled = jsonNode.get(ENABLED);
|
||||
boolean enabled = telemetryConfig.enabled();
|
||||
if (jsonEnabled == null || jsonEnabled.asBoolean() != enabled) {
|
||||
description.appendText("Enabled was " + jsonEnabled);
|
||||
// check status
|
||||
JsonNode jsonStatus = jsonNode.get(STATUS);
|
||||
TelemetryConfig.Status status = telemetryConfig.status();
|
||||
if (jsonStatus == null || !jsonStatus.asText().equals(status.name())) {
|
||||
description.appendText("Enabled was " + jsonStatus);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
|
||||
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
|
||||
|
||||
/**
|
||||
* Unit tests for openstack telemetry config REST API.
|
||||
@ -60,7 +61,7 @@ public class OpenstackTelemetryConfigWebResourceTest extends ResourceTest {
|
||||
private static final String PROP_VALUE_1 = "value11";
|
||||
private static final String PROP_VALUE_2 = "value12";
|
||||
|
||||
private static final boolean ENABLED = true;
|
||||
private static final TelemetryConfig.Status STATUS = ENABLED;
|
||||
|
||||
private final TelemetryConfigAdminService mockConfigAdminService =
|
||||
createMock(TelemetryConfigAdminService.class);
|
||||
@ -90,7 +91,7 @@ public class OpenstackTelemetryConfigWebResourceTest extends ResourceTest {
|
||||
PROP.put(PROP_KEY_2, PROP_VALUE_2);
|
||||
|
||||
telemetryConfig = new DefaultTelemetryConfig(NAME, TYPE, null,
|
||||
MANUFACTURER, SW_VERSION, ENABLED, PROP);
|
||||
MANUFACTURER, SW_VERSION, STATUS, PROP);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
"type": "GRPC",
|
||||
"manufacturer": "grpc.io",
|
||||
"swVersion": "1.0",
|
||||
"enabled": true,
|
||||
"status": "enabled",
|
||||
"props": [
|
||||
{
|
||||
"key": "address",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user