diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java index 5d7283809c..504fa75a71 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java @@ -18,7 +18,6 @@ package org.onosproject.store.primitives.impl; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import org.onlab.util.Tools; import org.onosproject.store.service.AsyncAtomicCounter; /** @@ -27,46 +26,45 @@ import org.onosproject.store.service.AsyncAtomicCounter; */ public class ExecutingAsyncAtomicCounter extends ExecutingDistributedPrimitive implements AsyncAtomicCounter { private final AsyncAtomicCounter delegateCounter; - private final Executor executor; - public ExecutingAsyncAtomicCounter(AsyncAtomicCounter delegateCounter, Executor executor) { - super(delegateCounter, executor); + public ExecutingAsyncAtomicCounter( + AsyncAtomicCounter delegateCounter, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateCounter, orderedExecutor, threadPoolExecutor); this.delegateCounter = delegateCounter; - this.executor = executor; } @Override public CompletableFuture incrementAndGet() { - return Tools.asyncFuture(delegateCounter.incrementAndGet(), executor); + return asyncFuture(delegateCounter.incrementAndGet()); } @Override public CompletableFuture getAndIncrement() { - return Tools.asyncFuture(delegateCounter.getAndIncrement(), executor); + return asyncFuture(delegateCounter.getAndIncrement()); } @Override public CompletableFuture getAndAdd(long delta) { - return Tools.asyncFuture(delegateCounter.getAndAdd(delta), executor); + return asyncFuture(delegateCounter.getAndAdd(delta)); } @Override public CompletableFuture addAndGet(long delta) { - return Tools.asyncFuture(delegateCounter.addAndGet(delta), executor); + return asyncFuture(delegateCounter.addAndGet(delta)); } @Override public CompletableFuture get() { - return Tools.asyncFuture(delegateCounter.get(), executor); + return asyncFuture(delegateCounter.get()); } @Override public CompletableFuture set(long value) { - return Tools.asyncFuture(delegateCounter.set(value), executor); + return asyncFuture(delegateCounter.set(value)); } @Override public CompletableFuture compareAndSet(long expectedValue, long updateValue) { - return Tools.asyncFuture(delegateCounter.compareAndSet(expectedValue, updateValue), executor); + return asyncFuture(delegateCounter.compareAndSet(expectedValue, updateValue)); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java index 9a039f2b69..a17a2f02c2 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java @@ -18,7 +18,6 @@ package org.onosproject.store.primitives.impl; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import org.onlab.util.Tools; import org.onosproject.store.service.AsyncAtomicCounterMap; /** @@ -28,86 +27,85 @@ import org.onosproject.store.service.AsyncAtomicCounterMap; public class ExecutingAsyncAtomicCounterMap extends ExecutingDistributedPrimitive implements AsyncAtomicCounterMap { private final AsyncAtomicCounterMap delegateMap; - private final Executor executor; - public ExecutingAsyncAtomicCounterMap(AsyncAtomicCounterMap delegateMap, Executor executor) { - super(delegateMap, executor); + public ExecutingAsyncAtomicCounterMap( + AsyncAtomicCounterMap delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateMap, orderedExecutor, threadPoolExecutor); this.delegateMap = delegateMap; - this.executor = executor; } @Override public CompletableFuture incrementAndGet(K key) { - return Tools.asyncFuture(delegateMap.incrementAndGet(key), executor); + return asyncFuture(delegateMap.incrementAndGet(key)); } @Override public CompletableFuture decrementAndGet(K key) { - return Tools.asyncFuture(delegateMap.decrementAndGet(key), executor); + return asyncFuture(delegateMap.decrementAndGet(key)); } @Override public CompletableFuture getAndIncrement(K key) { - return Tools.asyncFuture(delegateMap.getAndIncrement(key), executor); + return asyncFuture(delegateMap.getAndIncrement(key)); } @Override public CompletableFuture getAndDecrement(K key) { - return Tools.asyncFuture(delegateMap.getAndDecrement(key), executor); + return asyncFuture(delegateMap.getAndDecrement(key)); } @Override public CompletableFuture addAndGet(K key, long delta) { - return Tools.asyncFuture(delegateMap.addAndGet(key, delta), executor); + return asyncFuture(delegateMap.addAndGet(key, delta)); } @Override public CompletableFuture getAndAdd(K key, long delta) { - return Tools.asyncFuture(delegateMap.getAndAdd(key, delta), executor); + return asyncFuture(delegateMap.getAndAdd(key, delta)); } @Override public CompletableFuture get(K key) { - return Tools.asyncFuture(delegateMap.get(key), executor); + return asyncFuture(delegateMap.get(key)); } @Override public CompletableFuture put(K key, long newValue) { - return Tools.asyncFuture(delegateMap.put(key, newValue), executor); + return asyncFuture(delegateMap.put(key, newValue)); } @Override public CompletableFuture putIfAbsent(K key, long newValue) { - return Tools.asyncFuture(delegateMap.putIfAbsent(key, newValue), executor); + return asyncFuture(delegateMap.putIfAbsent(key, newValue)); } @Override public CompletableFuture replace(K key, long expectedOldValue, long newValue) { - return Tools.asyncFuture(delegateMap.replace(key, expectedOldValue, newValue), executor); + return asyncFuture(delegateMap.replace(key, expectedOldValue, newValue)); } @Override public CompletableFuture remove(K key) { - return Tools.asyncFuture(delegateMap.remove(key), executor); + return asyncFuture(delegateMap.remove(key)); } @Override public CompletableFuture remove(K key, long value) { - return Tools.asyncFuture(delegateMap.remove(key, value), executor); + return asyncFuture(delegateMap.remove(key, value)); } @Override public CompletableFuture size() { - return Tools.asyncFuture(delegateMap.size(), executor); + return asyncFuture(delegateMap.size()); } @Override public CompletableFuture isEmpty() { - return Tools.asyncFuture(delegateMap.isEmpty(), executor); + return asyncFuture(delegateMap.isEmpty()); } @Override public CompletableFuture clear() { - return Tools.asyncFuture(delegateMap.clear(), executor); + return asyncFuture(delegateMap.clear()); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java index 40eacc6f30..c8bba5239a 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java @@ -20,7 +20,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import com.google.common.collect.Maps; -import org.onlab.util.Tools; import org.onosproject.store.service.AsyncAtomicValue; import org.onosproject.store.service.AtomicValueEventListener; @@ -30,47 +29,48 @@ import org.onosproject.store.service.AtomicValueEventListener; */ public class ExecutingAsyncAtomicValue extends ExecutingDistributedPrimitive implements AsyncAtomicValue { private final AsyncAtomicValue delegateValue; - private final Executor executor; + private final Executor orderedExecutor; private final Map, AtomicValueEventListener> listenerMap = Maps.newConcurrentMap(); - public ExecutingAsyncAtomicValue(AsyncAtomicValue delegateValue, Executor executor) { - super(delegateValue, executor); + public ExecutingAsyncAtomicValue( + AsyncAtomicValue delegateValue, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateValue, orderedExecutor, threadPoolExecutor); this.delegateValue = delegateValue; - this.executor = executor; + this.orderedExecutor = orderedExecutor; } @Override public CompletableFuture compareAndSet(V expect, V update) { - return Tools.asyncFuture(delegateValue.compareAndSet(expect, update), executor); + return asyncFuture(delegateValue.compareAndSet(expect, update)); } @Override public CompletableFuture get() { - return Tools.asyncFuture(delegateValue.get(), executor); + return asyncFuture(delegateValue.get()); } @Override public CompletableFuture getAndSet(V value) { - return Tools.asyncFuture(delegateValue.getAndSet(value), executor); + return asyncFuture(delegateValue.getAndSet(value)); } @Override public CompletableFuture set(V value) { - return Tools.asyncFuture(delegateValue.set(value), executor); + return asyncFuture(delegateValue.set(value)); } @Override public CompletableFuture addListener(AtomicValueEventListener listener) { - AtomicValueEventListener wrappedListener = e -> executor.execute(() -> listener.event(e)); + AtomicValueEventListener wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e)); listenerMap.put(listener, wrappedListener); - return Tools.asyncFuture(delegateValue.addListener(wrappedListener), executor); + return asyncFuture(delegateValue.addListener(wrappedListener)); } @Override public CompletableFuture removeListener(AtomicValueEventListener listener) { AtomicValueEventListener wrappedListener = listenerMap.remove(listener); if (wrappedListener != null) { - return Tools.asyncFuture(delegateValue.removeListener(wrappedListener), executor); + return asyncFuture(delegateValue.removeListener(wrappedListener)); } return CompletableFuture.completedFuture(null); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java index 07911aff8d..d955121a41 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java @@ -23,7 +23,6 @@ import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Predicate; -import org.onlab.util.Tools; import org.onosproject.store.primitives.MapUpdate; import org.onosproject.store.primitives.TransactionId; import org.onosproject.store.service.AsyncConsistentMap; @@ -39,147 +38,146 @@ import org.onosproject.store.service.Versioned; public class ExecutingAsyncConsistentMap extends ExecutingDistributedPrimitive implements AsyncConsistentMap { private final AsyncConsistentMap delegateMap; - private final Executor executor; - public ExecutingAsyncConsistentMap(AsyncConsistentMap delegateMap, Executor executor) { - super(delegateMap, executor); + public ExecutingAsyncConsistentMap( + AsyncConsistentMap delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateMap, orderedExecutor, threadPoolExecutor); this.delegateMap = delegateMap; - this.executor = executor; } @Override public CompletableFuture size() { - return Tools.asyncFuture(delegateMap.size(), executor); + return asyncFuture(delegateMap.size()); } @Override public CompletableFuture containsKey(K key) { - return Tools.asyncFuture(delegateMap.containsKey(key), executor); + return asyncFuture(delegateMap.containsKey(key)); } @Override public CompletableFuture containsValue(V value) { - return Tools.asyncFuture(delegateMap.containsValue(value), executor); + return asyncFuture(delegateMap.containsValue(value)); } @Override public CompletableFuture> get(K key) { - return Tools.asyncFuture(delegateMap.get(key), executor); + return asyncFuture(delegateMap.get(key)); } @Override public CompletableFuture> getOrDefault(K key, V defaultValue) { - return Tools.asyncFuture(delegateMap.getOrDefault(key, defaultValue), executor); + return asyncFuture(delegateMap.getOrDefault(key, defaultValue)); } @Override public CompletableFuture> computeIf( K key, Predicate condition, BiFunction remappingFunction) { - return Tools.asyncFuture(delegateMap.computeIf(key, condition, remappingFunction), executor); + return asyncFuture(delegateMap.computeIf(key, condition, remappingFunction)); } @Override public CompletableFuture> put(K key, V value) { - return Tools.asyncFuture(delegateMap.put(key, value), executor); + return asyncFuture(delegateMap.put(key, value)); } @Override public CompletableFuture> putAndGet(K key, V value) { - return Tools.asyncFuture(delegateMap.putAndGet(key, value), executor); + return asyncFuture(delegateMap.putAndGet(key, value)); } @Override public CompletableFuture> remove(K key) { - return Tools.asyncFuture(delegateMap.remove(key), executor); + return asyncFuture(delegateMap.remove(key)); } @Override public CompletableFuture clear() { - return Tools.asyncFuture(delegateMap.clear(), executor); + return asyncFuture(delegateMap.clear()); } @Override public CompletableFuture> keySet() { - return Tools.asyncFuture(delegateMap.keySet(), executor); + return asyncFuture(delegateMap.keySet()); } @Override public CompletableFuture>> values() { - return Tools.asyncFuture(delegateMap.values(), executor); + return asyncFuture(delegateMap.values()); } @Override public CompletableFuture>>> entrySet() { - return Tools.asyncFuture(delegateMap.entrySet(), executor); + return asyncFuture(delegateMap.entrySet()); } @Override public CompletableFuture> putIfAbsent(K key, V value) { - return Tools.asyncFuture(delegateMap.putIfAbsent(key, value), executor); + return asyncFuture(delegateMap.putIfAbsent(key, value)); } @Override public CompletableFuture remove(K key, V value) { - return Tools.asyncFuture(delegateMap.remove(key, value), executor); + return asyncFuture(delegateMap.remove(key, value)); } @Override public CompletableFuture remove(K key, long version) { - return Tools.asyncFuture(delegateMap.remove(key, version), executor); + return asyncFuture(delegateMap.remove(key, version)); } @Override public CompletableFuture> replace(K key, V value) { - return Tools.asyncFuture(delegateMap.replace(key, value), executor); + return asyncFuture(delegateMap.replace(key, value)); } @Override public CompletableFuture replace(K key, V oldValue, V newValue) { - return Tools.asyncFuture(delegateMap.replace(key, oldValue, newValue), executor); + return asyncFuture(delegateMap.replace(key, oldValue, newValue)); } @Override public CompletableFuture replace(K key, long oldVersion, V newValue) { - return Tools.asyncFuture(delegateMap.replace(key, oldVersion, newValue), executor); + return asyncFuture(delegateMap.replace(key, oldVersion, newValue)); } @Override public CompletableFuture begin(TransactionId transactionId) { - return Tools.asyncFuture(delegateMap.begin(transactionId), executor); + return asyncFuture(delegateMap.begin(transactionId)); } @Override public CompletableFuture prepare(TransactionLog> transactionLog) { - return Tools.asyncFuture(delegateMap.prepare(transactionLog), executor); + return asyncFuture(delegateMap.prepare(transactionLog)); } @Override public CompletableFuture commit(TransactionId transactionId) { - return Tools.asyncFuture(delegateMap.commit(transactionId), executor); + return asyncFuture(delegateMap.commit(transactionId)); } @Override public CompletableFuture rollback(TransactionId transactionId) { - return Tools.asyncFuture(delegateMap.rollback(transactionId), executor); + return asyncFuture(delegateMap.rollback(transactionId)); } @Override public CompletableFuture prepareAndCommit(TransactionLog> transactionLog) { - return Tools.asyncFuture(delegateMap.prepareAndCommit(transactionLog), executor); + return asyncFuture(delegateMap.prepareAndCommit(transactionLog)); } @Override public CompletableFuture addListener(MapEventListener listener) { - return addListener(listener, executor); + return addListener(listener); } @Override public CompletableFuture addListener(MapEventListener listener, Executor executor) { - return Tools.asyncFuture(delegateMap.addListener(listener, executor), this.executor); + return asyncFuture(delegateMap.addListener(listener, executor)); } @Override public CompletableFuture removeListener(MapEventListener listener) { - return Tools.asyncFuture(delegateMap.removeListener(listener), executor); + return asyncFuture(delegateMap.removeListener(listener)); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java index a2b308a481..6ec0a6938a 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java @@ -22,7 +22,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import com.google.common.collect.Multiset; -import org.onlab.util.Tools; import org.onosproject.store.service.AsyncConsistentMultimap; import org.onosproject.store.service.Versioned; @@ -33,101 +32,100 @@ import org.onosproject.store.service.Versioned; public class ExecutingAsyncConsistentMultimap extends ExecutingDistributedPrimitive implements AsyncConsistentMultimap { private final AsyncConsistentMultimap delegateMap; - private final Executor executor; - public ExecutingAsyncConsistentMultimap(AsyncConsistentMultimap delegateMap, Executor executor) { - super(delegateMap, executor); + public ExecutingAsyncConsistentMultimap( + AsyncConsistentMultimap delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateMap, orderedExecutor, threadPoolExecutor); this.delegateMap = delegateMap; - this.executor = executor; } @Override public CompletableFuture size() { - return Tools.asyncFuture(delegateMap.size(), executor); + return asyncFuture(delegateMap.size()); } @Override public CompletableFuture isEmpty() { - return Tools.asyncFuture(delegateMap.isEmpty(), executor); + return asyncFuture(delegateMap.isEmpty()); } @Override public CompletableFuture containsKey(K key) { - return Tools.asyncFuture(delegateMap.containsKey(key), executor); + return asyncFuture(delegateMap.containsKey(key)); } @Override public CompletableFuture containsValue(V value) { - return Tools.asyncFuture(delegateMap.containsValue(value), executor); + return asyncFuture(delegateMap.containsValue(value)); } @Override public CompletableFuture containsEntry(K key, V value) { - return Tools.asyncFuture(delegateMap.containsEntry(key, value), executor); + return asyncFuture(delegateMap.containsEntry(key, value)); } @Override public CompletableFuture put(K key, V value) { - return Tools.asyncFuture(delegateMap.put(key, value), executor); + return asyncFuture(delegateMap.put(key, value)); } @Override public CompletableFuture remove(K key, V value) { - return Tools.asyncFuture(delegateMap.remove(key, value), executor); + return asyncFuture(delegateMap.remove(key, value)); } @Override public CompletableFuture removeAll(K key, Collection values) { - return Tools.asyncFuture(delegateMap.removeAll(key, values), executor); + return asyncFuture(delegateMap.removeAll(key, values)); } @Override public CompletableFuture>> removeAll(K key) { - return Tools.asyncFuture(delegateMap.removeAll(key), executor); + return asyncFuture(delegateMap.removeAll(key)); } @Override public CompletableFuture putAll(K key, Collection values) { - return Tools.asyncFuture(delegateMap.putAll(key, values), executor); + return asyncFuture(delegateMap.putAll(key, values)); } @Override public CompletableFuture>> replaceValues(K key, Collection values) { - return Tools.asyncFuture(delegateMap.replaceValues(key, values), executor); + return asyncFuture(delegateMap.replaceValues(key, values)); } @Override public CompletableFuture clear() { - return Tools.asyncFuture(delegateMap.clear(), executor); + return asyncFuture(delegateMap.clear()); } @Override public CompletableFuture>> get(K key) { - return Tools.asyncFuture(delegateMap.get(key), executor); + return asyncFuture(delegateMap.get(key)); } @Override public CompletableFuture> keySet() { - return Tools.asyncFuture(delegateMap.keySet(), executor); + return asyncFuture(delegateMap.keySet()); } @Override public CompletableFuture> keys() { - return Tools.asyncFuture(delegateMap.keys(), executor); + return asyncFuture(delegateMap.keys()); } @Override public CompletableFuture> values() { - return Tools.asyncFuture(delegateMap.values(), executor); + return asyncFuture(delegateMap.values()); } @Override public CompletableFuture>> entries() { - return Tools.asyncFuture(delegateMap.entries(), executor); + return asyncFuture(delegateMap.entries()); } @Override public CompletableFuture>> asMap() { - return Tools.asyncFuture(delegateMap.asMap(), executor); + return asyncFuture(delegateMap.asMap()); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java index 14419410e9..3a4fe85e54 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java @@ -25,7 +25,6 @@ import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Predicate; -import org.onlab.util.Tools; import org.onosproject.store.primitives.MapUpdate; import org.onosproject.store.primitives.TransactionId; import org.onosproject.store.service.AsyncConsistentTreeMap; @@ -41,229 +40,228 @@ import org.onosproject.store.service.Versioned; public class ExecutingAsyncConsistentTreeMap extends ExecutingDistributedPrimitive implements AsyncConsistentTreeMap { private final AsyncConsistentTreeMap delegateMap; - private final Executor executor; - public ExecutingAsyncConsistentTreeMap(AsyncConsistentTreeMap delegateMap, Executor executor) { - super(delegateMap, executor); + public ExecutingAsyncConsistentTreeMap( + AsyncConsistentTreeMap delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateMap, orderedExecutor, threadPoolExecutor); this.delegateMap = delegateMap; - this.executor = executor; } @Override public CompletableFuture firstKey() { - return Tools.asyncFuture(delegateMap.firstKey(), executor); + return asyncFuture(delegateMap.firstKey()); } @Override public CompletableFuture lastKey() { - return Tools.asyncFuture(delegateMap.lastKey(), executor); + return asyncFuture(delegateMap.lastKey()); } @Override public CompletableFuture>> ceilingEntry(String key) { - return Tools.asyncFuture(delegateMap.ceilingEntry(key), executor); + return asyncFuture(delegateMap.ceilingEntry(key)); } @Override public CompletableFuture>> floorEntry(String key) { - return Tools.asyncFuture(delegateMap.floorEntry(key), executor); + return asyncFuture(delegateMap.floorEntry(key)); } @Override public CompletableFuture>> higherEntry(String key) { - return Tools.asyncFuture(delegateMap.higherEntry(key), executor); + return asyncFuture(delegateMap.higherEntry(key)); } @Override public CompletableFuture>> lowerEntry(String key) { - return Tools.asyncFuture(delegateMap.lowerEntry(key), executor); + return asyncFuture(delegateMap.lowerEntry(key)); } @Override public CompletableFuture>> firstEntry() { - return Tools.asyncFuture(delegateMap.firstEntry(), executor); + return asyncFuture(delegateMap.firstEntry()); } @Override public CompletableFuture size() { - return Tools.asyncFuture(delegateMap.size(), executor); + return asyncFuture(delegateMap.size()); } @Override public CompletableFuture>> lastEntry() { - return Tools.asyncFuture(delegateMap.lastEntry(), executor); + return asyncFuture(delegateMap.lastEntry()); } @Override public CompletableFuture>> pollFirstEntry() { - return Tools.asyncFuture(delegateMap.pollFirstEntry(), executor); + return asyncFuture(delegateMap.pollFirstEntry()); } @Override public CompletableFuture containsKey(String key) { - return Tools.asyncFuture(delegateMap.containsKey(key), executor); + return asyncFuture(delegateMap.containsKey(key)); } @Override public CompletableFuture>> pollLastEntry() { - return Tools.asyncFuture(delegateMap.pollLastEntry(), executor); + return asyncFuture(delegateMap.pollLastEntry()); } @Override public CompletableFuture lowerKey(String key) { - return Tools.asyncFuture(delegateMap.lowerKey(key), executor); + return asyncFuture(delegateMap.lowerKey(key)); } @Override public CompletableFuture containsValue(V value) { - return Tools.asyncFuture(delegateMap.containsValue(value), executor); + return asyncFuture(delegateMap.containsValue(value)); } @Override public CompletableFuture floorKey(String key) { - return Tools.asyncFuture(delegateMap.floorKey(key), executor); + return asyncFuture(delegateMap.floorKey(key)); } @Override public CompletableFuture ceilingKey(String key) { - return Tools.asyncFuture(delegateMap.ceilingKey(key), executor); + return asyncFuture(delegateMap.ceilingKey(key)); } @Override public CompletableFuture> get(String key) { - return Tools.asyncFuture(delegateMap.get(key), executor); + return asyncFuture(delegateMap.get(key)); } @Override public CompletableFuture> getOrDefault(String key, V defaultValue) { - return Tools.asyncFuture(delegateMap.getOrDefault(key, defaultValue), executor); + return asyncFuture(delegateMap.getOrDefault(key, defaultValue)); } @Override public CompletableFuture higherKey(String key) { - return Tools.asyncFuture(delegateMap.higherKey(key), executor); + return asyncFuture(delegateMap.higherKey(key)); } @Override public CompletableFuture> navigableKeySet() { - return Tools.asyncFuture(delegateMap.navigableKeySet(), executor); + return asyncFuture(delegateMap.navigableKeySet()); } @Override public CompletableFuture> subMap( String upperKey, String lowerKey, boolean inclusiveUpper, boolean inclusiveLower) { - return Tools.asyncFuture(delegateMap.subMap(upperKey, lowerKey, inclusiveUpper, inclusiveLower), executor); + return asyncFuture(delegateMap.subMap(upperKey, lowerKey, inclusiveUpper, inclusiveLower)); } @Override public CompletableFuture> computeIf( String key, Predicate condition, BiFunction remappingFunction) { - return Tools.asyncFuture(delegateMap.computeIf(key, condition, remappingFunction), executor); + return asyncFuture(delegateMap.computeIf(key, condition, remappingFunction)); } @Override public CompletableFuture> put(String key, V value) { - return Tools.asyncFuture(delegateMap.put(key, value), executor); + return asyncFuture(delegateMap.put(key, value)); } @Override public CompletableFuture> putAndGet(String key, V value) { - return Tools.asyncFuture(delegateMap.putAndGet(key, value), executor); + return asyncFuture(delegateMap.putAndGet(key, value)); } @Override public CompletableFuture> remove(String key) { - return Tools.asyncFuture(delegateMap.remove(key), executor); + return asyncFuture(delegateMap.remove(key)); } @Override public CompletableFuture clear() { - return Tools.asyncFuture(delegateMap.clear(), executor); + return asyncFuture(delegateMap.clear()); } @Override public CompletableFuture> keySet() { - return Tools.asyncFuture(delegateMap.keySet(), executor); + return asyncFuture(delegateMap.keySet()); } @Override public CompletableFuture>> values() { - return Tools.asyncFuture(delegateMap.values(), executor); + return asyncFuture(delegateMap.values()); } @Override public CompletableFuture>>> entrySet() { - return Tools.asyncFuture(delegateMap.entrySet(), executor); + return asyncFuture(delegateMap.entrySet()); } @Override public CompletableFuture> putIfAbsent(String key, V value) { - return Tools.asyncFuture(delegateMap.putIfAbsent(key, value), executor); + return asyncFuture(delegateMap.putIfAbsent(key, value)); } @Override public CompletableFuture remove(String key, V value) { - return Tools.asyncFuture(delegateMap.remove(key, value), executor); + return asyncFuture(delegateMap.remove(key, value)); } @Override public CompletableFuture remove(String key, long version) { - return Tools.asyncFuture(delegateMap.remove(key, version), executor); + return asyncFuture(delegateMap.remove(key, version)); } @Override public CompletableFuture> replace(String key, V value) { - return Tools.asyncFuture(delegateMap.replace(key, value), executor); + return asyncFuture(delegateMap.replace(key, value)); } @Override public CompletableFuture replace(String key, V oldValue, V newValue) { - return Tools.asyncFuture(delegateMap.replace(key, oldValue, newValue), executor); + return asyncFuture(delegateMap.replace(key, oldValue, newValue)); } @Override public CompletableFuture replace(String key, long oldVersion, V newValue) { - return Tools.asyncFuture(delegateMap.replace(key, oldVersion, newValue), executor); + return asyncFuture(delegateMap.replace(key, oldVersion, newValue)); } @Override public CompletableFuture begin(TransactionId transactionId) { - return Tools.asyncFuture(delegateMap.begin(transactionId), executor); + return asyncFuture(delegateMap.begin(transactionId)); } @Override public CompletableFuture prepare(TransactionLog> transactionLog) { - return Tools.asyncFuture(delegateMap.prepare(transactionLog), executor); + return asyncFuture(delegateMap.prepare(transactionLog)); } @Override public CompletableFuture commit(TransactionId transactionId) { - return Tools.asyncFuture(delegateMap.commit(transactionId), executor); + return asyncFuture(delegateMap.commit(transactionId)); } @Override public CompletableFuture rollback(TransactionId transactionId) { - return Tools.asyncFuture(delegateMap.rollback(transactionId), executor); + return asyncFuture(delegateMap.rollback(transactionId)); } @Override public CompletableFuture prepareAndCommit(TransactionLog> transactionLog) { - return Tools.asyncFuture(delegateMap.prepareAndCommit(transactionLog), executor); + return asyncFuture(delegateMap.prepareAndCommit(transactionLog)); } @Override public CompletableFuture addListener(MapEventListener listener) { - return addListener(listener, executor); + return addListener(listener); } @Override public CompletableFuture addListener(MapEventListener listener, Executor executor) { - return Tools.asyncFuture(delegateMap.addListener(listener, executor), this.executor); + return asyncFuture(delegateMap.addListener(listener, executor)); } @Override public CompletableFuture removeListener(MapEventListener listener) { - return Tools.asyncFuture(delegateMap.removeListener(listener), executor); + return asyncFuture(delegateMap.removeListener(listener)); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java index 352ee7a4da..f6fc3d106b 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java @@ -20,7 +20,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import com.google.common.collect.Maps; -import org.onlab.util.Tools; import org.onosproject.store.service.AsyncDocumentTree; import org.onosproject.store.service.DocumentPath; import org.onosproject.store.service.DocumentTreeListener; @@ -32,13 +31,14 @@ import org.onosproject.store.service.Versioned; */ public class ExecutingAsyncDocumentTree extends ExecutingDistributedPrimitive implements AsyncDocumentTree { private final AsyncDocumentTree delegateTree; - private final Executor executor; + private final Executor orderedExecutor; private final Map, DocumentTreeListener> listenerMap = Maps.newConcurrentMap(); - public ExecutingAsyncDocumentTree(AsyncDocumentTree delegateTree, Executor executor) { - super(delegateTree, executor); + public ExecutingAsyncDocumentTree( + AsyncDocumentTree delegateTree, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateTree, orderedExecutor, threadPoolExecutor); this.delegateTree = delegateTree; - this.executor = executor; + this.orderedExecutor = orderedExecutor; } @Override @@ -48,56 +48,56 @@ public class ExecutingAsyncDocumentTree extends ExecutingDistributedPrimitive @Override public CompletableFuture>> getChildren(DocumentPath path) { - return Tools.asyncFuture(delegateTree.getChildren(path), executor); + return asyncFuture(delegateTree.getChildren(path)); } @Override public CompletableFuture> get(DocumentPath path) { - return Tools.asyncFuture(delegateTree.get(path), executor); + return asyncFuture(delegateTree.get(path)); } @Override public CompletableFuture> set(DocumentPath path, V value) { - return Tools.asyncFuture(delegateTree.set(path, value), executor); + return asyncFuture(delegateTree.set(path, value)); } @Override public CompletableFuture create(DocumentPath path, V value) { - return Tools.asyncFuture(delegateTree.create(path, value), executor); + return asyncFuture(delegateTree.create(path, value)); } @Override public CompletableFuture createRecursive(DocumentPath path, V value) { - return Tools.asyncFuture(delegateTree.createRecursive(path, value), executor); + return asyncFuture(delegateTree.createRecursive(path, value)); } @Override public CompletableFuture replace(DocumentPath path, V newValue, long version) { - return Tools.asyncFuture(delegateTree.replace(path, newValue, version), executor); + return asyncFuture(delegateTree.replace(path, newValue, version)); } @Override public CompletableFuture replace(DocumentPath path, V newValue, V currentValue) { - return Tools.asyncFuture(delegateTree.replace(path, newValue, currentValue), executor); + return asyncFuture(delegateTree.replace(path, newValue, currentValue)); } @Override public CompletableFuture> removeNode(DocumentPath path) { - return Tools.asyncFuture(delegateTree.removeNode(path), executor); + return asyncFuture(delegateTree.removeNode(path)); } @Override public CompletableFuture addListener(DocumentPath path, DocumentTreeListener listener) { - DocumentTreeListener wrappedListener = e -> executor.execute(() -> listener.event(e)); + DocumentTreeListener wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e)); listenerMap.put(listener, wrappedListener); - return Tools.asyncFuture(delegateTree.addListener(path, wrappedListener), executor); + return asyncFuture(delegateTree.addListener(path, wrappedListener)); } @Override public CompletableFuture removeListener(DocumentTreeListener listener) { DocumentTreeListener wrappedListener = listenerMap.remove(listener); if (wrappedListener != null) { - return Tools.asyncFuture(delegateTree.removeListener(wrappedListener), executor); + return asyncFuture(delegateTree.removeListener(wrappedListener)); } return CompletableFuture.completedFuture(null); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java index ecc5b8d0d2..ba7cb812dc 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java @@ -21,7 +21,6 @@ import java.util.concurrent.Executor; import java.util.function.Consumer; import com.google.common.collect.Maps; -import org.onlab.util.Tools; import org.onosproject.cluster.Leadership; import org.onosproject.cluster.NodeId; import org.onosproject.event.Change; @@ -33,62 +32,63 @@ import org.onosproject.store.service.AsyncLeaderElector; */ public class ExecutingAsyncLeaderElector extends ExecutingDistributedPrimitive implements AsyncLeaderElector { private final AsyncLeaderElector delegateElector; - private final Executor executor; + private final Executor orderedExecutor; private final Map>, Consumer>> listenerMap = Maps.newConcurrentMap(); - public ExecutingAsyncLeaderElector(AsyncLeaderElector delegateElector, Executor executor) { - super(delegateElector, executor); + public ExecutingAsyncLeaderElector( + AsyncLeaderElector delegateElector, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateElector, orderedExecutor, threadPoolExecutor); this.delegateElector = delegateElector; - this.executor = executor; + this.orderedExecutor = orderedExecutor; } @Override public CompletableFuture run(String topic, NodeId nodeId) { - return Tools.asyncFuture(delegateElector.run(topic, nodeId), executor); + return asyncFuture(delegateElector.run(topic, nodeId)); } @Override public CompletableFuture withdraw(String topic) { - return Tools.asyncFuture(delegateElector.withdraw(topic), executor); + return asyncFuture(delegateElector.withdraw(topic)); } @Override public CompletableFuture anoint(String topic, NodeId nodeId) { - return Tools.asyncFuture(delegateElector.anoint(topic, nodeId), executor); + return asyncFuture(delegateElector.anoint(topic, nodeId)); } @Override public CompletableFuture evict(NodeId nodeId) { - return Tools.asyncFuture(delegateElector.evict(nodeId), executor); + return asyncFuture(delegateElector.evict(nodeId)); } @Override public CompletableFuture promote(String topic, NodeId nodeId) { - return Tools.asyncFuture(delegateElector.promote(topic, nodeId), executor); + return asyncFuture(delegateElector.promote(topic, nodeId)); } @Override public CompletableFuture getLeadership(String topic) { - return Tools.asyncFuture(delegateElector.getLeadership(topic), executor); + return asyncFuture(delegateElector.getLeadership(topic)); } @Override public CompletableFuture> getLeaderships() { - return Tools.asyncFuture(delegateElector.getLeaderships(), executor); + return asyncFuture(delegateElector.getLeaderships()); } @Override public CompletableFuture addChangeListener(Consumer> listener) { - Consumer> wrappedListener = e -> executor.execute(() -> listener.accept(e)); + Consumer> wrappedListener = e -> orderedExecutor.execute(() -> listener.accept(e)); listenerMap.put(listener, wrappedListener); - return Tools.asyncFuture(delegateElector.addChangeListener(wrappedListener), executor); + return asyncFuture(delegateElector.addChangeListener(wrappedListener)); } @Override public CompletableFuture removeChangeListener(Consumer> listener) { Consumer> wrappedListener = listenerMap.remove(listener); if (wrappedListener != null) { - return Tools.asyncFuture(delegateElector.removeChangeListener(wrappedListener), executor); + return asyncFuture(delegateElector.removeChangeListener(wrappedListener)); } return CompletableFuture.completedFuture(null); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java index 021dbe5556..836a6824cb 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java @@ -32,24 +32,38 @@ import static com.google.common.base.Preconditions.checkNotNull; public abstract class ExecutingDistributedPrimitive extends DelegatingDistributedPrimitive { private final DistributedPrimitive primitive; - private final Executor executor; + private final Executor orderedExecutor; + private final Executor threadPoolExecutor; private final Map, Consumer> listenerMap = Maps.newConcurrentMap(); - protected ExecutingDistributedPrimitive(DistributedPrimitive primitive, Executor executor) { + protected ExecutingDistributedPrimitive( + DistributedPrimitive primitive, Executor orderedExecutor, Executor threadPoolExecutor) { super(primitive); this.primitive = primitive; - this.executor = checkNotNull(executor); + this.orderedExecutor = checkNotNull(orderedExecutor); + this.threadPoolExecutor = checkNotNull(threadPoolExecutor); + } + + /** + * Creates a future to be completed asynchronously on the provided ordered and thread pool executors. + * + * @param future the future to be completed asynchronously + * @param future result type + * @return a new {@link CompletableFuture} to be completed asynchronously using the primitive thread model + */ + protected CompletableFuture asyncFuture(CompletableFuture future) { + return Tools.orderedFuture(future, orderedExecutor, threadPoolExecutor); } @Override public CompletableFuture destroy() { - return Tools.asyncFuture(primitive.destroy(), executor); + return asyncFuture(primitive.destroy()); } @Override public void addStatusChangeListener(Consumer listener) { Consumer wrappedListener = - status -> executor.execute(() -> listener.accept(status)); + status -> orderedExecutor.execute(() -> listener.accept(status)); listenerMap.put(listener, wrappedListener); primitive.addStatusChangeListener(wrappedListener); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java index 301f73e1a0..e6290b89b6 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java @@ -20,7 +20,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Consumer; -import org.onlab.util.Tools; import org.onosproject.store.service.AsyncAtomicValue; import org.onosproject.store.service.Task; import org.onosproject.store.service.WorkQueue; @@ -32,44 +31,40 @@ import org.onosproject.store.service.WorkQueueStats; */ public class ExecutingWorkQueue extends ExecutingDistributedPrimitive implements WorkQueue { private final WorkQueue delegateQueue; - private final Executor executor; - public ExecutingWorkQueue(WorkQueue delegateQueue, Executor executor) { - super(delegateQueue, executor); + public ExecutingWorkQueue(WorkQueue delegateQueue, Executor orderedExecutor, Executor threadPoolExecutor) { + super(delegateQueue, orderedExecutor, threadPoolExecutor); this.delegateQueue = delegateQueue; - this.executor = executor; } @Override public CompletableFuture addMultiple(Collection items) { - return Tools.asyncFuture(delegateQueue.addMultiple(items), executor); + return asyncFuture(delegateQueue.addMultiple(items)); } @Override public CompletableFuture>> take(int maxItems) { - return Tools.asyncFuture(delegateQueue.take(maxItems), executor); + return asyncFuture(delegateQueue.take(maxItems)); } @Override public CompletableFuture complete(Collection taskIds) { - return Tools.asyncFuture(delegateQueue.complete(taskIds), executor); + return asyncFuture(delegateQueue.complete(taskIds)); } @Override public CompletableFuture registerTaskProcessor( Consumer taskProcessor, int parallelism, Executor executor) { - return Tools.asyncFuture( - delegateQueue.registerTaskProcessor(taskProcessor, parallelism, executor), - this.executor); + return asyncFuture(delegateQueue.registerTaskProcessor(taskProcessor, parallelism, executor)); } @Override public CompletableFuture stopProcessing() { - return Tools.asyncFuture(delegateQueue.stopProcessing(), executor); + return asyncFuture(delegateQueue.stopProcessing()); } @Override public CompletableFuture stats() { - return Tools.asyncFuture(delegateQueue.stats(), executor); + return asyncFuture(delegateQueue.stats()); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java index 83547aeaf9..f06b052e0f 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java @@ -15,6 +15,14 @@ */ package org.onosproject.store.primitives.impl; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + import com.google.common.base.Suppliers; import io.atomix.AtomixClient; import io.atomix.catalyst.transport.Transport; @@ -30,7 +38,7 @@ import io.atomix.resource.ResourceRegistry; import io.atomix.resource.ResourceType; import io.atomix.variables.DistributedLong; import org.onlab.util.HexString; -import org.onlab.util.SerialExecutor; +import org.onlab.util.OrderedExecutor; import org.onosproject.store.primitives.DistributedPrimitiveCreator; import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap; import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap; @@ -56,14 +64,6 @@ import org.onosproject.store.service.Serializer; import org.onosproject.store.service.WorkQueue; import org.slf4j.Logger; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - import static org.slf4j.LoggerFactory.getLogger; /** @@ -137,7 +137,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana * @return the executor */ private Executor defaultExecutor(Supplier executorSupplier) { - return executorSupplier != null ? executorSupplier.get() : new SerialExecutor(sharedExecutor); + return executorSupplier != null ? executorSupplier.get() : new OrderedExecutor(sharedExecutor); } @Override @@ -165,7 +165,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana value -> value == null ? null : serializer.encode(value), bytes -> serializer.decode(bytes)); - return new ExecutingAsyncConsistentMap<>(transcodedMap, defaultExecutor(executorSupplier)); + return new ExecutingAsyncConsistentMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor); } @Override @@ -193,7 +193,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana value -> value == null ? null : serializer.encode(value), bytes -> serializer.decode(bytes)); - return new ExecutingAsyncConsistentTreeMap<>(transcodedMap, defaultExecutor(executorSupplier)); + return new ExecutingAsyncConsistentTreeMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor); } @Override @@ -225,7 +225,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana value -> serializer.encode(value), bytes -> serializer.decode(bytes)); - return new ExecutingAsyncConsistentMultimap<>(transcodedMap, defaultExecutor(executorSupplier)); + return new ExecutingAsyncConsistentMultimap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor); } @Override @@ -247,28 +247,28 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana key -> HexString.toHexString(serializer.encode(key)), string -> serializer.decode(HexString.fromHexString(string))); - return new ExecutingAsyncAtomicCounterMap<>(transcodedMap, defaultExecutor(executorSupplier)); + return new ExecutingAsyncAtomicCounterMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor); } @Override public AsyncAtomicCounter newAsyncCounter(String name, Supplier executorSupplier) { DistributedLong distributedLong = client.getLong(name).join(); AsyncAtomicCounter asyncCounter = new AtomixCounter(name, distributedLong); - return new ExecutingAsyncAtomicCounter(asyncCounter, defaultExecutor(executorSupplier)); + return new ExecutingAsyncAtomicCounter(asyncCounter, defaultExecutor(executorSupplier), sharedExecutor); } @Override public AsyncAtomicValue newAsyncAtomicValue( String name, Serializer serializer, Supplier executorSupplier) { AsyncAtomicValue asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get()); - return new ExecutingAsyncAtomicValue<>(asyncValue, defaultExecutor(executorSupplier)); + return new ExecutingAsyncAtomicValue<>(asyncValue, defaultExecutor(executorSupplier), sharedExecutor); } @Override public WorkQueue newWorkQueue(String name, Serializer serializer, Supplier executorSupplier) { AtomixWorkQueue atomixWorkQueue = client.getResource(name, AtomixWorkQueue.class).join(); WorkQueue workQueue = new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer); - return new ExecutingWorkQueue<>(workQueue, defaultExecutor(executorSupplier)); + return new ExecutingWorkQueue<>(workQueue, defaultExecutor(executorSupplier), sharedExecutor); } @Override @@ -277,7 +277,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join(); AsyncDocumentTree asyncDocumentTree = new DefaultDistributedDocumentTree<>( name, atomixDocumentTree, serializer); - return new ExecutingAsyncDocumentTree<>(asyncDocumentTree, defaultExecutor(executorSupplier)); + return new ExecutingAsyncDocumentTree<>(asyncDocumentTree, defaultExecutor(executorSupplier), sharedExecutor); } @Override @@ -285,12 +285,10 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class) .thenCompose(AtomixLeaderElector::setupCache) .join(); - Consumer statusListener = state -> { - leaderElector.statusChangeListeners() - .forEach(listener -> listener.accept(mapper.apply(state))); - }; + Consumer statusListener = state -> leaderElector.statusChangeListeners() + .forEach(listener -> listener.accept(mapper.apply(state))); resourceClient.client().onStateChange(statusListener); - return new ExecutingAsyncLeaderElector(leaderElector, defaultExecutor(executorSupplier)); + return new ExecutingAsyncLeaderElector(leaderElector, defaultExecutor(executorSupplier), sharedExecutor); } @Override diff --git a/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java b/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java deleted file mode 100644 index 936a33ff2a..0000000000 --- a/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2017-present Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.util; - -import java.util.LinkedList; -import java.util.concurrent.Executor; - -/** - * Executor that executes tasks in serial on a shared thread pool, falling back to parallel execution when threads - * are blocked. - *

- * This executor attempts to execute tasks in serial as if they occur on a single thread. However, in the event tasks - * are blocking a thread (a thread is in the {@link Thread.State#WAITING} or {@link Thread.State#TIMED_WAITING} state) - * the executor will execute tasks on parallel on the underlying {@link Executor}. This is useful for ensuring blocked - * threads cannot block events, but mimics a single-threaded model otherwise. - */ -public class BestEffortSerialExecutor implements Executor { - private final Executor parent; - private final LinkedList tasks = new LinkedList<>(); - private volatile Thread thread; - - public BestEffortSerialExecutor(Executor parent) { - this.parent = parent; - } - - private void run() { - synchronized (tasks) { - thread = Thread.currentThread(); - } - for (;;) { - if (!runTask()) { - synchronized (tasks) { - thread = null; - } - return; - } - } - } - - private boolean runTask() { - final Runnable task; - synchronized (tasks) { - task = tasks.poll(); - if (task == null) { - return false; - } - } - task.run(); - return true; - } - - @Override - public void execute(Runnable command) { - synchronized (tasks) { - tasks.add(command); - if (thread == null) { - parent.execute(this::run); - } else if (thread.getState() == Thread.State.WAITING || thread.getState() == Thread.State.TIMED_WAITING) { - parent.execute(this::runTask); - } - } - } -} diff --git a/utils/misc/src/main/java/org/onlab/util/BlockingAwareFuture.java b/utils/misc/src/main/java/org/onlab/util/BlockingAwareFuture.java new file mode 100644 index 0000000000..51f1809dae --- /dev/null +++ b/utils/misc/src/main/java/org/onlab/util/BlockingAwareFuture.java @@ -0,0 +1,291 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A {@link CompletableFuture} that tracks whether the future or one of its descendants has been blocked on + * a {@link CompletableFuture#get()} or {@link CompletableFuture#join()} call. + */ +public class BlockingAwareFuture extends CompletableFuture { + private final AtomicBoolean blocked; + + public BlockingAwareFuture() { + this(new AtomicBoolean()); + } + + private BlockingAwareFuture(AtomicBoolean blocked) { + this.blocked = blocked; + } + + /** + * Returns a boolean indicating whether the future is blocked. + * + * @return indicates whether the future is blocked + */ + public boolean isBlocked() { + return blocked.get(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + blocked.set(true); + try { + return super.get(); + } finally { + blocked.set(false); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + blocked.set(true); + try { + return super.get(timeout, unit); + } finally { + blocked.set(false); + } + } + + @Override + public synchronized T join() { + blocked.set(true); + try { + return super.join(); + } finally { + blocked.set(false); + } + } + + /** + * Wraps the given future in a new blockable future. + * + * @param future the future to wrap + * @param the future value type + * @return a new blockable future + */ + private CompletableFuture wrap(CompletableFuture future) { + BlockingAwareFuture blockingFuture = new BlockingAwareFuture(blocked); + future.whenComplete((result, error) -> { + if (error == null) { + blockingFuture.complete(result); + } else { + blockingFuture.completeExceptionally(error); + } + }); + return blockingFuture; + } + + @Override + public CompletableFuture thenApply(Function fn) { + return wrap(super.thenApply(fn)); + } + + @Override + public CompletableFuture thenApplyAsync(Function fn) { + return wrap(super.thenApplyAsync(fn)); + } + + @Override + public CompletableFuture thenApplyAsync(Function fn, Executor executor) { + return wrap(super.thenApplyAsync(fn, executor)); + } + + @Override + public CompletableFuture thenAccept(Consumer action) { + return wrap(super.thenAccept(action)); + } + + @Override + public CompletableFuture thenAcceptAsync(Consumer action) { + return wrap(super.thenAcceptAsync(action)); + } + + @Override + public CompletableFuture thenAcceptAsync(Consumer action, Executor executor) { + return wrap(super.thenAcceptAsync(action, executor)); + } + + @Override + public CompletableFuture thenRun(Runnable action) { + return wrap(super.thenRun(action)); + } + + @Override + public CompletableFuture thenRunAsync(Runnable action) { + return wrap(super.thenRunAsync(action)); + } + + @Override + public CompletableFuture thenRunAsync(Runnable action, Executor executor) { + return wrap(super.thenRunAsync(action, executor)); + } + + @Override + public CompletableFuture thenCombine( + CompletionStage other, BiFunction fn) { + return wrap(super.thenCombine(other, fn)); + } + + @Override + public CompletableFuture thenCombineAsync( + CompletionStage other, BiFunction fn) { + return wrap(super.thenCombineAsync(other, fn)); + } + + @Override + public CompletableFuture thenCombineAsync( + CompletionStage other, BiFunction fn, Executor executor) { + return wrap(super.thenCombineAsync(other, fn, executor)); + } + + @Override + public CompletableFuture thenAcceptBoth( + CompletionStage other, BiConsumer action) { + return wrap(super.thenAcceptBoth(other, action)); + } + + @Override + public CompletableFuture thenAcceptBothAsync( + CompletionStage other, BiConsumer action) { + return wrap(super.thenAcceptBothAsync(other, action)); + } + + @Override + public CompletableFuture thenAcceptBothAsync( + CompletionStage other, BiConsumer action, Executor executor) { + return wrap(super.thenAcceptBothAsync(other, action, executor)); + } + + @Override + public CompletableFuture runAfterBoth(CompletionStage other, Runnable action) { + return wrap(super.runAfterBoth(other, action)); + } + + @Override + public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action) { + return wrap(super.runAfterBothAsync(other, action)); + } + + @Override + public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return wrap(super.runAfterBothAsync(other, action, executor)); + } + + @Override + public CompletableFuture applyToEither(CompletionStage other, Function fn) { + return wrap(super.applyToEither(other, fn)); + } + + @Override + public CompletableFuture applyToEitherAsync(CompletionStage other, Function fn) { + return wrap(super.applyToEitherAsync(other, fn)); + } + + @Override + public CompletableFuture applyToEitherAsync( + CompletionStage other, Function fn, Executor executor) { + return wrap(super.applyToEitherAsync(other, fn, executor)); + } + + @Override + public CompletableFuture acceptEither(CompletionStage other, Consumer action) { + return wrap(super.acceptEither(other, action)); + } + + @Override + public CompletableFuture acceptEitherAsync(CompletionStage other, Consumer action) { + return wrap(super.acceptEitherAsync(other, action)); + } + + @Override + public CompletableFuture acceptEitherAsync( + CompletionStage other, Consumer action, Executor executor) { + return wrap(super.acceptEitherAsync(other, action, executor)); + } + + @Override + public CompletableFuture runAfterEither(CompletionStage other, Runnable action) { + return wrap(super.runAfterEither(other, action)); + } + + @Override + public CompletableFuture runAfterEitherAsync(CompletionStage other, Runnable action) { + return wrap(super.runAfterEitherAsync(other, action)); + } + + @Override + public CompletableFuture runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return wrap(super.runAfterEitherAsync(other, action, executor)); + } + + @Override + public CompletableFuture thenCompose(Function> fn) { + return wrap(super.thenCompose(fn)); + } + + @Override + public CompletableFuture thenComposeAsync(Function> fn) { + return wrap(super.thenComposeAsync(fn)); + } + + @Override + public CompletableFuture thenComposeAsync( + Function> fn, Executor executor) { + return wrap(super.thenComposeAsync(fn, executor)); + } + + @Override + public CompletableFuture whenComplete(BiConsumer action) { + return wrap(super.whenComplete(action)); + } + + @Override + public CompletableFuture whenCompleteAsync(BiConsumer action) { + return wrap(super.whenCompleteAsync(action)); + } + + @Override + public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor) { + return wrap(super.whenCompleteAsync(action, executor)); + } + + @Override + public CompletableFuture handle(BiFunction fn) { + return wrap(super.handle(fn)); + } + + @Override + public CompletableFuture handleAsync(BiFunction fn) { + return wrap(super.handleAsync(fn)); + } + + @Override + public CompletableFuture handleAsync(BiFunction fn, Executor executor) { + return wrap(super.handleAsync(fn, executor)); + } +} diff --git a/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java b/utils/misc/src/main/java/org/onlab/util/OrderedExecutor.java similarity index 82% rename from utils/misc/src/main/java/org/onlab/util/SerialExecutor.java rename to utils/misc/src/main/java/org/onlab/util/OrderedExecutor.java index 9e54ac232f..8fe2cc2e41 100644 --- a/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java +++ b/utils/misc/src/main/java/org/onlab/util/OrderedExecutor.java @@ -19,17 +19,17 @@ import java.util.LinkedList; import java.util.concurrent.Executor; /** - * Executor that executes tasks in serial on a shared thread pool. + * Executor that executes tasks in order on a shared thread pool. *

- * The serial executor behaves semantically like a single-threaded executor, but multiplexes tasks on a shared thread - * pool, ensuring blocked threads in the shared thread pool don't block individual serial executors. + * The ordered executor behaves semantically like a single-threaded executor, but multiplexes tasks on a shared thread + * pool, ensuring blocked threads in the shared thread pool don't block individual ordered executors. */ -public class SerialExecutor implements Executor { +public class OrderedExecutor implements Executor { private final Executor parent; private final LinkedList tasks = new LinkedList<>(); private boolean running; - public SerialExecutor(Executor parent) { + public OrderedExecutor(Executor parent) { this.parent = parent; } diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java index 883a7c8889..0c76a9198d 100644 --- a/utils/misc/src/main/java/org/onlab/util/Tools.java +++ b/utils/misc/src/main/java/org/onlab/util/Tools.java @@ -643,28 +643,41 @@ public abstract class Tools { } /** - * Returns a future that's completed using the given {@link Executor} once the given {@code future} is completed. + * Returns a future that's completed using the given {@code orderedExecutor} if the future is not blocked or the + * given {@code threadPoolExecutor} if the future is blocked. *

- * {@link CompletableFuture}'s async methods cannot be relied upon to complete futures on an executor thread. If a - * future is completed synchronously, {@code CompletableFuture} async methods will often complete the future on the - * current thread, ignoring the provided {@code Executor}. This method ensures a more reliable and consistent thread - * model by ensuring that futures are always completed using the provided {@code Executor}. + * This method allows futures to maintain single-thread semantics via the provided {@code orderedExecutor} while + * ensuring user code can block without blocking completion of futures. When the returned future or any of its + * descendants is blocked on a {@link CompletableFuture#get()} or {@link CompletableFuture#join()} call, completion + * of the returned future will be done using the provided {@code threadPoolExecutor}. * * @param future the future to convert into an asynchronous future - * @param executor the executor with which to complete the returned future + * @param orderedExecutor the ordered executor with which to attempt to complete the future + * @param threadPoolExecutor the backup executor with which to complete blocked futures * @param future value type * @return a new completable future to be completed using the provided {@code executor} once the provided * {@code future} is complete */ - public static CompletableFuture asyncFuture(CompletableFuture future, Executor executor) { - CompletableFuture newFuture = new CompletableFuture(); - future.whenComplete((result, error) -> executor.execute(() -> { - if (future.isCompletedExceptionally()) { - newFuture.completeExceptionally(error); + public static CompletableFuture orderedFuture( + CompletableFuture future, + Executor orderedExecutor, + Executor threadPoolExecutor) { + BlockingAwareFuture newFuture = new BlockingAwareFuture(); + future.whenComplete((result, error) -> { + Runnable completer = () -> { + if (future.isCompletedExceptionally()) { + newFuture.completeExceptionally(error); + } else { + newFuture.complete(result); + } + }; + + if (newFuture.isBlocked()) { + threadPoolExecutor.execute(completer); } else { - newFuture.complete(result); + orderedExecutor.execute(completer); } - })); + }); return newFuture; } diff --git a/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java deleted file mode 100644 index 39a1f87e3f..0000000000 --- a/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2017-present Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onlab.util; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * Best effort serial executor test. - */ -public class BestEffortSerialExecutorTest { - - @Test - public void testSerialExecution() throws Throwable { - Executor executor = new BestEffortSerialExecutor(SharedExecutors.getPoolThreadExecutor()); - CountDownLatch latch = new CountDownLatch(2); - executor.execute(latch::countDown); - executor.execute(latch::countDown); - latch.await(); - assertEquals(0, latch.getCount()); - } - - @Test - public void testBlockedExecution() throws Throwable { - Executor executor = new BestEffortSerialExecutor(SharedExecutors.getPoolThreadExecutor()); - CountDownLatch latch = new CountDownLatch(3); - executor.execute(() -> { - try { - Thread.sleep(2000); - latch.countDown(); - } catch (InterruptedException e) { - } - }); - Thread.sleep(10); - executor.execute(() -> { - try { - new CompletableFuture<>().get(2, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - latch.countDown(); - } - }); - Thread.sleep(10); - executor.execute(latch::countDown); - latch.await(1, TimeUnit.SECONDS); - assertEquals(2, latch.getCount()); - latch.await(3, TimeUnit.SECONDS); - assertEquals(0, latch.getCount()); - } - -} diff --git a/utils/misc/src/test/java/org/onlab/util/BlockingAwareFutureTest.java b/utils/misc/src/test/java/org/onlab/util/BlockingAwareFutureTest.java new file mode 100644 index 0000000000..6a72b58247 --- /dev/null +++ b/utils/misc/src/test/java/org/onlab/util/BlockingAwareFutureTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2017-present Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onlab.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Blocking-aware future test. + */ +public class BlockingAwareFutureTest { + + /** + * Tests normal callback execution. + */ + @Test + public void testNonBlockingThread() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + Executor executor = SharedExecutors.getPoolThreadExecutor(); + BlockingAwareFuture blockingFuture = + (BlockingAwareFuture) Tools.orderedFuture(future, new OrderedExecutor(executor), executor); + CountDownLatch latch = new CountDownLatch(1); + blockingFuture.thenRun(() -> latch.countDown()); + executor.execute(() -> future.complete("foo")); + latch.await(5, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + assertEquals("foo", blockingFuture.join()); + assertFalse(blockingFuture.isBlocked()); + } + + /** + * Tests blocking an ordered thread. + */ + @Test + public void testBlockingThread() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + Executor executor = SharedExecutors.getPoolThreadExecutor(); + BlockingAwareFuture blockingFuture = + (BlockingAwareFuture) Tools.orderedFuture(future, new OrderedExecutor(executor), executor); + CountDownLatch latch = new CountDownLatch(2); + CompletableFuture wrappedFuture = blockingFuture.thenApply(v -> { + assertEquals("foo", v); + latch.countDown(); + return v; + }); + wrappedFuture.thenRun(() -> latch.countDown()); + executor.execute(() -> wrappedFuture.join()); + Thread.sleep(100); + assertTrue(blockingFuture.isBlocked()); + future.complete("foo"); + latch.await(5, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + assertEquals("foo", blockingFuture.join()); + assertEquals("foo", wrappedFuture.join()); + assertFalse(blockingFuture.isBlocked()); + } + +} diff --git a/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/OrderedExecutorTest.java similarity index 87% rename from utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java rename to utils/misc/src/test/java/org/onlab/util/OrderedExecutorTest.java index 140ce1ff7f..b73fc564c5 100644 --- a/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java +++ b/utils/misc/src/test/java/org/onlab/util/OrderedExecutorTest.java @@ -23,13 +23,13 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; /** - * Serial executor test. + * Ordered executor test. */ -public class SerialExecutorTest { +public class OrderedExecutorTest { @Test public void testSerialExecution() throws Throwable { - Executor executor = new SerialExecutor(SharedExecutors.getPoolThreadExecutor()); + Executor executor = new OrderedExecutor(SharedExecutors.getPoolThreadExecutor()); CountDownLatch latch = new CountDownLatch(2); executor.execute(latch::countDown); executor.execute(latch::countDown);