From a6d787bf410dcadbf15c47c3f4aa65303f9e063e Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Tue, 11 Aug 2015 11:02:02 -0700 Subject: [PATCH] ONOS-2440: Simplify DistributedQueue implementation by leveraging state change notification support Change-Id: Id0a48f07535d8b7e1d0f964bd1c0623ca81d4605 --- .../consistent/impl/DatabaseManager.java | 35 ---------- .../store/consistent/impl/DatabaseProxy.java | 10 ++- .../store/consistent/impl/DatabaseState.java | 5 +- .../impl/DefaultAsyncConsistentMap.java | 4 +- .../consistent/impl/DefaultDatabase.java | 7 +- .../consistent/impl/DefaultDatabaseState.java | 32 ++------- .../impl/DefaultDistributedQueue.java | 68 ++++++++----------- .../impl/DefaultDistributedQueueBuilder.java | 20 +----- .../consistent/impl/PartitionedDatabase.java | 7 +- .../consistent/impl/StateMachineUpdate.java | 13 +++- 10 files changed, 62 insertions(+), 139 deletions(-) diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java index 1bccf2eb7c..b7c3794bad 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java @@ -16,7 +16,6 @@ package org.onosproject.store.consistent.impl; -import com.google.common.base.Charsets; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -49,8 +48,6 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; import org.apache.felix.scr.annotations.Service; -import static org.onlab.util.Tools.groupedThreads; - import org.onosproject.app.ApplicationEvent; import org.onosproject.app.ApplicationListener; import org.onosproject.app.ApplicationService; @@ -61,7 +58,6 @@ import org.onosproject.core.IdGenerator; import org.onosproject.store.cluster.impl.ClusterDefinitionManager; import org.onosproject.store.cluster.impl.NodeInfo; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; import org.onosproject.store.service.AtomicCounterBuilder; import org.onosproject.store.service.AtomicValueBuilder; @@ -86,7 +82,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -112,8 +107,6 @@ public class DatabaseManager implements StorageService, StorageAdminService { private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000; private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000; - protected static final MessageSubject QUEUE_UPDATED_TOPIC = new MessageSubject("distributed-queue-updated"); - private ClusterCoordinator coordinator; protected PartitionedDatabase partitionedDatabase; protected Database inMemoryDatabase; @@ -122,15 +115,12 @@ public class DatabaseManager implements StorageService, StorageAdminService { private TransactionManager transactionManager; private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong(); - private ExecutorService eventDispatcher; - private ExecutorService queuePollExecutor; private ApplicationListener appListener = new InternalApplicationListener(); private final Multimap maps = Multimaps.synchronizedMultimap(ArrayListMultimap.create()); private final Multimap mapsByApplication = Multimaps.synchronizedMultimap(ArrayListMultimap.create()); - private final Map queues = Maps.newConcurrentMap(); @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ClusterService clusterService; @@ -237,21 +227,6 @@ public class DatabaseManager implements StorageService, StorageAdminService { transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder()); partitionedDatabase.setTransactionManager(transactionManager); - eventDispatcher = Executors.newSingleThreadExecutor( - groupedThreads("onos/store/manager", "map-event-dispatcher")); - - queuePollExecutor = Executors.newFixedThreadPool(4, - groupedThreads("onos/store/manager", "queue-poll-handler")); - - clusterCommunicator.addSubscriber(QUEUE_UPDATED_TOPIC, - data -> new String(data, Charsets.UTF_8), - name -> { - DefaultDistributedQueue q = queues.get(name); - if (q != null) { - q.tryPoll(); - } - }, - queuePollExecutor); log.info("Started"); } @@ -277,13 +252,10 @@ public class DatabaseManager implements StorageService, StorageAdminService { log.info("Successfully closed databases."); } }); - clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC); maps.values().forEach(this::unregisterMap); if (applicationService != null) { applicationService.removeListener(appListener); } - eventDispatcher.shutdown(); - queuePollExecutor.shutdown(); log.info("Stopped"); } @@ -467,13 +439,6 @@ public class DatabaseManager implements StorageService, StorageAdminService { } } - protected void registerQueue(DefaultDistributedQueue queue) { - // TODO: Support multiple local instances of the same queue. - if (queues.putIfAbsent(queue.name(), queue) != null) { - throw new IllegalStateException("Queue by name " + queue.name() + " already exists"); - } - } - private class InternalApplicationListener implements ApplicationListener { @Override public void event(ApplicationEvent event) { diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java index 08317b52b6..95f9e39a9a 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import org.onosproject.cluster.NodeId; import org.onosproject.store.service.Transaction; import org.onosproject.store.service.Versioned; @@ -168,17 +167,16 @@ public interface DatabaseProxy { * Inserts an entry into the queue. * @param queueName queue name * @param entry queue entry - * @return set of nodes to notify about the queue update + * @return void future */ - CompletableFuture> queuePush(String queueName, byte[] entry); + CompletableFuture queuePush(String queueName, byte[] entry); /** * Removes an entry from the queue if the queue is non-empty. * @param queueName queue name - * @param nodeId If the queue is empty the identifier of node to notify when an entry becomes available - * @return entry. Can be null if queue is empty + * @return entry future. Can be completed with null if queue is empty */ - CompletableFuture queuePop(String queueName, NodeId nodeId); + CompletableFuture queuePop(String queueName); /** * Returns but does not remove an entry from the queue. diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java index 8b6db1e0be..b3dd1c4470 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.onosproject.cluster.NodeId; import org.onosproject.store.service.Transaction; import org.onosproject.store.service.Versioned; @@ -93,10 +92,10 @@ public interface DatabaseState { byte[] queuePeek(String queueName); @Command - byte[] queuePop(String queueName, NodeId requestor); + byte[] queuePop(String queueName); @Command - Set queuePush(String queueName, byte[] entry); + void queuePush(String queueName, byte[] entry); @Query Long counterGet(String counterName); diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java index c9311c9fcc..9082ba64dd 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java @@ -47,7 +47,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP; +import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP_UPDATE; import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT; import static org.slf4j.LoggerFactory.getLogger; @@ -122,7 +122,7 @@ public class DefaultAsyncConsistentMap implements AsyncConsistentMap { SharedExecutors.getSingleThreadExecutor().execute(() -> { - if (update.target() == MAP) { + if (update.target() == MAP_UPDATE) { Result> result = update.output(); if (result.success() && result.value().mapName().equals(name)) { MapEvent mapEvent = result.value().map(this::dK, serializer::decode).toMapEvent(); diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java index ba0b1be809..4d9776ee4a 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; -import org.onosproject.cluster.NodeId; import org.onosproject.store.service.Transaction; import org.onosproject.store.service.Versioned; @@ -159,13 +158,13 @@ public class DefaultDatabase extends AbstractResource implements Datab } @Override - public CompletableFuture> queuePush(String queueName, byte[] entry) { + public CompletableFuture queuePush(String queueName, byte[] entry) { return checkOpen(() -> proxy.queuePush(queueName, entry)); } @Override - public CompletableFuture queuePop(String queueName, NodeId nodeId) { - return checkOpen(() -> proxy.queuePop(queueName, nodeId)); + public CompletableFuture queuePop(String queueName) { + return checkOpen(() -> proxy.queuePop(queueName)); } @Override diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java index 219b8470b6..9d3505bd22 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java @@ -18,7 +18,6 @@ package org.onosproject.store.consistent.impl; import java.util.Arrays; import java.util.Collection; -import java.util.HashSet; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; @@ -27,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.Set; -import org.onosproject.cluster.NodeId; import org.onosproject.store.service.DatabaseUpdate; import org.onosproject.store.service.Transaction; import org.onosproject.store.service.Versioned; @@ -48,7 +46,6 @@ public class DefaultDatabaseState implements DatabaseState { private Map counters; private Map>> maps; private Map> queues; - private Map> queueUpdateNotificationTargets; /** * This locks map has a structure similar to the "tables" map above and @@ -85,11 +82,6 @@ public class DefaultDatabaseState implements DatabaseState { queues = Maps.newConcurrentMap(); context.put("queues", queues); } - queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets"); - if (queueUpdateNotificationTargets == null) { - queueUpdateNotificationTargets = Maps.newConcurrentMap(); - context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets); - } nextVersion = context.get("nextVersion"); if (nextVersion == null) { nextVersion = new Long(0); @@ -214,27 +206,17 @@ public class DefaultDatabaseState implements DatabaseState { @Override public byte[] queuePeek(String queueName) { - Queue queue = getQueue(queueName); - return queue.peek(); + return getQueue(queueName).peek(); } @Override - public byte[] queuePop(String queueName, NodeId requestor) { - Queue queue = getQueue(queueName); - if (queue.size() == 0 && requestor != null) { - getQueueUpdateNotificationTargets(queueName).add(requestor); - return null; - } else { - return queue.remove(); - } + public byte[] queuePop(String queueName) { + return getQueue(queueName).poll(); } @Override - public Set queuePush(String queueName, byte[] entry) { - getQueue(queueName).add(entry); - Set notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName)); - getQueueUpdateNotificationTargets(queueName).clear(); - return notifyList; + public void queuePush(String queueName, byte[] entry) { + getQueue(queueName).offer(entry); } @Override @@ -289,10 +271,6 @@ public class DefaultDatabaseState implements DatabaseState { return queues.computeIfAbsent(queueName, name -> new LinkedList<>()); } - private Set getQueueUpdateNotificationTargets(String queueName) { - return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>()); - } - private boolean isUpdatePossible(DatabaseUpdate update) { Versioned existingEntry = mapGet(update.mapName(), update.key()); switch (update.type()) { diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java index c27774a9ea..e4b264194f 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java @@ -17,15 +17,16 @@ package org.onosproject.store.consistent.impl; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; -import org.onosproject.cluster.NodeId; + +import org.onlab.util.SharedExecutors; import org.onosproject.store.service.DistributedQueue; import org.onosproject.store.service.Serializer; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; - import static com.google.common.base.Preconditions.checkNotNull; +import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH; /** * DistributedQueue implementation that provides FIFO ordering semantics. @@ -37,9 +38,7 @@ public class DefaultDistributedQueue implements DistributedQueue { private final String name; private final Database database; private final Serializer serializer; - private final NodeId localNodeId; private final Set> pendingFutures = Sets.newIdentityHashSet(); - private final Consumer> notifyConsumers; private static final String PRIMITIVE_NAME = "distributedQueue"; private static final String SIZE = "size"; @@ -53,66 +52,59 @@ public class DefaultDistributedQueue implements DistributedQueue { public DefaultDistributedQueue(String name, Database database, Serializer serializer, - NodeId localNodeId, - boolean meteringEnabled, - Consumer> notifyConsumers) { + boolean meteringEnabled) { this.name = checkNotNull(name, "queue name cannot be null"); this.database = checkNotNull(database, "database cannot be null"); this.serializer = checkNotNull(serializer, "serializer cannot be null"); - this.localNodeId = localNodeId; - this.notifyConsumers = notifyConsumers; this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - + this.database.registerConsumer(update -> { + SharedExecutors.getSingleThreadExecutor().execute(() -> { + if (update.target() == QUEUE_PUSH) { + List input = update.input(); + String queueName = (String) input.get(0); + if (queueName.equals(name)) { + tryPoll(); + } + } + }); + }); } @Override public long size() { final MeteringAgent.Context timer = monitor.startTimer(SIZE); - try { - return Futures.getUnchecked(database.queueSize(name)); - } finally { - timer.stop(); - } + return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop())); } @Override public void push(E entry) { + checkNotNull(entry, ERROR_NULL_ENTRY); final MeteringAgent.Context timer = monitor.startTimer(PUSH); - try { - checkNotNull(entry, ERROR_NULL_ENTRY); - Futures.getUnchecked(database.queuePush(name, serializer.encode(entry)) - .thenAccept(notifyConsumers) - .thenApply(v -> null)); - } finally { - timer.stop(); - } + Futures.getUnchecked(database.queuePush(name, serializer.encode(entry)) + .whenComplete((r, e) -> timer.stop())); } @Override public CompletableFuture pop() { final MeteringAgent.Context timer = monitor.startTimer(POP); - return database.queuePop(name, localNodeId) + return database.queuePop(name) + .whenComplete((r, e) -> timer.stop()) .thenCompose(v -> { if (v != null) { return CompletableFuture.completedFuture(serializer.decode(v)); - } else { - CompletableFuture newPendingFuture = new CompletableFuture<>(); - pendingFutures.add(newPendingFuture); - return newPendingFuture; } - }) - .whenComplete((r, e) -> timer.stop()); + CompletableFuture newPendingFuture = new CompletableFuture<>(); + pendingFutures.add(newPendingFuture); + return newPendingFuture; + }); } @Override public E peek() { final MeteringAgent.Context timer = monitor.startTimer(PEEK); - try { - return Futures.getUnchecked(database.queuePeek(name) - .thenApply(v -> v != null ? serializer.decode(v) : null)); - } finally { - timer.stop(); - } + return Futures.getUnchecked(database.queuePeek(name) + .thenApply(v -> v != null ? serializer.decode(v) : null) + .whenComplete((r, e) -> timer.stop())); } public String name() { @@ -122,7 +114,7 @@ public class DefaultDistributedQueue implements DistributedQueue { protected void tryPoll() { Set> completedFutures = Sets.newHashSet(); for (CompletableFuture future : pendingFutures) { - E entry = Futures.getUnchecked(database.queuePop(name, localNodeId) + E entry = Futures.getUnchecked(database.queuePop(name) .thenApply(v -> v != null ? serializer.decode(v) : null)); if (entry != null) { future.complete(entry); diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java index b463affed0..d6654e2725 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java @@ -15,15 +15,10 @@ */ package org.onosproject.store.consistent.impl; -import com.google.common.base.Charsets; -import org.onosproject.cluster.NodeId; import org.onosproject.store.service.DistributedQueue; import org.onosproject.store.service.DistributedQueueBuilder; import org.onosproject.store.service.Serializer; -import java.util.Set; -import java.util.function.Consumer; - import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -40,8 +35,7 @@ public class DefaultDistributedQueueBuilder implements DistributedQueueBuilde private final DatabaseManager databaseManager; private boolean metering = true; - public DefaultDistributedQueueBuilder( - DatabaseManager databaseManager) { + public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) { this.databaseManager = databaseManager; } @@ -78,18 +72,10 @@ public class DefaultDistributedQueueBuilder implements DistributedQueueBuilde @Override public DistributedQueue build() { checkState(validInputs()); - Consumer> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(name, - DatabaseManager.QUEUE_UPDATED_TOPIC, - s -> s.getBytes(Charsets.UTF_8), - nodes); - DefaultDistributedQueue queue = new DefaultDistributedQueue<>( + return new DefaultDistributedQueue<>( name, persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase, serializer, - databaseManager.localNodeId, - metering, - notifyOthers); - databaseManager.registerQueue(queue); - return queue; + metering); } } diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java index 09b3f597b1..a294681e9f 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.onosproject.cluster.NodeId; import org.onosproject.store.service.DatabaseUpdate; import org.onosproject.store.service.Transaction; import org.onosproject.store.service.Versioned; @@ -229,15 +228,15 @@ public class PartitionedDatabase implements Database { } @Override - public CompletableFuture> queuePush(String queueName, byte[] entry) { + public CompletableFuture queuePush(String queueName, byte[] entry) { checkState(isOpen.get(), DB_NOT_OPEN); return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry); } @Override - public CompletableFuture queuePop(String queueName, NodeId nodeId) { + public CompletableFuture queuePop(String queueName) { checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId); + return partitioner.getPartition(queueName, queueName).queuePop(queueName); } @Override diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java index 9b064b0d3d..72356d0b83 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java @@ -29,7 +29,7 @@ public class StateMachineUpdate { /** * Update is for a map. */ - MAP, + MAP_UPDATE, /** * Update is a transaction commit. @@ -37,7 +37,12 @@ public class StateMachineUpdate { TX_COMMIT, /** - * Update is for a non-map data structure. + * Update is a queue push. + */ + QUEUE_PUSH, + + /** + * Update is for some other operation. */ OTHER } @@ -55,9 +60,11 @@ public class StateMachineUpdate { public Target target() { // FIXME: This check is brittle if (operationName.contains("mapUpdate")) { - return Target.MAP; + return Target.MAP_UPDATE; } else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) { return Target.TX_COMMIT; + } else if (operationName.contains("queuePush")) { + return Target.QUEUE_PUSH; } else { return Target.OTHER; }