From b3b0168b723d1d0a60d85d8854743d3e48b0c0df Mon Sep 17 00:00:00 2001 From: Jian Li Date: Wed, 9 Jan 2019 17:10:12 +0900 Subject: [PATCH] Fix: gracefully handle connection refuse due to kafka broker down Change-Id: Ia2f6c9dc19326eda237b621d8a430a23cb62f0eb --- .../impl/KafkaTelemetryManager.java | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java index bbd8527f7c..4ddb4c16fa 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java @@ -42,6 +42,18 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.Future; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.TIMEOUT_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME; import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA; import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED; @@ -56,14 +68,10 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService { private final Logger log = LoggerFactory.getLogger(getClass()); - private static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; - private static final String RETRIES = "retries"; - private static final String ACKS = "acks"; - private static final String BATCH_SIZE = "batch.size"; - private static final String LINGER_MS = "linger.ms"; - private static final String MEMORY_BUFFER = "buffer.memory"; - private static final String KEY_SERIALIZER = "key.serializer"; - private static final String VALUE_SERIALIZER = "value.serializer"; + private static final int METADATA_FETCH_TIMEOUT_VAL = 300; + private static final int TIMEOUT_VAL = 300; + private static final int RETRY_BACKOFF_MS_VAL = 10000; + private static final int RECONNECT_BACKOFF_MS_VAL = 10000; @Reference(cardinality = ReferenceCardinality.MANDATORY) protected OpenstackTelemetryService openstackTelemetryService; @@ -139,14 +147,18 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService { // Configure Kafka server properties Properties prop = new Properties(); - prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString()); - prop.put(RETRIES, kafkaConfig.retries()); - prop.put(ACKS, kafkaConfig.requiredAcks()); - prop.put(BATCH_SIZE, kafkaConfig.batchSize()); - prop.put(LINGER_MS, kafkaConfig.lingerMs()); - prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer()); - prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer()); - prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer()); + prop.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServerBuilder.toString()); + prop.put(RETRIES_CONFIG, kafkaConfig.retries()); + prop.put(ACKS_CONFIG, kafkaConfig.requiredAcks()); + prop.put(BATCH_SIZE_CONFIG, kafkaConfig.batchSize()); + prop.put(LINGER_MS_CONFIG, kafkaConfig.lingerMs()); + prop.put(BUFFER_MEMORY_CONFIG, kafkaConfig.memoryBuffer()); + prop.put(KEY_SERIALIZER_CLASS_CONFIG, kafkaConfig.keySerializer()); + prop.put(VALUE_SERIALIZER_CLASS_CONFIG, kafkaConfig.valueSerializer()); + prop.put(METADATA_FETCH_TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_VAL); + prop.put(TIMEOUT_CONFIG, TIMEOUT_VAL); + prop.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS_VAL); + prop.put(RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS_VAL); if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) { producers.put(name, new KafkaProducer<>(prop));