diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
index 8819977bc5..1f5400aa1b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounterBuilder.java
@@ -1,5 +1,7 @@
 package org.onosproject.store.service;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 /**
  * Builder for AtomicCounter.
  */
@@ -32,6 +34,25 @@ public interface AtomicCounterBuilder {
      */
     AtomicCounterBuilder withPartitionsDisabled();
 
+    /**
+     * Enables retries when counter operations fail.
+     * <p>
+     * Note: Use with caution. By default retries are disabled.
+     * </p>
+     * @return this AtomicCounterBuilder
+     */
+    AtomicCounterBuilder withRetryOnFailure();
+
+    /**
+     * Sets the executor service to use for retrying failed operations.
+     * <p>
+     * Note: Must be set when retries are enabled.
+     * </p>
+     * @param executor executor service used to schedule retries
+     * @return this AtomicCounterBuilder
+     */
+    AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor);
+
     /**
      * Builds a AtomicCounter based on the configuration options
      * supplied to this builder.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index a69c0cc63e..cdaf79233f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -16,10 +16,16 @@
 package org.onosproject.store.consistent.impl;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 
 import org.onosproject.store.service.AsyncAtomicCounter;
+import org.slf4j.Logger;
 
 import static com.google.common.base.Preconditions.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Default implementation for a distributed AsyncAtomicCounter backed by
@@ -31,10 +37,30 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
 
     private final String name;
     private final Database database;
+    private final boolean retryOnFailure;
+    private final ScheduledExecutorService retryExecutor;
+    // TODO: configure delay via builder
+    private static final int DELAY_BETWEEN_RETRY_SEC = 1;
+    private final Logger log = getLogger(getClass());
 
-    public DefaultAsyncAtomicCounter(String name, Database database) {
+    /**
+     * Creates a counter backed by the given database.
+     *
+     * @param name counter name
+     * @param database database the counter operations are delegated to
+     * @param retryOnException whether failed getAndAdd/addAndGet calls are retried
+     * @param retryExecutor executor for scheduling retries; required if retries are enabled
+     */
+    public DefaultAsyncAtomicCounter(String name,
+                                     Database database,
+                                     boolean retryOnException,
+                                     ScheduledExecutorService retryExecutor) {
         this.name = checkNotNull(name);
         this.database = checkNotNull(database);
+        checkArgument(!retryOnException || retryExecutor != null,
+                      "retryExecutor must be specified when retries are enabled");
+        this.retryOnFailure = retryOnException;
+        this.retryExecutor = retryExecutor;
     }
 
     @Override
@@ -54,11 +80,97 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
 
     @Override
     public CompletableFuture<Long> getAndAdd(long delta) {
-        return database.counterGetAndAdd(name, delta);
+        CompletableFuture<Long> result = database.counterGetAndAdd(name, delta);
+        if (!retryOnFailure) {
+            return result;
+        }
+
+        // The stage returned by whenComplete fails with the first error, so the
+        // caller gets a separate future that only the eventual outcome completes.
+        CompletableFuture<Long> future = new CompletableFuture<>();
+        result.whenComplete((r, e) -> {
+            if (e != null) {
+                log.warn("getAndAdd failed due to {}. Will retry", e.getMessage());
+                scheduleRetry(new RetryTask("getAndAdd", database::counterGetAndAdd, delta, future), e);
+            } else {
+                future.complete(r);
+            }
+        });
+        return future;
     }
 
     @Override
     public CompletableFuture<Long> addAndGet(long delta) {
-        return database.counterAddAndGet(name, delta);
+        CompletableFuture<Long> result = database.counterAddAndGet(name, delta);
+        if (!retryOnFailure) {
+            return result;
+        }
+
+        // As in getAndAdd: return a separate future so that a retried result
+        // reaches the caller instead of the first failure.
+        CompletableFuture<Long> future = new CompletableFuture<>();
+        result.whenComplete((r, e) -> {
+            if (e != null) {
+                log.warn("addAndGet failed due to {}. Will retry", e.getMessage());
+                scheduleRetry(new RetryTask("addAndGet", database::counterAddAndGet, delta, future), e);
+            } else {
+                future.complete(r);
+            }
+        });
+        return future;
     }
