From 832686da5f83f758eaacdca4bf6b47f77b45ac35 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Mon, 4 Apr 2016 21:57:26 -0700 Subject: [PATCH] Post DatabaseManager deprecation code cleanup - Dropping MutexExecutionService as there are now better alternatives - Dropping New from class names that were added during transition phase Change-Id: If0cdd3321081c3f8fda81441ef2c84549b616edd --- .../cli/net/CountersListCommand.java | 70 +--- .../service/DistributedQueueBuilder.java | 18 - .../store/service/MutexExecutionService.java | 34 -- .../store/service/StorageAdminService.java | 22 -- .../security/impl/DefaultPolicyBuilder.java | 4 - .../impl/DefaultAsyncAtomicValue.java | 6 +- ....java => DefaultAtomicCounterBuilder.java} | 4 +- ....java => DefaultConsistentMapBuilder.java} | 4 +- ...va => DefaultDistributedQueueBuilder.java} | 18 +- ...xt.java => DefaultTransactionContext.java} | 4 +- ... => DefaultTransactionContextBuilder.java} | 6 +- .../impl/MutexExecutionManager.java | 317 ------------------ .../primitives/impl/PartitionManager.java | 2 +- .../store/primitives/impl/StorageManager.java | 31 +- .../impl/UnmodifiableAsyncConsistentMap.java | 24 +- .../store/primitives/impl/package-info.java | 3 +- 16 files changed, 44 insertions(+), 523 deletions(-) delete mode 100644 core/api/src/main/java/org/onosproject/store/service/MutexExecutionService.java rename core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/{NewDefaultAtomicCounterBuilder.java => DefaultAtomicCounterBuilder.java} (87%) rename core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/{NewDefaultConsistentMapBuilder.java => DefaultConsistentMapBuilder.java} (90%) rename core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/{NewDefaultDistributedQueueBuilder.java => DefaultDistributedQueueBuilder.java} (78%) rename core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/{NewDefaultTransactionContext.java => DefaultTransactionContext.java} (95%) rename core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/{NewDefaultTransactionContextBuilder.java => DefaultTransactionContextBuilder.java} (87%) delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MutexExecutionManager.java diff --git a/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java b/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java index a44fdc9f62..a83999cbf3 100644 --- a/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java +++ b/cli/src/main/java/org/onosproject/cli/net/CountersListCommand.java @@ -21,9 +21,7 @@ import org.apache.karaf.shell.commands.Command; import org.onosproject.cli.AbstractShellCommand; import org.onosproject.store.service.StorageAdminService; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; /** @@ -35,73 +33,17 @@ public class CountersListCommand extends AbstractShellCommand { private static final String FMT = "name=%s value=%d"; - /** - * Displays counters as text. - * - * @param counters counter info - */ - private void displayCounters(Map counters) { - counters.forEach((name, value) -> print(FMT, name, value)); - } - - /** - * Converts info for counters into a JSON object. - * - * @param counters counter info - */ - private JsonNode json(Map counters) { - ObjectMapper mapper = new ObjectMapper(); - ArrayNode jsonCounters = mapper.createArrayNode(); - - // Create a JSON node for each counter - counters.forEach((name, value) -> { - ObjectNode jsonCounter = mapper.createObjectNode(); - jsonCounter.put("name", name) - .put("value", value); - jsonCounters.add(jsonCounter); - }); - - return jsonCounters; - } - - /** - * Converts info for counters from different databases into a JSON object. - * - * @param partitionedDbCounters counters info - * @param inMemoryDbCounters counters info - */ - private JsonNode jsonAllCounters(Map partitionedDbCounters, - Map inMemoryDbCounters) { - ObjectMapper mapper = new ObjectMapper(); - ArrayNode jsonCounters = mapper.createArrayNode(); - - // Create a JSON node for partitioned database counter - ObjectNode jsonPartitionedDatabaseCounters = mapper.createObjectNode(); - jsonPartitionedDatabaseCounters.set("partitionedDatabaseCounters", - json(partitionedDbCounters)); - jsonCounters.add(jsonPartitionedDatabaseCounters); - // Create a JSON node for in-memory database counter - ObjectNode jsonInMemoryDatabseCounters = mapper.createObjectNode(); - jsonInMemoryDatabseCounters.set("inMemoryDatabaseCounters", - json(inMemoryDbCounters)); - jsonCounters.add(jsonInMemoryDatabseCounters); - - return jsonCounters; - } - - @Override protected void execute() { StorageAdminService storageAdminService = get(StorageAdminService.class); - Map partitionedDatabaseCounters = storageAdminService.getPartitionedDatabaseCounters(); - Map inMemoryDatabaseCounters = storageAdminService.getInMemoryDatabaseCounters(); + Map counters = storageAdminService.getCounters(); if (outputJson()) { - print("%s", jsonAllCounters(partitionedDatabaseCounters, inMemoryDatabaseCounters)); + ObjectMapper mapper = new ObjectMapper(); + ObjectNode jsonCounters = mapper.createObjectNode(); + counters.forEach((k, v) -> jsonCounters.put(k, v)); + print("%s", jsonCounters); } else { - print("Partitioned database counters:"); - displayCounters(partitionedDatabaseCounters); - print("In-memory database counters:"); - displayCounters(inMemoryDatabaseCounters); + counters.keySet().stream().sorted().forEach(name -> print(FMT, name, counters.get(name))); } } } diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java index 646dc28cc9..5e3cabf706 100644 --- a/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java +++ b/core/api/src/main/java/org/onosproject/store/service/DistributedQueueBuilder.java @@ -50,24 +50,6 @@ public interface DistributedQueueBuilder { */ DistributedQueueBuilder withSerializer(Serializer serializer); - /** - * - * - * @return this DistributedQueueBuilder for method chaining - */ - DistributedQueueBuilder withMeteringDisabled(); - - - /** - * Disables persistence of queues entries. - *

- * When persistence is disabled, a full cluster restart will wipe out all - * queue entries. - *

- * @return this DistributedQueueBuilder for method chaining - */ - DistributedQueueBuilder withPersistenceDisabled(); - /** * Builds a queue based on the configuration options * supplied to this builder. diff --git a/core/api/src/main/java/org/onosproject/store/service/MutexExecutionService.java b/core/api/src/main/java/org/onosproject/store/service/MutexExecutionService.java deleted file mode 100644 index d05f3b912f..0000000000 --- a/core/api/src/main/java/org/onosproject/store/service/MutexExecutionService.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.service; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Service for mutually exclusive job execution. - */ -public interface MutexExecutionService { - - /** - * Runs the specified task in a mutually exclusive fashion. - * @param task task to run - * @param exclusionPath path on which different instances synchronize - * @param executor executor to use for running the task - * @return future that is completed when the task execution completes. - */ - CompletableFuture execute(MutexTask task, String exclusionPath, Executor executor); -} \ No newline at end of file diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java index 4d94cec686..c8426ed357 100644 --- a/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java +++ b/core/api/src/main/java/org/onosproject/store/service/StorageAdminService.java @@ -46,33 +46,11 @@ public interface StorageAdminService { /** * Returns information about all the atomic counters in the system. - * If 2 counters belonging to 2 different databases have the same name, - * then only one counter from one database is returned. * * @return mapping from counter name to that counter's next value - * @deprecated 1.5.0 Falcon Release */ - @Deprecated Map getCounters(); - /** - * Returns information about all the atomic partitioned database counters in the system. - * - * @return mapping from counter name to that counter's next value - * @deprecated 1.5.0 Falcon Release - */ - @Deprecated - Map getPartitionedDatabaseCounters(); - - /** - * Returns information about all the atomic in-memory database counters in the system. - * - * @return mapping from counter name to that counter's next value - * @deprecated 1.5.0 Falcon Release - */ - @Deprecated - Map getInMemoryDatabaseCounters(); - /** * Returns all pending transactions. * diff --git a/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java b/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java index 01442f7428..214a43a1bc 100644 --- a/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java +++ b/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java @@ -73,7 +73,6 @@ import org.onosproject.store.cluster.messaging.MessagingService; import org.onosproject.store.primitives.PartitionAdminService; import org.onosproject.store.primitives.PartitionService; import org.onosproject.store.service.LogicalClockService; -import org.onosproject.store.service.MutexExecutionService; import org.onosproject.store.service.StorageAdminService; import org.onosproject.store.service.StorageService; import org.onosproject.ui.UiExtensionService; @@ -248,7 +247,6 @@ public final class DefaultPolicyBuilder { permSet.add(new ServicePermission(MessagingService.class.getName(), ServicePermission.GET)); permSet.add(new ServicePermission(PartitionService.class.getName(), ServicePermission.GET)); permSet.add(new ServicePermission(LogicalClockService.class.getName(), ServicePermission.GET)); - permSet.add(new ServicePermission(MutexExecutionService.class.getName(), ServicePermission.GET)); permSet.add(new ServicePermission(StorageService.class.getName(), ServicePermission.GET)); permSet.add(new ServicePermission(UiExtensionService.class.getName(), ServicePermission.GET)); @@ -376,8 +374,6 @@ public final class DefaultPolicyBuilder { PartitionService.class.getName())); serviceDirectory.put(CLOCK_WRITE, ImmutableSet.of( LogicalClockService.class.getName())); - serviceDirectory.put(MUTEX_WRITE, ImmutableSet.of( - MutexExecutionService.class.getName())); return serviceDirectory; } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java index 1d42a97bba..281ad254b4 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java @@ -34,7 +34,11 @@ import com.google.common.base.Throwables; import com.google.common.collect.Maps; import org.onosproject.utils.MeteringAgent; - +/** + * Default implementation of a {@code AsyncAtomicValue}. + * + * @param value type + */ public class DefaultAsyncAtomicValue implements AsyncAtomicValue { private final String name; diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultAtomicCounterBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java similarity index 87% rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultAtomicCounterBuilder.java rename to core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java index 556774e036..fdbcf6c004 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultAtomicCounterBuilder.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java @@ -22,11 +22,11 @@ import org.onosproject.store.service.AtomicCounterBuilder; /** * Default implementation of AtomicCounterBuilder. */ -public class NewDefaultAtomicCounterBuilder extends AtomicCounterBuilder { +public class DefaultAtomicCounterBuilder extends AtomicCounterBuilder { private final DistributedPrimitiveCreator primitiveCreator; - public NewDefaultAtomicCounterBuilder(DistributedPrimitiveCreator primitiveCreator) { + public DefaultAtomicCounterBuilder(DistributedPrimitiveCreator primitiveCreator) { this.primitiveCreator = primitiveCreator; } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java similarity index 90% rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultConsistentMapBuilder.java rename to core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java index dfef62ed16..37c45d48f5 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultConsistentMapBuilder.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java @@ -26,11 +26,11 @@ import org.onosproject.store.service.ConsistentMapBuilder; * @param type for map key * @param type for map value */ -public class NewDefaultConsistentMapBuilder extends ConsistentMapBuilder { +public class DefaultConsistentMapBuilder extends ConsistentMapBuilder { private final DistributedPrimitiveCreator primitiveCreator; - public NewDefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) { + public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) { this.primitiveCreator = primitiveCreator; } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultDistributedQueueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java similarity index 78% rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultDistributedQueueBuilder.java rename to core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java index eff8e77253..47c862822f 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultDistributedQueueBuilder.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java @@ -28,15 +28,13 @@ import static com.google.common.base.Preconditions.checkState; * * @param queue entry type */ -public class NewDefaultDistributedQueueBuilder implements DistributedQueueBuilder { +public class DefaultDistributedQueueBuilder implements DistributedQueueBuilder { private final DistributedPrimitiveCreator primitiveCreator; private String name; - private boolean persistenceEnabled = true; - private boolean metering = true; private Serializer serializer; - public NewDefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) { + public DefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) { this.primitiveCreator = primitiveCreator; } @@ -54,18 +52,6 @@ public class NewDefaultDistributedQueueBuilder implements DistributedQueueBui return this; } - @Override - public DistributedQueueBuilder withMeteringDisabled() { - metering = false; - return this; - } - - @Override - public DistributedQueueBuilder withPersistenceDisabled() { - persistenceEnabled = false; - return this; - } - private boolean validInputs() { return name != null && serializer != null; } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java similarity index 95% rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java rename to core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java index e9e75a595a..be9ead43c5 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java @@ -32,7 +32,7 @@ import com.google.common.collect.Sets; /** * Default implementation of transaction context. */ -public class NewDefaultTransactionContext implements TransactionContext { +public class DefaultTransactionContext implements TransactionContext { private final AtomicBoolean isOpen = new AtomicBoolean(false); private final DistributedPrimitiveCreator creator; @@ -41,7 +41,7 @@ public class NewDefaultTransactionContext implements TransactionContext { private final Set txParticipants = Sets.newConcurrentHashSet(); private final MeteringAgent monitor; - public NewDefaultTransactionContext(TransactionId transactionId, + public DefaultTransactionContext(TransactionId transactionId, DistributedPrimitiveCreator creator, TransactionCoordinator transactionCoordinator) { this.transactionId = transactionId; diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContextBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java similarity index 87% rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContextBuilder.java rename to core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java index 7d4e22e1d8..1704bb2fe5 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContextBuilder.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java @@ -22,13 +22,13 @@ import org.onosproject.store.service.TransactionContextBuilder; /** * Default Transaction Context Builder. */ -public class NewDefaultTransactionContextBuilder extends TransactionContextBuilder { +public class DefaultTransactionContextBuilder extends TransactionContextBuilder { private final TransactionId transactionId; private final DistributedPrimitiveCreator primitiveCreator; private final TransactionCoordinator transactionCoordinator; - public NewDefaultTransactionContextBuilder(TransactionId transactionId, + public DefaultTransactionContextBuilder(TransactionId transactionId, DistributedPrimitiveCreator primitiveCreator, TransactionCoordinator transactionCoordinator) { this.transactionId = transactionId; @@ -38,7 +38,7 @@ public class NewDefaultTransactionContextBuilder extends TransactionContextBuild @Override public TransactionContext build() { - return new NewDefaultTransactionContext(transactionId, + return new DefaultTransactionContext(transactionId, primitiveCreator, transactionCoordinator); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MutexExecutionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MutexExecutionManager.java deleted file mode 100644 index bc45d683d0..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MutexExecutionManager.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.util.Tools; -import org.onosproject.cluster.ClusterEvent; -import org.onosproject.cluster.ClusterEventListener; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode.State; -import org.onosproject.cluster.NodeId; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.MapEventListener; -import org.onosproject.store.service.MutexExecutionService; -import org.onosproject.store.service.MutexTask; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.Versioned; -import org.slf4j.Logger; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import static org.onosproject.security.AppGuard.checkPermission; -import static org.onosproject.security.AppPermission.Type.MUTEX_WRITE; -/** - * Implementation of a MutexExecutionService. - */ -@Component(immediate = true) -@Service -public class MutexExecutionManager implements MutexExecutionService { - - private final Logger log = getLogger(getClass()); - - protected ConsistentMap lockMap; - protected NodeId localNodeId; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - private final MapEventListener mapEventListener = new InternalLockMapEventListener(); - private final ClusterEventListener clusterEventListener = new InternalClusterEventListener(); - - private Map> pending = Maps.newConcurrentMap(); - private Map activeTasks = Maps.newConcurrentMap(); - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - lockMap = storageService.consistentMapBuilder() - .withName("onos-mutexes") - .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class)) - .withPartitionsDisabled() - .build(); - lockMap.addListener(mapEventListener); - clusterService.addListener(clusterEventListener); - releaseOldLocks(); - log.info("Started"); - } - - @Deactivate - public void deactivate() { - lockMap.removeListener(mapEventListener); - pending.values().forEach(future -> future.cancel(true)); - activeTasks.forEach((k, v) -> { - v.stop(); - unlock(k); - }); - clusterService.removeListener(clusterEventListener); - log.info("Stopped"); - } - - @Override - public CompletableFuture execute(MutexTask task, String exclusionPath, Executor executor) { - checkPermission(MUTEX_WRITE); - return lock(exclusionPath) - .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath, - k -> new InnerMutexTask(exclusionPath, - task, - state.term()))) - .thenAcceptAsync(t -> t.start(), executor) - .whenComplete((r, e) -> unlock(exclusionPath)); - } - - protected CompletableFuture lock(String exclusionPath) { - CompletableFuture future = - pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>()); - tryLock(exclusionPath); - return future; - } - - /** - * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to - * the wait list. - * @param exclusionPath exclusion path - */ - protected void tryLock(String exclusionPath) { - Tools.retryable(() -> lockMap.asJavaMap() - .compute(exclusionPath, - (k, v) -> MutexState.admit(v, localNodeId)), - ConsistentMapException.ConcurrentModification.class, - Integer.MAX_VALUE, - 100).get(); - } - - /** - * Releases lock for the specific path. This operation is idempotent. - * @param exclusionPath exclusion path - */ - protected void unlock(String exclusionPath) { - Tools.retryable(() -> lockMap.asJavaMap() - .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)), - ConsistentMapException.ConcurrentModification.class, - Integer.MAX_VALUE, - 100).get(); - } - - /** - * Detects and releases all locks held by this node. - */ - private void releaseOldLocks() { - Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder())) - .keySet() - .forEach(path -> { - log.info("Detected zombie task still holding lock for {}. Releasing lock.", path); - unlock(path); - }); - } - - private class InternalLockMapEventListener implements MapEventListener { - - @Override - public void event(MapEvent event) { - log.debug("Received {}", event); - if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) { - pending.computeIfPresent(event.key(), (k, future) -> { - MutexState state = Versioned.valueOrElse(event.value(), null); - if (state != null && localNodeId.equals(state.holder())) { - log.debug("Local node is now owner for {}", event.key()); - future.complete(state); - return null; - } else { - return future; - } - }); - InnerMutexTask task = activeTasks.get(event.key()); - if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) { - task.stop(); - } - } - } - } - - private class InternalClusterEventListener implements ClusterEventListener { - - @Override - public void event(ClusterEvent event) { - if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED || - event.type() == ClusterEvent.Type.INSTANCE_REMOVED) { - NodeId nodeId = event.subject().id(); - log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId); - lockMap.asJavaMap().forEach((k, v) -> { - if (v.contains(nodeId)) { - lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId)); - } - }); - } - long activeNodes = clusterService.getNodes() - .stream() - .map(node -> clusterService.getState(node.id())) - .filter(State::isActive) - .count(); - if (clusterService.getNodes().size() > 1 && activeNodes == 1) { - log.info("This node is partitioned away from the cluster. Stopping all inflight executions"); - activeTasks.forEach((k, v) -> { - v.stop(); - }); - } - } - } - - private static final class MutexState { - - private final NodeId holder; - private final List waitList; - private final long term; - - public static MutexState admit(MutexState state, NodeId nodeId) { - if (state == null) { - return new MutexState(nodeId, 1L, Lists.newArrayList()); - } else if (state.holder() == null) { - return new MutexState(nodeId, state.term() + 1, Lists.newArrayList()); - } else { - if (!state.contains(nodeId)) { - NodeId newHolder = state.holder(); - List newWaitList = Lists.newArrayList(state.waitList()); - newWaitList.add(nodeId); - return new MutexState(newHolder, state.term(), newWaitList); - } else { - return state; - } - } - } - - public static MutexState evict(MutexState state, NodeId nodeId) { - return state.evict(nodeId); - } - - public MutexState evict(NodeId nodeId) { - if (nodeId.equals(holder)) { - if (waitList.isEmpty()) { - return new MutexState(null, term, waitList); - } - List newWaitList = Lists.newArrayList(waitList); - NodeId newHolder = newWaitList.remove(0); - return new MutexState(newHolder, term + 1, newWaitList); - } else { - NodeId newHolder = holder; - List newWaitList = Lists.newArrayList(waitList); - newWaitList.remove(nodeId); - return new MutexState(newHolder, term, newWaitList); - } - } - - public NodeId holder() { - return holder; - } - - public List waitList() { - return waitList; - } - - public long term() { - return term; - } - - private boolean contains(NodeId nodeId) { - return (nodeId.equals(holder) || waitList.contains(nodeId)); - } - - private MutexState(NodeId holder, long term, List waitList) { - this.holder = holder; - this.term = term; - this.waitList = Lists.newArrayList(waitList); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("holder", holder) - .add("term", term) - .add("waitList", waitList) - .toString(); - } - } - - private class InnerMutexTask implements MutexTask { - private final MutexTask task; - private final String mutexPath; - private final long term; - - public InnerMutexTask(String mutexPath, MutexTask task, long term) { - this.mutexPath = mutexPath; - this.term = term; - this.task = task; - } - - public long term() { - return term; - } - - @Override - public void start() { - log.debug("Starting execution for mutex task guarded by {}", mutexPath); - task.start(); - log.debug("Finished execution for mutex task guarded by {}", mutexPath); - } - - @Override - public void stop() { - log.debug("Stopping execution for mutex task guarded by {}", mutexPath); - task.stop(); - } - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java index d4699a2f8f..d2ea48c167 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java @@ -94,7 +94,7 @@ public class PartitionManager extends AbstractListenerManager openFuture = CompletableFuture.allOf(partitions.values() .stream() diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java index 5576b4f767..ae07a0a1df 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java @@ -57,7 +57,6 @@ import org.onosproject.store.service.StorageService; import org.onosproject.store.service.TransactionContextBuilder; import org.slf4j.Logger; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; @@ -129,7 +128,7 @@ public class StorageManager implements StorageService, StorageAdminService { @Override public ConsistentMapBuilder consistentMapBuilder() { checkPermission(STORAGE_WRITE); - return new NewDefaultConsistentMapBuilder<>(federatedPrimitiveCreator); + return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator); } @Override @@ -141,13 +140,13 @@ public class StorageManager implements StorageService, StorageAdminService { @Override public DistributedQueueBuilder queueBuilder() { checkPermission(STORAGE_WRITE); - return new NewDefaultDistributedQueueBuilder<>(federatedPrimitiveCreator); + return new DefaultDistributedQueueBuilder<>(federatedPrimitiveCreator); } @Override public AtomicCounterBuilder atomicCounterBuilder() { checkPermission(STORAGE_WRITE); - return new NewDefaultAtomicCounterBuilder(federatedPrimitiveCreator); + return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator); } @Override @@ -163,7 +162,7 @@ public class StorageManager implements StorageService, StorageAdminService { @Override public TransactionContextBuilder transactionContextBuilder() { checkPermission(STORAGE_WRITE); - return new NewDefaultTransactionContextBuilder(transactionIdGenerator.get(), + return new DefaultTransactionContextBuilder(transactionIdGenerator.get(), federatedPrimitiveCreator, transactionCoordinator); } @@ -181,26 +180,10 @@ public class StorageManager implements StorageService, StorageAdminService { @Override public Map getCounters() { - Map result = Maps.newHashMap(); - result.putAll(getInMemoryDatabaseCounters()); - result.putAll(getPartitionedDatabaseCounters()); - return result; - } - - @Override - public Map getInMemoryDatabaseCounters() { - return ImmutableMap.of(); - } - - @Override - public Map getPartitionedDatabaseCounters() { - return getCounters(federatedPrimitiveCreator); - } - - public Map getCounters(DistributedPrimitiveCreator creator) { Map counters = Maps.newConcurrentMap(); - creator.getAsyncAtomicCounterNames() - .forEach(name -> counters.put(name, creator.newAsyncCounter(name).asAtomicCounter().get())); + federatedPrimitiveCreator.getAsyncAtomicCounterNames() + .forEach(name -> counters.put(name, + federatedPrimitiveCreator.newAsyncCounter(name).asAtomicCounter().get())); return counters; } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UnmodifiableAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UnmodifiableAsyncConsistentMap.java index 30efc369b7..6525f12196 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UnmodifiableAsyncConsistentMap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UnmodifiableAsyncConsistentMap.java @@ -35,6 +35,8 @@ import org.onosproject.store.service.Versioned; */ public class UnmodifiableAsyncConsistentMap extends DelegatingAsyncConsistentMap { + private static final String ERROR_MSG = "map updates are not allowed"; + public UnmodifiableAsyncConsistentMap(AsyncConsistentMap backingMap) { super(backingMap); } @@ -43,56 +45,56 @@ public class UnmodifiableAsyncConsistentMap extends DelegatingAsyncConsist public CompletableFuture> computeIf(K key, Predicate condition, BiFunction remappingFunction) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException("")); } @Override public CompletableFuture> put(K key, V value) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture> putAndGet(K key, V value) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture> remove(K key) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture clear() { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture> putIfAbsent(K key, V value) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture remove(K key, V value) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture remove(K key, long version) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture> replace(K key, V value) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture replace(K key, V oldValue, V newValue) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } @Override public CompletableFuture replace(K key, long oldVersion, V newValue) { - return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed")); + return Tools.exceptionalFuture(new UnsupportedOperationException(ERROR_MSG)); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/package-info.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/package-info.java index cb20c3eec8..5b293c72da 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/package-info.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/package-info.java @@ -15,7 +15,6 @@ */ /** - * Implementation of partitioned and distributed store facility capable of - * providing consistent update semantics. + * Implementation classes for various Distributed primitives. */ package org.onosproject.store.primitives.impl; \ No newline at end of file