-}
+
+    /**
+     * Schedules {@code task} to run after the fixed retry delay. If the
+     * executor rejects the task (for example because it has been shut down)
+     * the pending future is failed with {@code lastError} rather than being
+     * left incomplete forever.
+     *
+     * @param task retry to schedule
+     * @param lastError failure that triggered this retry
+     */
+    private void scheduleRetry(RetryTask task, Throwable lastError) {
+        try {
+            retryExecutor.schedule(task, DELAY_BETWEEN_RETRY_SEC, TimeUnit.SECONDS);
+        } catch (RejectedExecutionException rejected) {
+            log.warn("{} retry rejected due to {}. Giving up", task.operation, rejected.getMessage());
+            task.result.completeExceptionally(lastError);
+        }
+    }
+
+    /**
+     * Re-issues a failed counter operation, rescheduling itself after each
+     * failure, and completes the caller-visible future once a result arrives.
+     */
+    private class RetryTask implements Runnable {
+
+        private final String operation;
+        private final BiFunction<String, Long, CompletableFuture<Long>> function;
+        private final Long delta;
+        private final CompletableFuture<Long> result;
+
+        public RetryTask(String operation,
+                         BiFunction<String, Long, CompletableFuture<Long>> function,
+                         Long delta,
+                         CompletableFuture<Long> result) {
+            this.operation = operation;
+            this.function = function;
+            this.delta = delta;
+            this.result = result;
+        }
+
+        @Override
+        public void run() {
+            function.apply(name, delta).whenComplete((r, e) -> {
+                if (e == null) {
+                    result.complete(r);
+                } else {
+                    log.warn("{} retry failed due to {}. Will try again...", operation, e.getMessage());
+                    // TODO: Exponential backoff
+                    // TODO: limit retries
+                    scheduleRetry(this, e);
+                }
+            });
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index fee91fa7d9..18120bb8ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -17,6 +17,7 @@ package org.onosproject.store.consistent.impl;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -36,8 +37,11 @@ public class DefaultAtomicCounter implements AtomicCounter {
 
     private final AsyncAtomicCounter asyncCounter;
 
-    public DefaultAtomicCounter(String name, Database database) {
-        asyncCounter = new DefaultAsyncAtomicCounter(name, database);
+    public DefaultAtomicCounter(String name,
+                                Database database,
+                                boolean retryOnException,
+                                ScheduledExecutorService retryExecutor) {
+        asyncCounter = new DefaultAsyncAtomicCounter(name, database, retryOnException, retryExecutor);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
index c84cff0f45..97361db2f1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
@@ -1,5 +1,7 @@
 package org.onosproject.store.consistent.impl;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.AtomicCounterBuilder;
@@ -15,6 +17,10 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
     private boolean partitionsEnabled = true;
     private final Database partitionedDatabase;
     private final Database inMemoryDatabase;
+    // Retries stay off unless withRetryOnFailure() is called.
+    private boolean retryOnFailure;
+    // Only required when retries are enabled; checked in validateInputs().
+    private ScheduledExecutorService retryExecutor;
 
     public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
         this.inMemoryDatabase = inMemoryDatabase;
@@ -36,13 +42,36 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
 
     @Override
     public AtomicCounter build() {
+        validateInputs();
         Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
-        return new DefaultAtomicCounter(name, database);
+        return new DefaultAtomicCounter(name, database, retryOnFailure, retryExecutor);
     }
 
     @Override
     public AsyncAtomicCounter buildAsyncCounter() {
+        validateInputs();
         Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
-        return new DefaultAsyncAtomicCounter(name, database);
+        return new DefaultAsyncAtomicCounter(name, database, retryOnFailure, retryExecutor);
+    }
+
+    @Override
+    public AtomicCounterBuilder withRetryOnFailure() {
+        this.retryOnFailure = true;
+        return this;
+    }
+
+    @Override
+    public AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor) {
+        retryExecutor = executor;
+        return this;
+    }
+
+    /**
+     * Rejects a configuration that enables retries without a retry executor.
+     */
+    private void validateInputs() {
+        if (retryOnFailure && retryExecutor == null) {
+            throw new IllegalArgumentException("RetryExecutor must be specified when retries are enabled");
+        }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentApplicationIdStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentApplicationIdStore.java
index ef17efb252..ecfc3dcb90 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentApplicationIdStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentApplicationIdStore.java
@@ -15,10 +15,14 @@
  */
 package org.onosproject.store.core.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -30,7 +34,7 @@ import org.onosproject.core.ApplicationId;
 import org.onosproject.core.ApplicationIdStore;
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
@@ -39,6 +43,7 @@ import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
 
 /**
  * ApplicationIdStore implementation on top of {@code AtomicCounter}
@@ -53,10 +58,11 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    private AtomicCounter appIdCounter;
+    private AsyncAtomicCounter appIdCounter;
     private ConsistentMap registeredIds;
     private Map nameToAppIdCache = Maps.newConcurrentMap();
     private Map idToAppIdCache = Maps.newConcurrentMap();
+    private ScheduledExecutorService executor;
 
     private static final Serializer SERIALIZER = Serializer.using(new KryoNamespace.Builder()
             .register(KryoNamespaces.API)
@@ -65,10 +71,15 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
 
     @Activate
     public void activate() {
+        // Scheduler handed to the counter builder for retrying failed operations.
+        executor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/appId", "retry-handler"));
         appIdCounter = storageService.atomicCounterBuilder()
                 .withName("onos-app-id-counter")
                 .withPartitionsDisabled()
-                .build();
+                .withRetryOnFailure()
+                .withRetryExecutor(executor)
+                .buildAsyncCounter();
 
         registeredIds = storageService.consistentMapBuilder()
                 .withName("onos-app-ids")
@@ -83,6 +94,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
 
     @Deactivate
     public void deactivate() {
+        executor.shutdown();
         log.info("Stopped");
     }
 
@@ -118,7 +130,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
         ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> {
             Versioned existingAppId = registeredIds.get(name);
             if (existingAppId == null) {
-                int id = (int) appIdCounter.incrementAndGet();
+                int id = Futures.getUnchecked(appIdCounter.incrementAndGet()).intValue();
                 DefaultApplicationId newAppId = new DefaultApplicationId(id, name);
                 existingAppId = registeredIds.putIfAbsent(name, newAppId);
                 if (existingAppId != null) {