From 38f01690dbcd791a1f1f722e4ff47cf10f3358a9 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Mon, 4 Apr 2016 20:28:08 -0700 Subject: [PATCH] Dropping DatabaseManager and related code. Goodbye! Change-Id: I5d90d62678402234462dad8be455903de481da21 --- .../impl/AsyncCachingConsistentMap.java | 81 --- .../store/primitives/impl/CommitResponse.java | 61 -- .../impl/CopycatCommunicationProtocol.java | 134 ----- .../store/primitives/impl/Database.java | 106 ---- .../store/primitives/impl/DatabaseConfig.java | 157 ------ .../primitives/impl/DatabaseManager.java | 456 --------------- .../primitives/impl/DatabasePartitioner.java | 45 -- .../store/primitives/impl/DatabaseProxy.java | 249 --------- .../primitives/impl/DatabaseSerializer.java | 105 ---- .../store/primitives/impl/DatabaseState.java | 120 ---- .../impl/DefaultAsyncAtomicCounter.java | 107 ---- .../impl/DefaultAsyncConsistentMap.java | 523 ------------------ .../impl/DefaultAtomicCounterBuilder.java | 41 -- .../impl/DefaultConsistentMapBuilder.java | 79 --- .../primitives/impl/DefaultDatabase.java | 253 --------- .../primitives/impl/DefaultDatabaseState.java | 381 ------------- .../impl/DefaultDistributedQueue.java | 138 ----- .../impl/DefaultDistributedQueueBuilder.java | 81 --- .../impl/DefaultTransactionContext.java | 146 ----- .../DefaultTransactionContextBuilder.java | 56 -- .../store/primitives/impl/MappingSet.java | 131 ----- .../primitives/impl/PartitionedDatabase.java | 404 -------------- .../store/primitives/impl/Partitioner.java | 33 -- .../store/primitives/impl/Result.java | 121 ---- .../impl/SimpleKeyHashPartitioner.java | 38 -- .../impl/SimpleTableHashPartitioner.java | 39 -- .../primitives/impl/StateMachineUpdate.java | 91 --- .../primitives/impl/TransactionManager.java | 110 ---- .../store/primitives/impl/UpdateResult.java | 82 --- .../impl/DefaultAsyncConsistentMapTest.java | 373 ------------- .../store/primitives/impl/ResultTest.java | 57 -- .../primitives/impl/UpdateResultTest.java | 99 ---- pom.xml | 3 +- utils/thirdparty/pom.xml | 27 +- 34 files changed, 3 insertions(+), 4924 deletions(-) delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AsyncCachingConsistentMap.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CommitResponse.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatCommunicationProtocol.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Database.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseConfig.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabasePartitioner.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MappingSet.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Partitioner.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Result.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleKeyHashPartitioner.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleTableHashPartitioner.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StateMachineUpdate.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java delete mode 100644 core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateResult.java delete mode 100644 core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java delete mode 100644 core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/ResultTest.java delete mode 100644 core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/UpdateResultTest.java diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AsyncCachingConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AsyncCachingConsistentMap.java deleted file mode 100644 index 9df1b3bd26..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AsyncCachingConsistentMap.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import java.util.concurrent.CompletableFuture; - -import org.onosproject.core.ApplicationId; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Versioned; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - -/** - * Extension of {@link DefaultAsyncConsistentMap} that provides a weaker read consistency - * guarantee in return for better read performance. - *

- * For read/write operations that are local to a node this map implementation provides - * guarantees similar to a ConsistentMap. However for read/write operations executed - * across multiple nodes this implementation only provides eventual consistency. - * - * @param key type - * @param value type - */ -public class AsyncCachingConsistentMap extends DefaultAsyncConsistentMap { - - private final LoadingCache>> cache = - CacheBuilder.newBuilder() - .maximumSize(10000) // TODO: make configurable - .build(new CacheLoader>>() { - @Override - public CompletableFuture> load(K key) - throws Exception { - return AsyncCachingConsistentMap.super.get(key); - } - }); - - public AsyncCachingConsistentMap(String name, - ApplicationId applicationId, - Database database, - Serializer serializer, - boolean readOnly, - boolean purgeOnUninstall, - boolean meteringEnabled) { - super(name, applicationId, database, serializer, readOnly, purgeOnUninstall, meteringEnabled); - addListener(event -> cache.invalidate(event.key())); - } - - @Override - public CompletableFuture> get(K key) { - CompletableFuture> cachedValue = cache.getIfPresent(key); - if (cachedValue != null) { - if (cachedValue.isCompletedExceptionally()) { - cache.invalidate(key); - } else { - return cachedValue; - } - } - return cache.getUnchecked(key); - } - - @Override - protected void beforeUpdate(K key) { - super.beforeUpdate(key); - cache.invalidate(key); - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CommitResponse.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CommitResponse.java deleted file mode 100644 index a18ade4896..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CommitResponse.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static com.google.common.base.MoreObjects.toStringHelper; - -import java.util.Collections; -import java.util.List; - -import com.google.common.collect.ImmutableList; - -/** - * Result of a Transaction commit operation. - */ -public final class CommitResponse { - - private boolean success; - private List> updates; - - public static CommitResponse success(List> updates) { - return new CommitResponse(true, updates); - } - - public static CommitResponse failure() { - return new CommitResponse(false, Collections.emptyList()); - } - - private CommitResponse(boolean success, List> updates) { - this.success = success; - this.updates = ImmutableList.copyOf(updates); - } - - public boolean success() { - return success; - } - - public List> updates() { - return updates; - } - - @Override - public String toString() { - return toStringHelper(this) - .add("success", success) - .add("udpates", updates) - .toString(); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatCommunicationProtocol.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatCommunicationProtocol.java deleted file mode 100644 index e40665f27c..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatCommunicationProtocol.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; - -import org.onlab.util.Tools; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.NodeId; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; - -import net.kuujo.copycat.protocol.AbstractProtocol; -import net.kuujo.copycat.protocol.ProtocolClient; -import net.kuujo.copycat.protocol.ProtocolHandler; -import net.kuujo.copycat.protocol.ProtocolServer; -import net.kuujo.copycat.util.Configurable; - -/** - * Protocol for Copycat communication that employs - * {@code ClusterCommunicationService}. - */ -public class CopycatCommunicationProtocol extends AbstractProtocol { - - private static final MessageSubject COPYCAT_MESSAGE_SUBJECT = - new MessageSubject("onos-copycat-message"); - - protected ClusterService clusterService; - protected ClusterCommunicationService clusterCommunicator; - - public CopycatCommunicationProtocol(ClusterService clusterService, - ClusterCommunicationService clusterCommunicator) { - this.clusterService = clusterService; - this.clusterCommunicator = clusterCommunicator; - } - - @Override - public Configurable copy() { - return this; - } - - @Override - public ProtocolClient createClient(URI uri) { - NodeId nodeId = uriToNodeId(uri); - if (nodeId == null) { - throw new IllegalStateException("Unknown peer " + uri); - } - return new Client(nodeId); - } - - @Override - public ProtocolServer createServer(URI uri) { - return new Server(); - } - - private class Server implements ProtocolServer { - - @Override - public void handler(ProtocolHandler handler) { - if (handler == null) { - clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT); - } else { - clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT, - ByteBuffer::wrap, - handler, - Tools::byteBuffertoArray); - // FIXME: Tools::byteBuffertoArray involves a array copy. - } - } - - @Override - public CompletableFuture listen() { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture close() { - clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT); - return CompletableFuture.completedFuture(null); - } - } - - private class Client implements ProtocolClient { - private final NodeId peer; - - public Client(NodeId peer) { - this.peer = peer; - } - - @Override - public CompletableFuture write(ByteBuffer request) { - return clusterCommunicator.sendAndReceive(request, - COPYCAT_MESSAGE_SUBJECT, - Tools::byteBuffertoArray, - ByteBuffer::wrap, - peer); - } - - @Override - public CompletableFuture connect() { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture close() { - return CompletableFuture.completedFuture(null); - } - } - - private NodeId uriToNodeId(URI uri) { - return clusterService.getNodes() - .stream() - .filter(node -> uri.getHost().equals(node.ip().toString())) - .map(ControllerNode::id) - .findAny() - .orElse(null); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Database.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Database.java deleted file mode 100644 index bacf9f692c..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Database.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - - -import java.util.function.Consumer; - -import net.kuujo.copycat.cluster.ClusterConfig; -import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; -import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig; -import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator; -import net.kuujo.copycat.resource.Resource; - -/** - * Database. - */ -public interface Database extends DatabaseProxy, Resource { - - /** - * Creates a new database with the default cluster configuration.

- * - * The database will be constructed with the default cluster configuration. The default cluster configuration - * searches for two resources on the classpath - {@code cluster} and {cluster-defaults} - in that order. Configuration - * options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.

- * - * Additionally, the database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and - * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name - * as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource - * configurations will be loaded according to namespaces as well; for example, `databases.conf`. - * - * @param name The database name. - * @return The database. - */ - static Database create(String name) { - return create(name, new ClusterConfig(), new DatabaseConfig()); - } - - /** - * Creates a new database.

- * - * The database will be constructed with an database configuration that searches the classpath for - * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and - * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name - * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource - * configurations will be loaded according to namespaces as well; for example, `databases.conf`. - * - * @param name The database name. - * @param cluster The cluster configuration. - * @return The database. - */ - static Database create(String name, ClusterConfig cluster) { - return create(name, cluster, new DatabaseConfig()); - } - - /** - * Creates a new database. - * - * @param name The database name. - * @param cluster The cluster configuration. - * @param config The database configuration. - - * @return The database. - */ - static Database create(String name, ClusterConfig cluster, DatabaseConfig config) { - ClusterCoordinator coordinator = - new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster)); - return coordinator.getResource(name, config.resolve(cluster)) - .addStartupTask(() -> coordinator.open().thenApply(v -> null)) - .addShutdownTask(coordinator::close); - } - - /** - * Tells whether the database supports change notifications. - * @return true if notifications are supported; false otherwise - */ - default boolean hasChangeNotificationSupport() { - return true; - } - - /** - * Registers a new consumer of StateMachineUpdates. - * @param consumer consumer to register - */ - void registerConsumer(Consumer consumer); - - /** - * Unregisters a consumer of StateMachineUpdates. - * @param consumer consumer to unregister - */ - void unregisterConsumer(Consumer consumer); -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseConfig.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseConfig.java deleted file mode 100644 index 57dd31c302..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseConfig.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import com.typesafe.config.ConfigValueFactory; -import net.kuujo.copycat.cluster.ClusterConfig; -import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig; -import net.kuujo.copycat.protocol.Consistency; -import net.kuujo.copycat.resource.ResourceConfig; -import net.kuujo.copycat.state.StateLogConfig; -import net.kuujo.copycat.util.internal.Assert; - -import java.util.Map; - -/** - * Database configuration. - * - */ -public class DatabaseConfig extends ResourceConfig { - private static final String DATABASE_CONSISTENCY = "consistency"; - - private static final String DEFAULT_CONFIGURATION = "database-defaults"; - private static final String CONFIGURATION = "database"; - - private String name; - - public DatabaseConfig() { - super(CONFIGURATION, DEFAULT_CONFIGURATION); - } - - public DatabaseConfig(Map config) { - super(config, CONFIGURATION, DEFAULT_CONFIGURATION); - } - - public DatabaseConfig(String resource) { - super(resource, CONFIGURATION, DEFAULT_CONFIGURATION); - } - - protected DatabaseConfig(DatabaseConfig config) { - super(config); - } - - @Override - public DatabaseConfig copy() { - return new DatabaseConfig(this); - } - - /** - * Sets the database read consistency. - * - * @param consistency The database read consistency. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public void setConsistency(String consistency) { - this.config = config.withValue(DATABASE_CONSISTENCY, - ConfigValueFactory.fromAnyRef( - Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString())); - } - - /** - * Sets the database read consistency. - * - * @param consistency The database read consistency. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public void setConsistency(Consistency consistency) { - this.config = config.withValue(DATABASE_CONSISTENCY, - ConfigValueFactory.fromAnyRef( - Assert.isNotNull(consistency, "consistency").toString())); - } - - /** - * Returns the database read consistency. - * - * @return The database read consistency. - */ - public Consistency getConsistency() { - return Consistency.parse(config.getString(DATABASE_CONSISTENCY)); - } - - /** - * Sets the database read consistency, returning the configuration for method chaining. - * - * @param consistency The database read consistency. - * @return The database configuration. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public DatabaseConfig withConsistency(String consistency) { - setConsistency(consistency); - return this; - } - - /** - * Sets the database read consistency, returning the configuration for method chaining. - * - * @param consistency The database read consistency. - * @return The database configuration. - * @throws java.lang.NullPointerException If the consistency is {@code null} - */ - public DatabaseConfig withConsistency(Consistency consistency) { - setConsistency(consistency); - return this; - } - - /** - * Returns the database name. - * - * @return The database name - */ - public String getName() { - return name; - } - - /** - * Sets the database name, returning the configuration for method chaining. - * - * @param name The database name - * @return The database configuration - * @throws java.lang.NullPointerException If the name is {@code null} - */ - public DatabaseConfig withName(String name) { - setName(Assert.isNotNull(name, "name")); - return this; - } - - /** - * Sets the database name. - * - * @param name The database name - * @throws java.lang.NullPointerException If the name is {@code null} - */ - public void setName(String name) { - this.name = Assert.isNotNull(name, "name"); - } - - @Override - public CoordinatedResourceConfig resolve(ClusterConfig cluster) { - return new StateLogConfig(toMap()) - .resolve(cluster) - .withResourceType(DefaultDatabase.class); - } - -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java deleted file mode 100644 index 0671ab4bf0..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED; -import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import net.kuujo.copycat.CopycatConfig; -import net.kuujo.copycat.cluster.ClusterConfig; -import net.kuujo.copycat.cluster.Member; -import net.kuujo.copycat.cluster.Member.Type; -import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; -import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator; -import net.kuujo.copycat.log.BufferedLog; -import net.kuujo.copycat.log.FileLog; -import net.kuujo.copycat.log.Log; -import net.kuujo.copycat.protocol.Consistency; -import net.kuujo.copycat.protocol.Protocol; -import net.kuujo.copycat.util.concurrent.NamedThreadFactory; - -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.ReferencePolicy; -import org.apache.felix.scr.annotations.Service; -import org.onosproject.app.ApplicationEvent; -import org.onosproject.app.ApplicationListener; -import org.onosproject.app.ApplicationService; -import org.onosproject.cluster.ClusterMetadataService; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.cluster.NodeId; -import org.onosproject.cluster.PartitionId; -import org.onosproject.core.ApplicationId; -import org.onosproject.persistence.PersistenceService; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.primitives.MapUpdate; -import org.onosproject.store.primitives.TransactionId; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.AsyncConsistentMap; -import org.onosproject.store.service.AtomicCounterBuilder; -import org.onosproject.store.service.AtomicValueBuilder; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.DistributedQueueBuilder; -import org.onosproject.store.service.DistributedSetBuilder; -import org.onosproject.store.service.EventuallyConsistentMapBuilder; -import org.onosproject.store.service.LeaderElectorBuilder; -import org.onosproject.store.service.MapInfo; -import org.onosproject.store.service.PartitionInfo; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.StorageAdminService; -import org.onosproject.store.service.StorageService; -import org.onosproject.store.service.TransactionContextBuilder; -import org.slf4j.Logger; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; - -/** - * Database manager. - */ -@Component(immediate = true, enabled = false) -@Service -public class DatabaseManager implements StorageService, StorageAdminService { - - private final Logger log = getLogger(getClass()); - - public static final String BASE_PARTITION_NAME = "p0"; - - private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000; - private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000; - - private ClusterCoordinator coordinator; - protected PartitionedDatabase partitionedDatabase; - protected Database inMemoryDatabase; - protected NodeId localNodeId; - - private TransactionManager transactionManager; - private final Supplier transactionIdGenerator = - () -> TransactionId.from(UUID.randomUUID().toString()); - - private ApplicationListener appListener = new InternalApplicationListener(); - - private final Multimap maps = - Multimaps.synchronizedMultimap(ArrayListMultimap.create()); - private final Multimap mapsByApplication = - Multimaps.synchronizedMultimap(ArrayListMultimap.create()); - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterMetadataService clusterMetadataService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC) - protected ApplicationService applicationService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected PersistenceService persistenceService; - - protected String nodeIdToUri(NodeId nodeId) { - ControllerNode node = clusterService.getNode(nodeId); - return String.format("onos://%s:%d", node.ip(), node.tcpPort()); - } - - protected void bindApplicationService(ApplicationService service) { - applicationService = service; - applicationService.addListener(appListener); - } - - protected void unbindApplicationService(ApplicationService service) { - applicationService.removeListener(appListener); - this.applicationService = null; - } - - @Activate - public void activate() { - localNodeId = clusterService.getLocalNode().id(); - - Map> partitionMap = Maps.newHashMap(); - clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> { - partitionMap.put(p.getId(), Sets.newHashSet(p.getMembers())); - }); - - String[] activeNodeUris = partitionMap.values() - .stream() - .reduce((s1, s2) -> Sets.union(s1, s2)) - .get() - .stream() - .map(this::nodeIdToUri) - .toArray(String[]::new); - - String localNodeUri = nodeIdToUri(clusterMetadataService.getLocalNode().id()); - Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator); - - ClusterConfig clusterConfig = new ClusterConfig() - .withProtocol(protocol) - .withElectionTimeout(electionTimeoutMillis(activeNodeUris)) - .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris)) - .withMembers(activeNodeUris) - .withLocalMember(localNodeUri); - - CopycatConfig copycatConfig = new CopycatConfig() - .withName("onos") - .withClusterConfig(clusterConfig) - .withDefaultSerializer(new DatabaseSerializer()) - .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d"))); - - coordinator = new DefaultClusterCoordinator(copycatConfig.resolve()); - - Function logFunction = id -> id.asInt() == 0 ? newInMemoryLog() : newPersistentLog(); - - Map databases = Maps.transformEntries(partitionMap, (k, v) -> { - String[] replicas = v.stream().map(this::nodeIdToUri).toArray(String[]::new); - DatabaseConfig config = newDatabaseConfig(String.format("p%s", k), logFunction.apply(k), replicas); - return coordinator.getResource(config.getName(), config.resolve(clusterConfig) - .withSerializer(copycatConfig.getDefaultSerializer()) - .withDefaultExecutor(copycatConfig.getDefaultExecutor())); - }); - - inMemoryDatabase = databases.remove(PartitionId.from(0)); - - partitionedDatabase = new PartitionedDatabase("onos-store", databases.values()); - - CompletableFuture status = coordinator.open() - .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open()) - .whenComplete((db, error) -> { - if (error != null) { - log.error("Failed to initialize database.", error); - } else { - log.info("Successfully initialized database."); - } - })); - - Futures.getUnchecked(status); - - AsyncConsistentMap transactions = - this.consistentMapBuilder() - .withName("onos-transactions") - .withSerializer(Serializer.using(KryoNamespaces.API, - MapUpdate.class, - MapUpdate.Type.class, - Transaction.class, - Transaction.State.class)) - .buildAsyncMap(); - - transactionManager = new TransactionManager(partitionedDatabase, transactions); - partitionedDatabase.setTransactionManager(transactionManager); - - log.info("Started"); - } - - @Deactivate - public void deactivate() { - CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close()) - .thenCompose(v -> coordinator.close()) - .whenComplete((result, error) -> { - if (error != null) { - log.warn("Failed to cleanly close databases.", error); - } else { - log.info("Successfully closed databases."); - } - }); - ImmutableList.copyOf(maps.values()).forEach(this::unregisterMap); - if (applicationService != null) { - applicationService.removeListener(appListener); - } - log.info("Stopped"); - } - - @Override - public TransactionContextBuilder transactionContextBuilder() { - return new DefaultTransactionContextBuilder(this::consistentMapBuilder, - transactionManager::execute, - transactionIdGenerator.get()); - } - - @Override - public List getPartitionInfo() { - return Lists.asList( - inMemoryDatabase, - partitionedDatabase.getPartitions().toArray(new Database[]{})) - .stream() - .map(DatabaseManager::toPartitionInfo) - .collect(Collectors.toList()); - } - - private Log newPersistentLog() { - String logDir = System.getProperty("karaf.data", "./data"); - return new FileLog() - .withDirectory(logDir) - .withSegmentSize(1073741824) // 1GB - .withFlushOnWrite(true) - .withSegmentInterval(Long.MAX_VALUE); - } - - private Log newInMemoryLog() { - return new BufferedLog() - .withFlushOnWrite(false) - .withFlushInterval(Long.MAX_VALUE) - .withSegmentSize(10485760) // 10MB - .withSegmentInterval(Long.MAX_VALUE); - } - - private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) { - return new DatabaseConfig() - .withName(name) - .withElectionTimeout(electionTimeoutMillis(replicas)) - .withHeartbeatInterval(heartbeatTimeoutMillis(replicas)) - .withConsistency(Consistency.DEFAULT) - .withLog(log) - .withDefaultSerializer(new DatabaseSerializer()) - .withReplicas(replicas); - } - - private long electionTimeoutMillis(String[] replicas) { - return replicas.length == 1 ? 10L : RAFT_ELECTION_TIMEOUT_MILLIS; - } - - private long heartbeatTimeoutMillis(String[] replicas) { - return electionTimeoutMillis(replicas) / 2; - } - - /** - * Maps a Raft Database object to a PartitionInfo object. - * - * @param database database containing input data - * @return PartitionInfo object - */ - private static PartitionInfo toPartitionInfo(Database database) { - return new PartitionInfo(database.name(), - database.cluster().term(), - database.cluster().members() - .stream() - .filter(member -> Type.ACTIVE.equals(member.type())) - .map(Member::uri) - .sorted() - .collect(Collectors.toList()), - database.cluster().leader() != null ? - database.cluster().leader().uri() : null); - } - - - @Override - public EventuallyConsistentMapBuilder eventuallyConsistentMapBuilder() { - return new EventuallyConsistentMapBuilderImpl<>(clusterService, - clusterCommunicator, - persistenceService); - } - - @Override - public ConsistentMapBuilder consistentMapBuilder() { - return new DefaultConsistentMapBuilder<>(this); - } - - @Override - public DistributedSetBuilder setBuilder() { - return new DefaultDistributedSetBuilder<>(() -> this.consistentMapBuilder()); - } - - - @Override - public DistributedQueueBuilder queueBuilder() { - return new DefaultDistributedQueueBuilder<>(this); - } - - @Override - public AtomicCounterBuilder atomicCounterBuilder() { - return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase); - } - - @Override - public AtomicValueBuilder atomicValueBuilder() { - Supplier> mapBuilderSupplier = - () -> this.consistentMapBuilder() - .withName("onos-atomic-values") - .withMeteringDisabled() - .withSerializer(Serializer.using(KryoNamespaces.BASIC)); - return new DefaultAtomicValueBuilder<>(mapBuilderSupplier); - } - - @Override - public LeaderElectorBuilder leaderElectorBuilder() { - throw new UnsupportedOperationException(); - } - - @Override - public List getMapInfo() { - List maps = Lists.newArrayList(); - maps.addAll(getMapInfo(inMemoryDatabase)); - maps.addAll(getMapInfo(partitionedDatabase)); - return maps; - } - - private List getMapInfo(Database database) { - return complete(database.maps()) - .stream() - .map(name -> new MapInfo(name, complete(database.mapSize(name)))) - .filter(info -> info.size() > 0) - .collect(Collectors.toList()); - } - - - @Override - public Map getCounters() { - Map counters = Maps.newHashMap(); - counters.putAll(complete(inMemoryDatabase.counters())); - counters.putAll(complete(partitionedDatabase.counters())); - return counters; - } - - @Override - public Map getPartitionedDatabaseCounters() { - Map counters = Maps.newHashMap(); - counters.putAll(complete(partitionedDatabase.counters())); - return counters; - } - - @Override - public Map getInMemoryDatabaseCounters() { - Map counters = Maps.newHashMap(); - counters.putAll(complete(inMemoryDatabase.counters())); - return counters; - } - - @Override - public Collection getPendingTransactions() { - return complete(transactionManager.getPendingTransactionIds()); - } - - private static T complete(CompletableFuture future) { - try { - return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ConsistentMapException.Interrupted(); - } catch (TimeoutException e) { - throw new ConsistentMapException.Timeout(); - } catch (ExecutionException e) { - throw new ConsistentMapException(e.getCause()); - } - } - - protected DefaultAsyncConsistentMap registerMap(DefaultAsyncConsistentMap map) { - maps.put(map.name(), map); - if (map.applicationId() != null) { - mapsByApplication.put(map.applicationId(), map); - } - return map; - } - - protected void unregisterMap(DefaultAsyncConsistentMap map) { - maps.remove(map.name(), map); - if (map.applicationId() != null) { - mapsByApplication.remove(map.applicationId(), map); - } - } - - private class InternalApplicationListener implements ApplicationListener { - @Override - public void event(ApplicationEvent event) { - if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) { - ApplicationId appId = event.subject().id(); - List mapsToRemove; - synchronized (mapsByApplication) { - mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId)); - } - mapsToRemove.forEach(DatabaseManager.this::unregisterMap); - if (event.type() == APP_UNINSTALLED) { - mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear()); - } - } - } - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabasePartitioner.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabasePartitioner.java deleted file mode 100644 index f5ccddae33..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabasePartitioner.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import static com.google.common.base.Preconditions.checkState; - -import java.util.List; -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableList; -import com.google.common.hash.Hashing; - -/** - * Partitioner for mapping map entries to individual database partitions. - *

- * By default a md5 hash of the hash key (key or map name) is used to pick a - * partition. - */ -public abstract class DatabasePartitioner implements Partitioner { - // Database partitions sorted by their partition name. - protected final List partitions; - - public DatabasePartitioner(List partitions) { - checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty"); - this.partitions = ImmutableList.copyOf(partitions); - } - - protected int hash(String key) { - return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt()); - } - -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java deleted file mode 100644 index 83525284b2..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import org.onlab.util.Match; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -/** - * Database proxy. - */ -public interface DatabaseProxy { - - /** - * Returns a set of all map names. - * - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture> maps(); - - /** - * Returns a mapping from counter name to next value. - * - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture> counters(); - - /** - * Returns the number of entries in map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture mapSize(String mapName); - - /** - * Checks whether the map is empty. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture mapIsEmpty(String mapName); - - /** - * Checks whether the map contains a key. - * - * @param mapName map name - * @param key key to check. - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture mapContainsKey(String mapName, K key); - - /** - * Checks whether the map contains a value. - * - * @param mapName map name - * @param value The value to check. - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture mapContainsValue(String mapName, V value); - - /** - * Gets a value from the map. - * - * @param mapName map name - * @param key The key to get. - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture> mapGet(String mapName, K key); - - /** - * Updates the map. - * - * @param mapName map name - * @param key The key to set - * @param valueMatch match for checking existing value - * @param versionMatch match for checking existing version - * @param value new value - * @return A completable future to be completed with the result once complete - */ - CompletableFuture>> mapUpdate( - String mapName, K key, Match valueMatch, Match versionMatch, V value); - - /** - * Clears the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture> mapClear(String mapName); - - /** - * Gets a set of keys in the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture> mapKeySet(String mapName); - - /** - * Gets a collection of values in the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture>> mapValues(String mapName); - - /** - * Gets a set of entries in the map. - * - * @param mapName map name - * @return A completable future to be completed with the result once complete. - */ - CompletableFuture>>> mapEntrySet(String mapName); - - /** - * Atomically add the given value to current value of the specified counter. - * - * @param counterName counter name - * @param delta value to add - * @return updated value - */ - CompletableFuture counterAddAndGet(String counterName, long delta); - - /** - * Atomically add the given value to current value of the specified counter. - * - * @param counterName counter name - * @param delta value to add - * @return previous value - */ - CompletableFuture counterGetAndAdd(String counterName, long delta); - - - /** - * Atomically sets the given value to current value of the specified counter. - * - * @param counterName counter name - * @param value value to set - * @return void future - */ - CompletableFuture counterSet(String counterName, long value); - - /** - * Atomically sets the given counter to the specified update value if and only if the current value is equal to the - * expected value. - * @param counterName counter name - * @param expectedValue value to use for equivalence check - * @param update value to set if expected value is current value - * @return true if an update occurred, false otherwise - */ - CompletableFuture counterCompareAndSet(String counterName, long expectedValue, long update); - - /** - * Returns the current value of the specified atomic counter. - * - * @param counterName counter name - * @return current value - */ - CompletableFuture counterGet(String counterName); - - /** - * Returns the size of queue. - * - * @param queueName queue name - * @return queue size - */ - CompletableFuture queueSize(String queueName); - - /** - * Inserts an entry into the queue. - * - * @param queueName queue name - * @param entry queue entry - * @return void future - */ - CompletableFuture queuePush(String queueName, byte[] entry); - - /** - * Removes an entry from the queue if the queue is non-empty. - * - * @param queueName queue name - * @return entry future. Can be completed with null if queue is empty - */ - CompletableFuture queuePop(String queueName); - - /** - * Returns but does not remove an entry from the queue. - * - * @param queueName queue name - * @return entry. Can be null if queue is empty - */ - CompletableFuture queuePeek(String queueName); - - /** - * Prepare and commit the specified transaction. - * - * @param transaction transaction to commit (after preparation) - * @return A completable future to be completed with the result once complete - */ - CompletableFuture prepareAndCommit(Transaction transaction); - - /** - * Prepare the specified transaction for commit. A successful prepare implies - * all the affected resources are locked thus ensuring no concurrent updates can interfere. - * - * @param transaction transaction to prepare (for commit) - * @return A completable future to be completed with the result once complete. The future is completed - * with true if the transaction is successfully prepared i.e. all pre-conditions are met and - * applicable resources locked. - */ - CompletableFuture prepare(Transaction transaction); - - /** - * Commit the specified transaction. A successful commit implies - * all the updates are applied, are now durable and are now visible externally. - * - * @param transaction transaction to commit - * @return A completable future to be completed with the result once complete - */ - CompletableFuture commit(Transaction transaction); - - /** - * Rollback the specified transaction. A successful rollback implies - * all previously acquired locks for the affected resources are released. - * - * @param transaction transaction to rollback - * @return A completable future to be completed with the result once complete - */ - CompletableFuture rollback(Transaction transaction); -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java deleted file mode 100644 index ec9b9262bf..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseSerializer.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import java.nio.ByteBuffer; - -import org.onlab.util.KryoNamespace; -import org.onlab.util.Match; -import org.onosproject.cluster.NodeId; -import org.onosproject.store.primitives.MapUpdate; -import org.onosproject.store.primitives.TransactionId; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.serializers.KryoSerializer; -import org.onosproject.store.service.Versioned; - -import net.kuujo.copycat.cluster.internal.MemberInfo; -import net.kuujo.copycat.raft.protocol.AppendRequest; -import net.kuujo.copycat.raft.protocol.AppendResponse; -import net.kuujo.copycat.raft.protocol.CommitRequest; -import net.kuujo.copycat.raft.protocol.CommitResponse; -import net.kuujo.copycat.raft.protocol.PollRequest; -import net.kuujo.copycat.raft.protocol.PollResponse; -import net.kuujo.copycat.raft.protocol.QueryRequest; -import net.kuujo.copycat.raft.protocol.QueryResponse; -import net.kuujo.copycat.raft.protocol.ReplicaInfo; -import net.kuujo.copycat.raft.protocol.SyncRequest; -import net.kuujo.copycat.raft.protocol.SyncResponse; -import net.kuujo.copycat.raft.protocol.VoteRequest; -import net.kuujo.copycat.raft.protocol.VoteResponse; -import net.kuujo.copycat.util.serializer.SerializerConfig; - -/** - * Serializer for DatabaseManager's interaction with Copycat. - */ -public class DatabaseSerializer extends SerializerConfig { - - private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder() - .nextId(KryoNamespace.FLOATING_ID) - .register(AppendRequest.class) - .register(AppendResponse.class) - .register(SyncRequest.class) - .register(SyncResponse.class) - .register(VoteRequest.class) - .register(VoteResponse.class) - .register(PollRequest.class) - .register(PollResponse.class) - .register(QueryRequest.class) - .register(QueryResponse.class) - .register(CommitRequest.class) - .register(CommitResponse.class) - .register(ReplicaInfo.class) - .register(MemberInfo.class) - .build(); - - private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder() - .nextId(KryoNamespace.FLOATING_ID) - .register(Versioned.class) - .register(MapUpdate.class) - .register(MapUpdate.Type.class) - .register(Result.class) - .register(UpdateResult.class) - .register(Result.Status.class) - .register(Transaction.class) - .register(Transaction.State.class) - .register(TransactionId.class) - .register(org.onosproject.store.primitives.impl.CommitResponse.class) - .register(Match.class) - .register(NodeId.class) - .build(); - - private static final KryoSerializer SERIALIZER = new KryoSerializer() { - @Override - protected void setupKryoPool() { - serializerPool = KryoNamespace.newBuilder() - .register(KryoNamespaces.BASIC) - .register(COPYCAT) - .register(ONOS_STORE) - .build(); - } - }; - - @Override - public ByteBuffer writeObject(Object object) { - return ByteBuffer.wrap(SERIALIZER.encode(object)); - } - - @Override - public T readObject(ByteBuffer buffer) { - return SERIALIZER.decode(buffer); - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java deleted file mode 100644 index d9ab7e5a49..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import net.kuujo.copycat.state.Command; -import net.kuujo.copycat.state.Initializer; -import net.kuujo.copycat.state.Query; -import net.kuujo.copycat.state.StateContext; - -import org.onlab.util.Match; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -/** - * Database state. - * - */ -public interface DatabaseState { - - /** - * Initializes the database state. - * - * @param context The map state context. - */ - @Initializer - void init(StateContext> context); - - @Query - Set maps(); - - @Query - Map counters(); - - @Query - int mapSize(String mapName); - - @Query - boolean mapIsEmpty(String mapName); - - @Query - boolean mapContainsKey(String mapName, K key); - - @Query - boolean mapContainsValue(String mapName, V value); - - @Query - Versioned mapGet(String mapName, K key); - - @Command - Result> mapUpdate(String mapName, K key, Match valueMatch, Match versionMatch, V value); - - @Command - Result mapClear(String mapName); - - @Query - Set mapKeySet(String mapName); - - @Query - Collection> mapValues(String mapName); - - @Query - Set>> mapEntrySet(String mapName); - - @Command - Long counterAddAndGet(String counterName, long delta); - - @Command - Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue); - - @Command - Long counterGetAndAdd(String counterName, long delta); - - @Query - Long queueSize(String queueName); - - @Query - byte[] queuePeek(String queueName); - - @Command - byte[] queuePop(String queueName); - - @Command - void queuePush(String queueName, byte[] entry); - - @Query - Long counterGet(String counterName); - - @Command - void counterSet(String counterName, long value); - - @Command - CommitResponse prepareAndCommit(Transaction transaction); - - @Command - boolean prepare(Transaction transaction); - - @Command - CommitResponse commit(Transaction transaction); - - @Command - boolean rollback(Transaction transaction); -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java deleted file mode 100644 index 6a792922ca..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicCounter.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import org.onosproject.store.service.AsyncAtomicCounter; -import org.onosproject.utils.MeteringAgent; - -import java.util.concurrent.CompletableFuture; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Default implementation for a distributed AsyncAtomicCounter backed by - * partitioned Raft DB. - *

- * The initial value will be zero. - */ -public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { - - private final String name; - private final Database database; - private final MeteringAgent monitor; - - private static final String PRIMITIVE_NAME = "atomicCounter"; - private static final String INCREMENT_AND_GET = "incrementAndGet"; - private static final String GET_AND_INCREMENT = "getAndIncrement"; - private static final String GET_AND_ADD = "getAndAdd"; - private static final String ADD_AND_GET = "addAndGet"; - private static final String GET = "get"; - private static final String SET = "set"; - private static final String COMPARE_AND_SET = "compareAndSet"; - - public DefaultAsyncAtomicCounter(String name, - Database database, - boolean meteringEnabled) { - this.name = checkNotNull(name); - this.database = checkNotNull(database); - this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - } - - @Override - public String name() { - return name; - } - - @Override - public CompletableFuture incrementAndGet() { - final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET); - return addAndGet(1L) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture get() { - final MeteringAgent.Context timer = monitor.startTimer(GET); - return database.counterGet(name) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture getAndIncrement() { - final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT); - return getAndAdd(1L) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture getAndAdd(long delta) { - final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD); - return database.counterGetAndAdd(name, delta) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture addAndGet(long delta) { - final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET); - return database.counterAddAndGet(name, delta) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture set(long value) { - final MeteringAgent.Context timer = monitor.startTimer(SET); - return database.counterSet(name, value) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture compareAndSet(long expectedValue, long updateValue) { - final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET); - return database.counterCompareAndSet(name, expectedValue, updateValue) - .whenComplete((r, e) -> timer.stop(e)); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java deleted file mode 100644 index 97cd23b6d1..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMap.java +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.MAP_UPDATE; -import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.TX_COMMIT; -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import org.onlab.util.HexString; -import org.onlab.util.Match; -import org.onlab.util.SharedExecutors; -import org.onlab.util.Tools; -import org.onosproject.core.ApplicationId; -import org.onosproject.store.primitives.TransactionId; -import org.onosproject.store.service.AsyncConsistentMap; -import org.onosproject.store.service.ConsistentMapException; -import org.onosproject.store.service.ConsistentMapException.ConcurrentModification; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.MapEventListener; -import org.onosproject.store.service.MapTransaction; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Versioned; -import org.onosproject.utils.MeteringAgent; -import org.slf4j.Logger; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.Maps; - -/** - * AsyncConsistentMap implementation that is backed by a Raft consensus - * based database. - * - * @param type of key. - * @param type of value. - */ -public class DefaultAsyncConsistentMap implements AsyncConsistentMap { - - private final String name; - private final ApplicationId applicationId; - private final Database database; - private final Serializer serializer; - private final boolean readOnly; - private final boolean purgeOnUninstall; - - private static final String PRIMITIVE_NAME = "consistentMap"; - private static final String SIZE = "size"; - private static final String IS_EMPTY = "isEmpty"; - private static final String CONTAINS_KEY = "containsKey"; - private static final String CONTAINS_VALUE = "containsValue"; - private static final String GET = "get"; - private static final String COMPUTE_IF = "computeIf"; - private static final String PUT = "put"; - private static final String PUT_AND_GET = "putAndGet"; - private static final String PUT_IF_ABSENT = "putIfAbsent"; - private static final String REMOVE = "remove"; - private static final String CLEAR = "clear"; - private static final String KEY_SET = "keySet"; - private static final String VALUES = "values"; - private static final String ENTRY_SET = "entrySet"; - private static final String REPLACE = "replace"; - private static final String COMPUTE_IF_ABSENT = "computeIfAbsent"; - - private final Set> listeners = new CopyOnWriteArraySet<>(); - - private final Logger log = getLogger(getClass()); - private final MeteringAgent monitor; - - private static final String ERROR_NULL_KEY = "Key cannot be null"; - private static final String ERROR_NULL_VALUE = "Null values are not allowed"; - - // String representation of serialized byte[] -> original key Object - private final LoadingCache keyCache = CacheBuilder.newBuilder() - .softValues() - .build(new CacheLoader() { - - @Override - public K load(String key) { - return serializer.decode(HexString.fromHexString(key)); - } - }); - - protected String sK(K key) { - String s = HexString.toHexString(serializer.encode(key)); - keyCache.put(s, key); - return s; - } - - protected K dK(String key) { - return keyCache.getUnchecked(key); - } - - public DefaultAsyncConsistentMap(String name, - ApplicationId applicationId, - Database database, - Serializer serializer, - boolean readOnly, - boolean purgeOnUninstall, - boolean meteringEnabled) { - this.name = checkNotNull(name, "map name cannot be null"); - this.applicationId = applicationId; - this.database = checkNotNull(database, "database cannot be null"); - this.serializer = checkNotNull(serializer, "serializer cannot be null"); - this.readOnly = readOnly; - this.purgeOnUninstall = purgeOnUninstall; - this.database.registerConsumer(update -> { - SharedExecutors.getSingleThreadExecutor().execute(() -> { - if (listeners.isEmpty()) { - return; - } - try { - if (update.target() == MAP_UPDATE) { - Result> result = update.output(); - if (result.success() && result.value().mapName().equals(name)) { - MapEvent mapEvent = result.value() - .map(this::dK, - v -> serializer.decode(Tools.copyOf(v))) - .toMapEvent(); - notifyListeners(mapEvent); - } - } else if (update.target() == TX_COMMIT) { - CommitResponse response = update.output(); - if (response.success()) { - response.updates().forEach(u -> { - if (u.mapName().equals(name)) { - MapEvent mapEvent = - u.map(this::dK, - v -> serializer.decode(Tools.copyOf(v))) - .toMapEvent(); - notifyListeners(mapEvent); - } - }); - } - } - } catch (Exception e) { - log.warn("Error notifying listeners", e); - } - }); - }); - this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - } - - /** - * Returns this map name. - * @return map name - */ - @Override - public String name() { - return name; - } - - /** - * Returns the serializer for map entries. - * @return map entry serializer - */ - public Serializer serializer() { - return serializer; - } - - /** - * Returns the applicationId owning this map. - * @return application Id - */ - @Override - public ApplicationId applicationId() { - return applicationId; - } - - /** - * Returns whether the map entries should be purged when the application - * owning it is uninstalled. - * @return true is map needs to cleared on app uninstall; false otherwise - */ - public boolean purgeOnUninstall() { - return purgeOnUninstall; - } - - @Override - public CompletableFuture size() { - final MeteringAgent.Context timer = monitor.startTimer(SIZE); - return database.mapSize(name) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture isEmpty() { - final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY); - return database.mapIsEmpty(name) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture containsKey(K key) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY); - return database.mapContainsKey(name, sK(key)) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture containsValue(V value) { - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_VALUE); - return database.mapContainsValue(name, serializer.encode(value)) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture> get(K key) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(GET); - return database.mapGet(name, sK(key)) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v != null ? v.map(serializer::decode) : null); - } - - @Override - public CompletableFuture> computeIfAbsent(K key, - Function mappingFunction) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(mappingFunction, "Mapping function cannot be null"); - final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF_ABSENT); - return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key)) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.newValue()); - } - - @Override - public CompletableFuture> computeIfPresent(K key, - BiFunction remappingFunction) { - return computeIf(key, Objects::nonNull, remappingFunction); - } - - @Override - public CompletableFuture> compute(K key, - BiFunction remappingFunction) { - return computeIf(key, v -> true, remappingFunction); - } - - @Override - public CompletableFuture> computeIf(K key, - Predicate condition, - BiFunction remappingFunction) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(condition, "predicate function cannot be null"); - checkNotNull(remappingFunction, "Remapping function cannot be null"); - final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF); - return get(key).thenCompose(r1 -> { - V existingValue = r1 == null ? null : r1.value(); - // if the condition evaluates to false, return existing value. - if (!condition.test(existingValue)) { - return CompletableFuture.completedFuture(r1); - } - - AtomicReference computedValue = new AtomicReference<>(); - // if remappingFunction throws an exception, return the exception. - try { - computedValue.set(remappingFunction.apply(key, existingValue)); - } catch (Exception e) { - return Tools.exceptionalFuture(e); - } - if (computedValue.get() == null && r1 == null) { - return CompletableFuture.completedFuture(null); - } - Match valueMatcher = r1 == null ? Match.ifNull() : Match.any(); - Match versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version()); - return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get()) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> { - if (v.updated()) { - return v.newValue(); - } else { - throw new ConcurrentModification("Concurrent update to " + name + " detected"); - } - }); - }); - } - - @Override - public CompletableFuture> put(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(PUT); - return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue()) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture> putAndGet(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(PUT_AND_GET); - return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue()) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture> remove(K key) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(REMOVE); - return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue()) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture clear() { - checkIfUnmodifiable(); - final MeteringAgent.Context timer = monitor.startTimer(CLEAR); - return database.mapClear(name).thenApply(this::unwrapResult) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture> keySet() { - final MeteringAgent.Context timer = monitor.startTimer(KEY_SET); - return database.mapKeySet(name) - .thenApply(s -> newMappingKeySet(s)) - .whenComplete((r, e) -> timer.stop(e)); - } - - @Override - public CompletableFuture>> values() { - final MeteringAgent.Context timer = monitor.startTimer(VALUES); - return database.mapValues(name) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(c -> c - .stream() - .map(v -> v.map(serializer::decode)) - .collect(Collectors.toList())); - } - - @Override - public CompletableFuture>>> entrySet() { - final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET); - return database.mapEntrySet(name) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(s -> newMappingEntrySet(s)); - } - - @Override - public CompletableFuture> putIfAbsent(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(PUT_IF_ABSENT); - return updateAndGet(key, Match.ifNull(), Match.any(), value) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.oldValue()); - } - - @Override - public CompletableFuture remove(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(REMOVE); - return updateAndGet(key, Match.ifValue(value), Match.any(), null) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - @Override - public CompletableFuture remove(K key, long version) { - checkNotNull(key, ERROR_NULL_KEY); - final MeteringAgent.Context timer = monitor.startTimer(REMOVE); - return updateAndGet(key, Match.any(), Match.ifValue(version), null) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - @Override - public CompletableFuture> replace(K key, V value) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(value, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(REPLACE); - return updateAndGet(key, Match.ifNotNull(), Match.any(), value) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.oldValue()); - } - - @Override - public CompletableFuture replace(K key, V oldValue, V newValue) { - checkNotNull(key, ERROR_NULL_KEY); - checkNotNull(oldValue, ERROR_NULL_VALUE); - checkNotNull(newValue, ERROR_NULL_VALUE); - final MeteringAgent.Context timer = monitor.startTimer(REPLACE); - return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - @Override - public CompletableFuture replace(K key, long oldVersion, V newValue) { - final MeteringAgent.Context timer = monitor.startTimer(REPLACE); - return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue) - .whenComplete((r, e) -> timer.stop(e)) - .thenApply(v -> v.updated()); - } - - /** - * Pre-update hook for performing required checks/actions before going forward with an update operation. - * @param key map key. - */ - protected void beforeUpdate(K key) { - checkIfUnmodifiable(); - } - - private Set newMappingKeySet(Set s) { - return new MappingSet<>(s, Collections::unmodifiableSet, - this::sK, this::dK); - } - - private Set>> newMappingEntrySet(Set>> s) { - return new MappingSet<>(s, Collections::unmodifiableSet, - this::reverseMapRawEntry, this::mapRawEntry); - } - - private Map.Entry> mapRawEntry(Map.Entry> e) { - return Maps.immutableEntry(dK(e.getKey()), e.getValue().map(serializer::decode)); - } - - private Map.Entry> reverseMapRawEntry(Map.Entry> e) { - return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode)); - } - - private CompletableFuture> updateAndGet(K key, - Match oldValueMatch, - Match oldVersionMatch, - V value) { - beforeUpdate(key); - return database.mapUpdate(name, - sK(key), - oldValueMatch.map(serializer::encode), - oldVersionMatch, - value == null ? null : serializer.encode(value)) - .thenApply(this::unwrapResult) - .thenApply(r -> r.map(this::dK, serializer::decode)); - } - - private T unwrapResult(Result result) { - if (result.status() == Result.Status.LOCKED) { - throw new ConsistentMapException.ConcurrentModification(); - } else if (result.success()) { - return result.value(); - } else { - throw new IllegalStateException("Must not be here"); - } - } - - private void checkIfUnmodifiable() { - if (readOnly) { - throw new UnsupportedOperationException(); - } - } - - @Override - public CompletableFuture addListener(MapEventListener listener) { - listeners.add(listener); - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture removeListener(MapEventListener listener) { - listeners.remove(listener); - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture prepare(MapTransaction transaction) { - return Tools.exceptionalFuture(new UnsupportedOperationException()); - } - - @Override - public CompletableFuture commit(TransactionId transactionId) { - return Tools.exceptionalFuture(new UnsupportedOperationException()); - } - - @Override - public CompletableFuture rollback(TransactionId transactionId) { - return Tools.exceptionalFuture(new UnsupportedOperationException()); - } - - protected void notifyListeners(MapEvent event) { - if (event == null) { - return; - } - listeners.forEach(listener -> { - try { - listener.event(event); - } catch (Exception e) { - log.warn("Failure notifying listener about {}", event, e); - } - }); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java deleted file mode 100644 index 3bd6bf6c7f..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import org.onosproject.store.service.AsyncAtomicCounter; -import org.onosproject.store.service.AtomicCounterBuilder; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Default implementation of AtomicCounterBuilder. - */ -public class DefaultAtomicCounterBuilder extends AtomicCounterBuilder { - - private final Database partitionedDatabase; - private final Database inMemoryDatabase; - - public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) { - this.inMemoryDatabase = inMemoryDatabase; - this.partitionedDatabase = partitionedDatabase; - } - - @Override - public AsyncAtomicCounter build() { - Database database = partitionsDisabled() ? inMemoryDatabase : partitionedDatabase; - return new DefaultAsyncAtomicCounter(checkNotNull(name()), database, meteringEnabled()); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java deleted file mode 100644 index 5a81ca6868..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import org.onosproject.store.service.AsyncConsistentMap; -import org.onosproject.store.service.ConsistentMap; -import org.onosproject.store.service.ConsistentMapBuilder; - -import static com.google.common.base.Preconditions.checkState; - -/** - * Default Consistent Map builder. - * - * @param type for map key - * @param type for map value - */ -public class DefaultConsistentMapBuilder extends ConsistentMapBuilder { - - private final DatabaseManager manager; - - public DefaultConsistentMapBuilder(DatabaseManager manager) { - this.manager = manager; - } - - private void validateInputs() { - checkState(name() != null, "name must be specified"); - checkState(serializer() != null, "serializer must be specified"); - if (purgeOnUninstall()) { - checkState(applicationId() != null, "ApplicationId must be specified when purgeOnUninstall is enabled"); - } - } - - @Override - public ConsistentMap build() { - return buildAndRegisterMap().asConsistentMap(); - } - - @Override - public AsyncConsistentMap buildAsyncMap() { - return buildAndRegisterMap(); - } - - private DefaultAsyncConsistentMap buildAndRegisterMap() { - validateInputs(); - Database database = partitionsDisabled() ? manager.inMemoryDatabase : manager.partitionedDatabase; - if (relaxedReadConsistency()) { - return manager.registerMap( - new AsyncCachingConsistentMap<>(name(), - applicationId(), - database, - serializer(), - readOnly(), - purgeOnUninstall(), - meteringEnabled())); - } else { - return manager.registerMap( - new DefaultAsyncConsistentMap<>(name(), - applicationId(), - database, - serializer(), - readOnly(), - purgeOnUninstall(), - meteringEnabled())); - } - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java deleted file mode 100644 index e4de2964ad..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabase.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import com.google.common.collect.Sets; - -import net.kuujo.copycat.resource.internal.AbstractResource; -import net.kuujo.copycat.resource.internal.ResourceManager; -import net.kuujo.copycat.state.StateMachine; -import net.kuujo.copycat.state.internal.DefaultStateMachine; -import net.kuujo.copycat.util.concurrent.Futures; -import net.kuujo.copycat.util.function.TriConsumer; - -import org.onlab.util.Match; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import java.util.function.Supplier; - -/** - * Default database. - */ -public class DefaultDatabase extends AbstractResource implements Database { - private final StateMachine> stateMachine; - private DatabaseProxy proxy; - private final Set> consumers = Sets.newCopyOnWriteArraySet(); - private final TriConsumer watcher = new InternalStateMachineWatcher(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - public DefaultDatabase(ResourceManager context) { - super(context); - this.stateMachine = new DefaultStateMachine(context, - DatabaseState.class, - DefaultDatabaseState.class, - DefaultDatabase.class.getClassLoader()); - this.stateMachine.addStartupTask(() -> { - stateMachine.registerWatcher(watcher); - return CompletableFuture.completedFuture(null); - }); - this.stateMachine.addShutdownTask(() -> { - stateMachine.unregisterWatcher(watcher); - return CompletableFuture.completedFuture(null); - }); - } - - /** - * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to - * return the completed future result. - * - * @param supplier The supplier to call if the database is open. - * @param The future result type. - * @return A completable future that if this database is closed is immediately failed. - */ - protected CompletableFuture checkOpen(Supplier> supplier) { - if (proxy == null) { - return Futures.exceptionalFuture(new IllegalStateException("Database closed")); - } - return supplier.get(); - } - - @Override - public CompletableFuture> maps() { - return checkOpen(() -> proxy.maps()); - } - - @Override - public CompletableFuture> counters() { - return checkOpen(() -> proxy.counters()); - } - - @Override - public CompletableFuture mapSize(String mapName) { - return checkOpen(() -> proxy.mapSize(mapName)); - } - - @Override - public CompletableFuture mapIsEmpty(String mapName) { - return checkOpen(() -> proxy.mapIsEmpty(mapName)); - } - - @Override - public CompletableFuture mapContainsKey(String mapName, String key) { - return checkOpen(() -> proxy.mapContainsKey(mapName, key)); - } - - @Override - public CompletableFuture mapContainsValue(String mapName, byte[] value) { - return checkOpen(() -> proxy.mapContainsValue(mapName, value)); - } - - @Override - public CompletableFuture> mapGet(String mapName, String key) { - return checkOpen(() -> proxy.mapGet(mapName, key)); - } - - @Override - public CompletableFuture>> mapUpdate( - String mapName, String key, Match valueMatch, Match versionMatch, byte[] value) { - return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value)); - } - - @Override - public CompletableFuture> mapClear(String mapName) { - return checkOpen(() -> proxy.mapClear(mapName)); - } - - @Override - public CompletableFuture> mapKeySet(String mapName) { - return checkOpen(() -> proxy.mapKeySet(mapName)); - } - - @Override - public CompletableFuture>> mapValues(String mapName) { - return checkOpen(() -> proxy.mapValues(mapName)); - } - - @Override - public CompletableFuture>>> mapEntrySet(String mapName) { - return checkOpen(() -> proxy.mapEntrySet(mapName)); - } - - @Override - public CompletableFuture counterGet(String counterName) { - return checkOpen(() -> proxy.counterGet(counterName)); - } - - @Override - public CompletableFuture counterAddAndGet(String counterName, long delta) { - return checkOpen(() -> proxy.counterAddAndGet(counterName, delta)); - } - - @Override - public CompletableFuture counterGetAndAdd(String counterName, long delta) { - return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta)); - } - - @Override - public CompletableFuture counterSet(String counterName, long value) { - return checkOpen(() -> proxy.counterSet(counterName, value)); - } - - @Override - public CompletableFuture counterCompareAndSet(String counterName, long expectedValue, long update) { - return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update)); - } - - @Override - public CompletableFuture queueSize(String queueName) { - return checkOpen(() -> proxy.queueSize(queueName)); - } - - @Override - public CompletableFuture queuePush(String queueName, byte[] entry) { - return checkOpen(() -> proxy.queuePush(queueName, entry)); - } - - @Override - public CompletableFuture queuePop(String queueName) { - return checkOpen(() -> proxy.queuePop(queueName)); - } - - @Override - public CompletableFuture queuePeek(String queueName) { - return checkOpen(() -> proxy.queuePeek(queueName)); - } - - @Override - public CompletableFuture prepareAndCommit(Transaction transaction) { - return checkOpen(() -> proxy.prepareAndCommit(transaction)); - } - - @Override - public CompletableFuture prepare(Transaction transaction) { - return checkOpen(() -> proxy.prepare(transaction)); - } - - @Override - public CompletableFuture commit(Transaction transaction) { - return checkOpen(() -> proxy.commit(transaction)); - } - - @Override - public CompletableFuture rollback(Transaction transaction) { - return checkOpen(() -> proxy.rollback(transaction)); - } - - @Override - @SuppressWarnings("unchecked") - public synchronized CompletableFuture open() { - return runStartupTasks() - .thenCompose(v -> stateMachine.open()) - .thenRun(() -> { - this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader()); - }) - .thenApply(v -> null); - } - - @Override - public synchronized CompletableFuture close() { - proxy = null; - return stateMachine.close() - .thenCompose(v -> runShutdownTasks()); - } - - @Override - public int hashCode() { - return name().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof Database) { - return name().equals(((Database) other).name()); - } - return false; - } - - @Override - public void registerConsumer(Consumer consumer) { - consumers.add(consumer); - } - - @Override - public void unregisterConsumer(Consumer consumer) { - consumers.remove(consumer); - } - - private class InternalStateMachineWatcher implements TriConsumer { - @Override - public void accept(String name, Object input, Object output) { - StateMachineUpdate update = new StateMachineUpdate(name, input, output); - consumers.forEach(consumer -> consumer.accept(update)); - } - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java deleted file mode 100644 index 27d953bf38..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import net.kuujo.copycat.state.Initializer; -import net.kuujo.copycat.state.StateContext; - -import org.onlab.util.Match; -import org.onosproject.store.primitives.MapUpdate; -import org.onosproject.store.primitives.TransactionId; -import org.onosproject.store.service.Versioned; - -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedList; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - -/** - * Default database state. - */ -public class DefaultDatabaseState implements DatabaseState { - private Long nextVersion; - private Map counters; - private Map>> maps; - private Map> queues; - - /** - * This locks map has a structure similar to the "tables" map above and - * holds all the provisional updates made during a transaction's prepare phase. - * The entry value is represented as the tuple: (transactionId, newValue) - * If newValue == null that signifies this update is attempting to - * delete the existing value. - * This map also serves as a lock on the entries that are being updated. - * The presence of a entry in this map indicates that element is - * participating in a transaction and is currently locked for updates. - */ - private Map> locks; - - @Initializer - @Override - public void init(StateContext> context) { - counters = context.get("counters"); - if (counters == null) { - counters = Maps.newConcurrentMap(); - context.put("counters", counters); - } - maps = context.get("maps"); - if (maps == null) { - maps = Maps.newConcurrentMap(); - context.put("maps", maps); - } - locks = context.get("locks"); - if (locks == null) { - locks = Maps.newConcurrentMap(); - context.put("locks", locks); - } - queues = context.get("queues"); - if (queues == null) { - queues = Maps.newConcurrentMap(); - context.put("queues", queues); - } - nextVersion = context.get("nextVersion"); - if (nextVersion == null) { - nextVersion = 0L; - context.put("nextVersion", nextVersion); - } - } - - @Override - public Set maps() { - return ImmutableSet.copyOf(maps.keySet()); - } - - @Override - public Map counters() { - Map counterMap = Maps.newHashMap(); - counters.forEach((k, v) -> counterMap.put(k, v.get())); - return counterMap; - } - - @Override - public int mapSize(String mapName) { - return getMap(mapName).size(); - } - - @Override - public boolean mapIsEmpty(String mapName) { - return getMap(mapName).isEmpty(); - } - - @Override - public boolean mapContainsKey(String mapName, String key) { - return getMap(mapName).containsKey(key); - } - - @Override - public boolean mapContainsValue(String mapName, byte[] value) { - return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value)); - } - - @Override - public Versioned mapGet(String mapName, String key) { - return getMap(mapName).get(key); - } - - - @Override - public Result> mapUpdate( - String mapName, - String key, - Match valueMatch, - Match versionMatch, - byte[] value) { - if (isLockedForUpdates(mapName, key)) { - return Result.locked(); - } - Versioned currentValue = getMap(mapName).get(key); - if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) || - !versionMatch.matches(currentValue == null ? null : currentValue.version())) { - return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue)); - } else { - if (value == null) { - if (currentValue == null) { - return Result.ok(new UpdateResult<>(false, mapName, key, null, null)); - } else { - getMap(mapName).remove(key); - return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null)); - } - } - Versioned newValue = new Versioned<>(value, ++nextVersion); - getMap(mapName).put(key, newValue); - return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue)); - } - } - - @Override - public Result mapClear(String mapName) { - if (areTransactionsInProgress(mapName)) { - return Result.locked(); - } - getMap(mapName).clear(); - return Result.ok(null); - } - - @Override - public Set mapKeySet(String mapName) { - return ImmutableSet.copyOf(getMap(mapName).keySet()); - } - - @Override - public Collection> mapValues(String mapName) { - return ImmutableList.copyOf(getMap(mapName).values()); - } - - @Override - public Set>> mapEntrySet(String mapName) { - return ImmutableSet.copyOf(getMap(mapName) - .entrySet() - .stream() - .map(entry -> Maps.immutableEntry(entry.getKey(), entry.getValue())) - .collect(Collectors.toSet())); - } - - @Override - public Long counterAddAndGet(String counterName, long delta) { - return getCounter(counterName).addAndGet(delta); - } - - @Override - public Long counterGetAndAdd(String counterName, long delta) { - return getCounter(counterName).getAndAdd(delta); - } - - @Override - public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) { - return getCounter(counterName).compareAndSet(expectedValue, updateValue); - } - - @Override - public Long counterGet(String counterName) { - return getCounter(counterName).get(); - } - - @Override - public void counterSet(String counterName, long value) { - getCounter(counterName).set(value); - } - - @Override - public Long queueSize(String queueName) { - return Long.valueOf(getQueue(queueName).size()); - } - - @Override - public byte[] queuePeek(String queueName) { - return getQueue(queueName).peek(); - } - - @Override - public byte[] queuePop(String queueName) { - return getQueue(queueName).poll(); - } - - @Override - public void queuePush(String queueName, byte[] entry) { - getQueue(queueName).offer(entry); - } - - @Override - public CommitResponse prepareAndCommit(Transaction transaction) { - if (prepare(transaction)) { - return commit(transaction); - } - return CommitResponse.failure(); - } - - @Override - public boolean prepare(Transaction transaction) { - if (transaction.updates().stream().anyMatch(update -> - isLockedByAnotherTransaction(update.mapName(), - update.key(), - transaction.id()))) { - return false; - } - - if (transaction.updates().stream().allMatch(this::isUpdatePossible)) { - transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id())); - return true; - } - return false; - } - - @Override - public CommitResponse commit(Transaction transaction) { - return CommitResponse.success(Lists.transform(transaction.updates(), - update -> commitProvisionalUpdate(update, transaction.id()))); - } - - @Override - public boolean rollback(Transaction transaction) { - transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id())); - return true; - } - - private Map> getMap(String mapName) { - return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap()); - } - - private Map getLockMap(String mapName) { - return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap()); - } - - private AtomicLong getCounter(String counterName) { - return counters.computeIfAbsent(counterName, name -> new AtomicLong(0)); - } - - private Queue getQueue(String queueName) { - return queues.computeIfAbsent(queueName, name -> new LinkedList<>()); - } - - private boolean isUpdatePossible(MapUpdate update) { - Versioned existingEntry = mapGet(update.mapName(), update.key()); - switch (update.type()) { - case PUT: - case REMOVE: - return true; - case PUT_IF_ABSENT: - return existingEntry == null; - case PUT_IF_VERSION_MATCH: - return existingEntry != null && existingEntry.version() == update.currentVersion(); - case PUT_IF_VALUE_MATCH: - return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue()); - case REMOVE_IF_VERSION_MATCH: - return existingEntry == null || existingEntry.version() == update.currentVersion(); - case REMOVE_IF_VALUE_MATCH: - return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue()); - default: - throw new IllegalStateException("Unsupported type: " + update.type()); - } - } - - private void doProvisionalUpdate(MapUpdate update, TransactionId transactionId) { - Map lockMap = getLockMap(update.mapName()); - switch (update.type()) { - case PUT: - case PUT_IF_ABSENT: - case PUT_IF_VERSION_MATCH: - case PUT_IF_VALUE_MATCH: - lockMap.put(update.key(), new Update(transactionId, update.value())); - break; - case REMOVE: - case REMOVE_IF_VERSION_MATCH: - case REMOVE_IF_VALUE_MATCH: - lockMap.put(update.key(), new Update(transactionId, null)); - break; - default: - throw new IllegalStateException("Unsupported type: " + update.type()); - } - } - - private UpdateResult commitProvisionalUpdate( - MapUpdate update, TransactionId transactionId) { - String mapName = update.mapName(); - String key = update.key(); - Update provisionalUpdate = getLockMap(mapName).get(key); - if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { - getLockMap(mapName).remove(key); - } else { - throw new IllegalStateException("Invalid transaction Id"); - } - return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value(); - } - - private void undoProvisionalUpdate(MapUpdate update, TransactionId transactionId) { - String mapName = update.mapName(); - String key = update.key(); - Update provisionalUpdate = getLockMap(mapName).get(key); - if (provisionalUpdate == null) { - return; - } - if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { - getLockMap(mapName).remove(key); - } - } - - private boolean isLockedByAnotherTransaction(String mapName, String key, TransactionId transactionId) { - Update update = getLockMap(mapName).get(key); - return update != null && !Objects.equal(transactionId, update.transactionId()); - } - - private boolean isLockedForUpdates(String mapName, String key) { - return getLockMap(mapName).containsKey(key); - } - - private boolean areTransactionsInProgress(String mapName) { - return !getLockMap(mapName).isEmpty(); - } - - private class Update { - private final TransactionId transactionId; - private final byte[] value; - - public Update(TransactionId txId, byte[] value) { - this.transactionId = txId; - this.value = value; - } - - public TransactionId transactionId() { - return this.transactionId; - } - - public byte[] value() { - return this.value; - } - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java deleted file mode 100644 index 34801170b0..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueue.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; - -import org.onlab.util.SharedExecutors; -import org.onosproject.store.service.DistributedPrimitive; -import org.onosproject.store.service.DistributedQueue; -import org.onosproject.store.service.Serializer; -import org.onosproject.utils.MeteringAgent; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.QUEUE_PUSH; - -/** - * DistributedQueue implementation that provides FIFO ordering semantics. - * - * @param queue entry type - */ -public class DefaultDistributedQueue implements DistributedQueue { - - private final String name; - private final Database database; - private final Serializer serializer; - private final Set> pendingFutures = Sets.newIdentityHashSet(); - - private static final String PRIMITIVE_NAME = "distributedQueue"; - private static final String SIZE = "size"; - private static final String PUSH = "push"; - private static final String POP = "pop"; - private static final String PEEK = "peek"; - - private static final String ERROR_NULL_ENTRY = "Null entries are not allowed"; - private final MeteringAgent monitor; - - public DefaultDistributedQueue(String name, - Database database, - Serializer serializer, - boolean meteringEnabled) { - this.name = checkNotNull(name, "queue name cannot be null"); - this.database = checkNotNull(database, "database cannot be null"); - this.serializer = checkNotNull(serializer, "serializer cannot be null"); - this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); - this.database.registerConsumer(update -> { - SharedExecutors.getSingleThreadExecutor().execute(() -> { - if (update.target() == QUEUE_PUSH) { - List input = update.input(); - String queueName = (String) input.get(0); - if (queueName.equals(name)) { - tryPoll(); - } - } - }); - }); - } - - @Override - public long size() { - final MeteringAgent.Context timer = monitor.startTimer(SIZE); - return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e))); - } - - @Override - public void push(E entry) { - checkNotNull(entry, ERROR_NULL_ENTRY); - final MeteringAgent.Context timer = monitor.startTimer(PUSH); - Futures.getUnchecked(database.queuePush(name, serializer.encode(entry)) - .whenComplete((r, e) -> timer.stop(e))); - } - - @Override - public CompletableFuture pop() { - final MeteringAgent.Context timer = monitor.startTimer(POP); - return database.queuePop(name) - .whenComplete((r, e) -> timer.stop(e)) - .thenCompose(v -> { - if (v != null) { - return CompletableFuture.completedFuture(serializer.decode(v)); - } - CompletableFuture newPendingFuture = new CompletableFuture<>(); - pendingFutures.add(newPendingFuture); - return newPendingFuture; - }); - - } - - @Override - public E peek() { - final MeteringAgent.Context timer = monitor.startTimer(PEEK); - return Futures.getUnchecked(database.queuePeek(name) - .thenApply(v -> v != null ? serializer.decode(v) : null) - .whenComplete((r, e) -> timer.stop(e))); - } - - @Override - public String name() { - return name; - } - - @Override - public DistributedPrimitive.Type primitiveType() { - return DistributedPrimitive.Type.QUEUE; - } - - protected void tryPoll() { - Set> completedFutures = Sets.newHashSet(); - for (CompletableFuture future : pendingFutures) { - E entry = Futures.getUnchecked(database.queuePop(name) - .thenApply(v -> v != null ? serializer.decode(v) : null)); - if (entry != null) { - future.complete(entry); - completedFutures.add(future); - } else { - break; - } - } - pendingFutures.removeAll(completedFutures); - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java deleted file mode 100644 index ea3665ea34..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedQueueBuilder.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import org.onosproject.store.service.DistributedQueue; -import org.onosproject.store.service.DistributedQueueBuilder; -import org.onosproject.store.service.Serializer; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -/** - * Default implementation of a {@code DistributedQueueBuilder}. - * - * @param queue entry type - */ -public class DefaultDistributedQueueBuilder implements DistributedQueueBuilder { - - private Serializer serializer; - private String name; - private boolean persistenceEnabled = true; - private final DatabaseManager databaseManager; - private boolean metering = true; - - public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) { - this.databaseManager = databaseManager; - } - - @Override - public DistributedQueueBuilder withName(String name) { - checkArgument(name != null && !name.isEmpty()); - this.name = name; - return this; - } - - @Override - public DistributedQueueBuilder withSerializer(Serializer serializer) { - checkArgument(serializer != null); - this.serializer = serializer; - return this; - } - - @Override - public DistributedQueueBuilder withMeteringDisabled() { - metering = false; - return this; - } - - @Override - public DistributedQueueBuilder withPersistenceDisabled() { - persistenceEnabled = false; - return this; - } - - private boolean validInputs() { - return name != null && serializer != null; - } - - @Override - public DistributedQueue build() { - checkState(validInputs()); - return new DefaultDistributedQueue<>( - name, - persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase, - serializer, - metering); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java deleted file mode 100644 index 30a6f26eb6..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import java.util.function.Supplier; - -import static com.google.common.base.Preconditions.*; - -import org.onosproject.store.primitives.MapUpdate; -import org.onosproject.store.primitives.TransactionId; -import org.onosproject.store.primitives.resources.impl.CommitResult; -import org.onosproject.store.service.CommitStatus; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.TransactionContext; -import org.onosproject.store.service.TransactionalMap; - -import com.google.common.base.MoreObjects; -import com.google.common.base.MoreObjects.ToStringHelper; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.Futures; - -/** - * Default TransactionContext implementation. - */ -public class DefaultTransactionContext implements TransactionContext { - private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open"; - - @SuppressWarnings("rawtypes") - private final Map txMaps = Maps.newConcurrentMap(); - private boolean isOpen = false; - private final Function> transactionCommitter; - private final TransactionId transactionId; - private final Supplier mapBuilderSupplier; - - public DefaultTransactionContext(TransactionId transactionId, - Function> transactionCommitter, - Supplier mapBuilderSupplier) { - this.transactionId = transactionId; - this.transactionCommitter = checkNotNull(transactionCommitter); - this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier); - } - - @Override - public TransactionId transactionId() { - return transactionId; - } - - @Override - public void begin() { - checkState(!isOpen, "Transaction Context is already open"); - isOpen = true; - } - - @Override - public boolean isOpen() { - return isOpen; - } - - @Override - @SuppressWarnings("unchecked") - public TransactionalMap getTransactionalMap(String mapName, - Serializer serializer) { - checkState(isOpen, TX_NOT_OPEN_ERROR); - checkNotNull(mapName); - checkNotNull(serializer); - return txMaps.computeIfAbsent(mapName, name -> { - ConsistentMapBuilder mapBuilder = (ConsistentMapBuilder) mapBuilderSupplier.get() - .withName(name) - .withSerializer(serializer); - return new DefaultTransactionalMap<>( - name, - mapBuilder.buildAsyncMap(), - this, - serializer); - }); - } - - @SuppressWarnings("unchecked") - @Override - public CompletableFuture commit() { - // TODO: rework commit implementation to be more intuitive - checkState(isOpen, TX_NOT_OPEN_ERROR); - CommitStatus status; - try { - List> updates = Lists.newLinkedList(); - txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates())); - Transaction transaction = new Transaction(transactionId, updates); - status = Futures.getUnchecked(transactionCommitter.apply(transaction)) == CommitResult.OK - ? CommitStatus.SUCCESS : CommitStatus.FAILURE; - } catch (Exception e) { - abort(); - status = CommitStatus.FAILURE; - } finally { - isOpen = false; - } - return CompletableFuture.completedFuture(status); - } - - @Override - public void abort() { - if (isOpen) { - try { - txMaps.values().forEach(m -> m.abort()); - } finally { - isOpen = false; - } - } - } - - @Override - public String toString() { - ToStringHelper s = MoreObjects.toStringHelper(this) - .add("transactionId", transactionId) - .add("isOpen", isOpen); - - txMaps.entrySet().forEach(e -> { - s.add(e.getKey(), e.getValue()); - }); - return s.toString(); - } - - @Override - public String name() { - return transactionId.toString(); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java deleted file mode 100644 index 91f4bf6e7d..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.onosproject.store.primitives.TransactionId; -import org.onosproject.store.primitives.resources.impl.CommitResult; -import org.onosproject.store.service.ConsistentMapBuilder; -import org.onosproject.store.service.TransactionContext; -import org.onosproject.store.service.TransactionContextBuilder; - -/** - * The default implementation of a transaction context builder. This builder - * generates a {@link DefaultTransactionContext}. - */ -public class DefaultTransactionContextBuilder extends TransactionContextBuilder { - - private final Supplier mapBuilderSupplier; - private final Function> transactionCommitter; - private final TransactionId transactionId; - - public DefaultTransactionContextBuilder(Supplier mapBuilderSupplier, - Function> transactionCommiter, - TransactionId transactionId) { - this.mapBuilderSupplier = mapBuilderSupplier; - this.transactionCommitter = transactionCommiter; - this.transactionId = transactionId; - } - - @Override - public TransactionContext build() { - return new DefaultTransactionContext(transactionId, transactionCommitter, () -> { - ConsistentMapBuilder mapBuilder = mapBuilderSupplier.get(); - if (partitionsDisabled()) { - mapBuilder = (ConsistentMapBuilder) mapBuilder.withPartitionsDisabled(); - } - return mapBuilder; - }); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MappingSet.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MappingSet.java deleted file mode 100644 index 6ca4d4f8f9..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MappingSet.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - -import com.google.common.collect.Iterators; - -/** - * Set view backed by Set with element type {@code } but returns - * element as {@code } for convenience. - * - * @param Backing {@link Set} element type. - * MappingSet will follow this type's equality behavior. - * @param external facing element type. - * MappingSet will ignores equality defined by this type. - */ -class MappingSet implements Set { - - private final Set backedSet; - private final Function toBack; - private final Function toOut; - - public MappingSet(Set backedSet, - Function, Set> supplier, - Function toBack, Function toOut) { - this.backedSet = supplier.apply(backedSet); - this.toBack = toBack; - this.toOut = toOut; - } - - @Override - public int size() { - return backedSet.size(); - } - - @Override - public boolean isEmpty() { - return backedSet.isEmpty(); - } - - @Override - public boolean contains(Object o) { - return backedSet.contains(toBack.apply((OUT) o)); - } - - @Override - public Iterator iterator() { - return Iterators.transform(backedSet.iterator(), toOut::apply); - } - - @Override - public Object[] toArray() { - return backedSet.stream() - .map(toOut) - .toArray(); - } - - @Override - public T[] toArray(T[] a) { - return backedSet.stream() - .map(toOut) - .toArray(size -> { - if (size < a.length) { - return (T[]) new Object[size]; - } else { - Arrays.fill(a, null); - return a; - } - }); - } - - @Override - public boolean add(OUT e) { - return backedSet.add(toBack.apply(e)); - } - - @Override - public boolean remove(Object o) { - return backedSet.remove(toBack.apply((OUT) o)); - } - - @Override - public boolean containsAll(Collection c) { - return c.stream() - .map(e -> toBack.apply((OUT) e)) - .allMatch(backedSet::contains); - } - - @Override - public boolean addAll(Collection c) { - return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList())); - } - - @Override - public boolean retainAll(Collection c) { - return backedSet.retainAll(c.stream() - .map(x -> toBack.apply((OUT) x)) - .collect(Collectors.toList())); - } - - @Override - public boolean removeAll(Collection c) { - return backedSet.removeAll(c.stream() - .map(x -> toBack.apply((OUT) x)) - .collect(Collectors.toList())); - } - - @Override - public void clear() { - backedSet.clear(); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java deleted file mode 100644 index 31e22a86d5..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedDatabase.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import net.kuujo.copycat.Task; -import net.kuujo.copycat.cluster.Cluster; -import net.kuujo.copycat.resource.ResourceState; - -import org.onlab.util.Match; -import org.onosproject.store.primitives.MapUpdate; -import org.onosproject.store.primitives.resources.impl.CommitResult; -import org.onosproject.store.service.Versioned; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkState; - -/** - * A database that partitions the keys across one or more database partitions. - */ -public class PartitionedDatabase implements Database { - - private final String name; - private final Partitioner partitioner; - private final List partitions; - private final AtomicBoolean isOpen = new AtomicBoolean(false); - private static final String DB_NOT_OPEN = "Partitioned Database is not open"; - private TransactionManager transactionManager; - - public PartitionedDatabase( - String name, - Collection partitions) { - this.name = name; - this.partitions = partitions - .stream() - .sorted((db1, db2) -> db1.name().compareTo(db2.name())) - .collect(Collectors.toList()); - this.partitioner = new SimpleKeyHashPartitioner(this.partitions); - } - - /** - * Returns the databases for individual partitions. - * @return list of database partitions - */ - public List getPartitions() { - return partitions; - } - - /** - * Returns true if the database is open. - * @return true if open, false otherwise - */ - @Override - public boolean isOpen() { - return isOpen.get(); - } - - @Override - public CompletableFuture> maps() { - checkState(isOpen.get(), DB_NOT_OPEN); - Set mapNames = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(db -> db.maps().thenApply(mapNames::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> mapNames); - } - - @Override - public CompletableFuture> counters() { - checkState(isOpen.get(), DB_NOT_OPEN); - Map counters = Maps.newConcurrentMap(); - return CompletableFuture.allOf(partitions - .stream() - .map(db -> db.counters() - .thenApply(m -> { - counters.putAll(m); - return null; - })) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> counters); - } - - @Override - public CompletableFuture mapSize(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - AtomicInteger totalSize = new AtomicInteger(0); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> totalSize.get()); - } - - @Override - public CompletableFuture mapIsEmpty(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return mapSize(mapName).thenApply(size -> size == 0); - } - - @Override - public CompletableFuture mapContainsKey(String mapName, String key) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key); - } - - @Override - public CompletableFuture mapContainsValue(String mapName, byte[] value) { - checkState(isOpen.get(), DB_NOT_OPEN); - AtomicBoolean containsValue = new AtomicBoolean(false); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapContainsValue(mapName, value) - .thenApply(v -> containsValue.compareAndSet(false, v))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> containsValue.get()); - } - - @Override - public CompletableFuture> mapGet(String mapName, String key) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(mapName, key).mapGet(mapName, key); - } - - @Override - public CompletableFuture>> mapUpdate( - String mapName, String key, Match valueMatch, - Match versionMatch, byte[] value) { - return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value); - - } - - @Override - public CompletableFuture> mapClear(String mapName) { - AtomicBoolean isLocked = new AtomicBoolean(false); - checkState(isOpen.get(), DB_NOT_OPEN); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapClear(mapName) - .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status()))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null)); - } - - @Override - public CompletableFuture> mapKeySet(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - Set keySet = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> keySet); - } - - @Override - public CompletableFuture>> mapValues(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - List> values = new CopyOnWriteArrayList<>(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapValues(mapName).thenApply(values::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> values); - } - - @Override - public CompletableFuture>>> mapEntrySet(String mapName) { - checkState(isOpen.get(), DB_NOT_OPEN); - Set>> entrySet = Sets.newConcurrentHashSet(); - return CompletableFuture.allOf(partitions - .stream() - .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> entrySet); - } - - @Override - public CompletableFuture counterGet(String counterName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterGet(counterName); - } - - @Override - public CompletableFuture counterAddAndGet(String counterName, long delta) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta); - } - - @Override - public CompletableFuture counterGetAndAdd(String counterName, long delta) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta); - } - - @Override - public CompletableFuture counterSet(String counterName, long value) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName).counterSet(counterName, value); - } - - @Override - public CompletableFuture counterCompareAndSet(String counterName, long expectedValue, long updateValue) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(counterName, counterName). - counterCompareAndSet(counterName, expectedValue, updateValue); - - } - - @Override - public CompletableFuture queueSize(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queueSize(queueName); - } - - @Override - public CompletableFuture queuePush(String queueName, byte[] entry) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry); - } - - @Override - public CompletableFuture queuePop(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePop(queueName); - } - - @Override - public CompletableFuture queuePeek(String queueName) { - checkState(isOpen.get(), DB_NOT_OPEN); - return partitioner.getPartition(queueName, queueName).queuePeek(queueName); - } - - @Override - public CompletableFuture prepareAndCommit(Transaction transaction) { - Map subTransactions = createSubTransactions(transaction); - if (subTransactions.isEmpty()) { - return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of())); - } else if (subTransactions.size() == 1) { - Entry entry = - subTransactions.entrySet().iterator().next(); - return entry.getKey().prepareAndCommit(entry.getValue()); - } else { - if (transactionManager == null) { - throw new IllegalStateException("TransactionManager is not initialized"); - } - return transactionManager.execute(transaction) - .thenApply(r -> r == CommitResult.OK - ? CommitResponse.success(ImmutableList.of()) : CommitResponse.failure()); - } - } - - @Override - public CompletableFuture prepare(Transaction transaction) { - Map subTransactions = createSubTransactions(transaction); - AtomicBoolean status = new AtomicBoolean(true); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry - .getKey() - .prepare(entry.getValue()) - .thenApply(v -> status.compareAndSet(true, v))) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> status.get()); - } - - @Override - public CompletableFuture commit(Transaction transaction) { - Map subTransactions = createSubTransactions(transaction); - AtomicBoolean success = new AtomicBoolean(true); - List> allUpdates = Lists.newArrayList(); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().commit(entry.getValue()) - .thenAccept(response -> { - success.set(success.get() && response.success()); - if (success.get()) { - allUpdates.addAll(response.updates()); - } - })) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> success.get() ? - CommitResponse.success(allUpdates) : CommitResponse.failure()); - } - - @Override - public CompletableFuture rollback(Transaction transaction) { - Map subTransactions = createSubTransactions(transaction); - return CompletableFuture.allOf(subTransactions.entrySet() - .stream() - .map(entry -> entry.getKey().rollback(entry.getValue())) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> true); - } - - @Override - public CompletableFuture open() { - return CompletableFuture.allOf(partitions - .stream() - .map(Database::open) - .toArray(CompletableFuture[]::new)) - .thenApply(v -> { - isOpen.set(true); - return this; - }); - } - - @Override - public CompletableFuture close() { - checkState(isOpen.get(), DB_NOT_OPEN); - return CompletableFuture.allOf(partitions - .stream() - .map(database -> database.close()) - .toArray(CompletableFuture[]::new)); - } - - @Override - public boolean isClosed() { - return !isOpen.get(); - } - - @Override - public String name() { - return name; - } - - @Override - public Cluster cluster() { - throw new UnsupportedOperationException(); - } - - @Override - public Database addStartupTask(Task> task) { - throw new UnsupportedOperationException(); - } - - @Override - public Database addShutdownTask(Task> task) { - throw new UnsupportedOperationException(); - } - - @Override - public ResourceState state() { - throw new UnsupportedOperationException(); - } - - private Map createSubTransactions( - Transaction transaction) { - Map>> perPartitionUpdates = Maps.newHashMap(); - for (MapUpdate update : transaction.updates()) { - Database partition = partitioner.getPartition(update.mapName(), update.key()); - List> partitionUpdates = - perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList()); - partitionUpdates.add(update); - } - Map subTransactions = Maps.newHashMap(); - perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new Transaction(transaction.id(), v))); - return subTransactions; - } - - protected void setTransactionManager(TransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - - @Override - public void registerConsumer(Consumer consumer) { - partitions.forEach(p -> p.registerConsumer(consumer)); - } - - @Override - public void unregisterConsumer(Consumer consumer) { - partitions.forEach(p -> p.unregisterConsumer(consumer)); - } -} - diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Partitioner.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Partitioner.java deleted file mode 100644 index 7cabb3d4db..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Partitioner.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -/** - * Partitioner is responsible for mapping keys to individual database partitions. - * - * @param key type. - */ -public interface Partitioner { - - /** - * Returns the database partition. - * @param mapName map name - * @param key key - * @return Database partition - */ - Database getPartition(String mapName, K key); -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Result.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Result.java deleted file mode 100644 index acd724cb89..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Result.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static com.google.common.base.MoreObjects.toStringHelper; - -import java.util.Objects; - -/** - * Result of a database update operation. - * - * @param return value type - */ -public final class Result { - - public enum Status { - /** - * Indicates a successful update. - */ - OK, - - /** - * Indicates a failure due to underlying state being locked by another transaction. - */ - LOCKED - } - - private final Status status; - private final V value; - - /** - * Creates a new Result instance with the specified value with status set to Status.OK. - * - * @param result value type - * @param value result value - * @return Result instance - */ - public static Result ok(V value) { - return new Result<>(value, Status.OK); - } - - /** - * Creates a new Result instance with status set to Status.LOCKED. - * - * @param result value type - * @return Result instance - */ - public static Result locked() { - return new Result<>(null, Status.LOCKED); - } - - private Result(V value, Status status) { - this.value = value; - this.status = status; - } - - /** - * Returns true if this result indicates a successful execution i.e status is Status.OK. - * - * @return true if successful, false otherwise - */ - public boolean success() { - return status == Status.OK; - } - - /** - * Returns the status of database update operation. - * - * @return database update status - */ - public Status status() { - return status; - } - - /** - * Returns the return value for the update. - * - * @return value returned by database update. If the status is another - * other than Status.OK, this returns a null - */ - public V value() { - return value; - } - - @Override - public int hashCode() { - return Objects.hash(value, status); - } - - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object other) { - if (!(other instanceof Result)) { - return false; - } - Result that = (Result) other; - return Objects.equals(this.value, that.value) && - Objects.equals(this.status, that.status); - } - - @Override - public String toString() { - return toStringHelper(this) - .add("status", status) - .add("value", value) - .toString(); - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleKeyHashPartitioner.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleKeyHashPartitioner.java deleted file mode 100644 index c94090ac35..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleKeyHashPartitioner.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import java.util.List; - -/** - * A simple Partitioner for mapping keys to database partitions. - *

- * This class uses a md5 hash based hashing scheme for hashing the key to - * a partition. - * - */ -public class SimpleKeyHashPartitioner extends DatabasePartitioner { - - public SimpleKeyHashPartitioner(List partitions) { - super(partitions); - } - - @Override - public Database getPartition(String mapName, String key) { - return partitions.get(hash(key) % partitions.size()); - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleTableHashPartitioner.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleTableHashPartitioner.java deleted file mode 100644 index bf7971e8d3..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/SimpleTableHashPartitioner.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onosproject.store.primitives.impl; - -import java.util.List; - -/** - * A simple Partitioner that uses the map name hash to - * pick a partition. - *

- * This class uses a md5 hash based hashing scheme for hashing the map name to - * a partition. This partitioner maps all keys for a map to the same database - * partition. - */ -public class SimpleTableHashPartitioner extends DatabasePartitioner { - - public SimpleTableHashPartitioner(List partitions) { - super(partitions); - } - - @Override - public Database getPartition(String mapName, String key) { - return partitions.get(hash(mapName) % partitions.size()); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StateMachineUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StateMachineUpdate.java deleted file mode 100644 index 6d1dcd7d10..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StateMachineUpdate.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static com.google.common.base.MoreObjects.toStringHelper; - -/** - * Representation of a state machine update. - */ -public class StateMachineUpdate { - - /** - * Target data structure type this update is for. - */ - enum Target { - /** - * Update is for a map. - */ - MAP_UPDATE, - - /** - * Update is a transaction commit. - */ - TX_COMMIT, - - /** - * Update is a queue push. - */ - QUEUE_PUSH, - - /** - * Update is for some other operation. - */ - OTHER - } - - private final String operationName; - private final Object input; - private final Object output; - - public StateMachineUpdate(String operationName, Object input, Object output) { - this.operationName = operationName; - this.input = input; - this.output = output; - } - - public Target target() { - // FIXME: This check is brittle - if (operationName.contains("mapUpdate")) { - return Target.MAP_UPDATE; - } else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) { - return Target.TX_COMMIT; - } else if (operationName.contains("queuePush")) { - return Target.QUEUE_PUSH; - } else { - return Target.OTHER; - } - } - - @SuppressWarnings("unchecked") - public T input() { - return (T) input; - } - - @SuppressWarnings("unchecked") - public T output() { - return (T) output; - } - - @Override - public String toString() { - return toStringHelper(this) - .add("name", operationName) - .add("input", input) - .add("output", output) - .toString(); - } -} \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java deleted file mode 100644 index 02a9fde835..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -import org.onosproject.store.primitives.TransactionId; -import org.onosproject.store.primitives.resources.impl.CommitResult; -import org.onosproject.store.service.AsyncConsistentMap; - -import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTED; -import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTING; -import static org.onosproject.store.primitives.impl.Transaction.State.ROLLEDBACK; -import static org.onosproject.store.primitives.impl.Transaction.State.ROLLINGBACK; - -/** - * Agent that runs the two phase commit protocol. - */ -public class TransactionManager { - - private final Database database; - private final AsyncConsistentMap transactions; - - public TransactionManager(Database database, AsyncConsistentMap transactions) { - this.database = checkNotNull(database, "database cannot be null"); - this.transactions = transactions; - } - - /** - * Executes the specified transaction by employing a two phase commit protocol. - * - * @param transaction transaction to commit - * @return transaction commit result - */ - public CompletableFuture execute(Transaction transaction) { - // short-circuit if there is only a single update - if (transaction.updates().size() <= 1) { - return database.prepareAndCommit(transaction) - .thenApply(response -> response.success() - ? CommitResult.OK : CommitResult.FAILURE_DURING_COMMIT); - } - // clean up if this transaction in already in a terminal state. - if (transaction.state() == COMMITTED || transaction.state() == ROLLEDBACK) { - return transactions.remove(transaction.id()).thenApply(v -> CommitResult.OK); - } else if (transaction.state() == COMMITTING) { - return commit(transaction); - } else if (transaction.state() == ROLLINGBACK) { - return rollback(transaction).thenApply(v -> CommitResult.FAILURE_TO_PREPARE); - } else { - return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction)); - } - } - - /** - * Returns all pending transaction identifiers. - * - * @return future for a collection of transaction identifiers. - */ - public CompletableFuture> getPendingTransactionIds() { - return transactions.values().thenApply(c -> c.stream() - .map(v -> v.value()) - .filter(v -> v.state() != COMMITTED && v.state() != ROLLEDBACK) - .map(Transaction::id) - .collect(Collectors.toList())); - } - - private CompletableFuture prepare(Transaction transaction) { - return transactions.put(transaction.id(), transaction) - .thenCompose(v -> database.prepare(transaction)) - .thenCompose(status -> transactions.put( - transaction.id(), - transaction.transition(status ? COMMITTING : ROLLINGBACK)) - .thenApply(v -> status)); - } - - private CompletableFuture commit(Transaction transaction) { - return database.commit(transaction) - .thenCompose(r -> { - if (r.success()) { - return transactions.put(transaction.id(), transaction.transition(COMMITTED)) - .thenApply(v -> CommitResult.OK); - } else { - return CompletableFuture.completedFuture(CommitResult.FAILURE_DURING_COMMIT); - } - }); - } - - private CompletableFuture rollback(Transaction transaction) { - return database.rollback(transaction) - .thenCompose(v -> transactions.put(transaction.id(), transaction.transition(ROLLEDBACK))) - .thenApply(v -> CommitResult.FAILURE_TO_PREPARE); - } -} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateResult.java deleted file mode 100644 index 6122b7980c..0000000000 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/UpdateResult.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import java.util.function.Function; - -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.Versioned; - -/** - * Result of a update operation. - *

- * Both old and new values are accessible along with a flag that indicates if the - * the value was updated. If flag is false, oldValue and newValue both - * point to the same unmodified value. - * @param result type - */ -public class UpdateResult { - - private final boolean updated; - private final String mapName; - private final K key; - private final Versioned oldValue; - private final Versioned newValue; - - public UpdateResult(boolean updated, String mapName, K key, Versioned oldValue, Versioned newValue) { - this.updated = updated; - this.mapName = mapName; - this.key = key; - this.oldValue = oldValue; - this.newValue = newValue; - } - - public boolean updated() { - return updated; - } - - public String mapName() { - return mapName; - } - - public K key() { - return key; - } - - public Versioned oldValue() { - return oldValue; - } - - public Versioned newValue() { - return newValue; - } - - public UpdateResult map(Function keyTransform, Function valueMapper) { - return new UpdateResult<>(updated, - mapName, - keyTransform.apply(key), - oldValue == null ? null : oldValue.map(valueMapper), - newValue == null ? null : newValue.map(valueMapper)); - } - - public MapEvent toMapEvent() { - if (!updated) { - return null; - } else { - return new MapEvent<>(mapName(), key(), newValue, oldValue); - } - } -} \ No newline at end of file diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java deleted file mode 100644 index 68e7ecb3d2..0000000000 --- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/DefaultAsyncConsistentMapTest.java +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static java.util.Collections.unmodifiableCollection; -import static java.util.Collections.unmodifiableSet; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import java.util.Collection; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; - -import net.kuujo.copycat.Task; -import net.kuujo.copycat.cluster.Cluster; -import net.kuujo.copycat.resource.ResourceState; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onlab.util.Match; -import org.onosproject.core.ApplicationId; -import org.onosproject.core.DefaultApplicationId; -import org.onosproject.store.service.Serializer; -import org.onosproject.store.service.Versioned; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; - -/** - * - */ -public class DefaultAsyncConsistentMapTest { - - private static final ApplicationId APP_ID = new DefaultApplicationId(42, "what"); - - private static final TestData KEY1A = new TestData("One", "a"); - private static final TestData KEY1B = new TestData("One", "b"); - - private static final TestData VALUE2A = new TestData("Two", "a"); - private static final TestData VALUE2B = new TestData("Two", "b"); - - @Before - public void setUp() throws Exception { - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testKeySet() throws Exception { - DefaultAsyncConsistentMap map; - String name = "map_name"; - Database database = new TestDatabase(); - Serializer serializer = Serializer.forTypes(TestData.class); - - map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer, - false, false, false); - map.put(KEY1A, VALUE2A); - map.put(KEY1B, VALUE2A); - - Set set = map.keySet().get(); - assertEquals("Should contain 2 keys", - 2, set.size()); - assertThat(set.contains(KEY1A), is(true)); - assertThat(set.contains(KEY1B), is(true)); - assertThat(set.contains(new TestData("One", "a")), is(true)); - } - - @Test - public void testEntrySet() throws Exception { - DefaultAsyncConsistentMap map; - String name = "map_name"; - Database database = new TestDatabase(); - Serializer serializer = Serializer.forTypes(TestData.class); - - map = new DefaultAsyncConsistentMap<>(name, APP_ID, database, serializer, - false, false, false); - map.put(KEY1A, VALUE2A); - map.put(KEY1B, VALUE2A); - - assertEquals("Should contain 2 entry", - 2, - map.entrySet().get().size()); - } - - /** - * Object to be used as a test data. - * - * {@link Object#equals(Object)} use only part of it's fields. - * - * As a result there can be 2 instances which the - * serialized bytes are not-equal but - * {@link Object#equals(Object)}-wise they are equal. - */ - public static class TestData { - - private final String theKey; - - @SuppressWarnings("unused") - private final String notUsedForEquals; - - public TestData(String theKey, String notUsedForEquals) { - this.theKey = theKey; - this.notUsedForEquals = notUsedForEquals; - } - - @Override - public int hashCode() { - return Objects.hashCode(theKey); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TestData) { - TestData that = (TestData) obj; - return Objects.equals(this.theKey, that.theKey); - } - return false; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("theKey", theKey) - .add("notUsedForEquals", notUsedForEquals) - .toString(); - } - } - - /** - * {@link Database} implementation for testing. - * - * There is only 1 backing Map, {@code mapName} will be ignored. - */ - public class TestDatabase implements Database { - - Map> map = new ConcurrentHashMap<>(); - - @Override - public CompletableFuture> maps() { - return CompletableFuture.completedFuture(ImmutableSet.of()); - } - - @Override - public CompletableFuture> counters() { - return CompletableFuture.completedFuture(ImmutableMap.of()); - } - - @Override - public CompletableFuture mapSize(String mapName) { - return CompletableFuture.completedFuture(map.size()); - } - - @Override - public CompletableFuture mapIsEmpty(String mapName) { - return CompletableFuture.completedFuture(map.isEmpty()); - } - - @Override - public CompletableFuture mapContainsKey(String mapName, - String key) { - return CompletableFuture.completedFuture(map.containsKey(key)); - } - - @Override - public CompletableFuture mapContainsValue(String mapName, - byte[] value) { - return CompletableFuture.completedFuture(Maps.transformValues(map, Versioned::value) - .containsValue(value)); - } - - @Override - public CompletableFuture> mapGet(String mapName, - String key) { - return CompletableFuture.completedFuture(map.get(key)); - } - - @Override - public synchronized CompletableFuture>> mapUpdate(String mapName, - String key, - Match valueMatch, - Match versionMatch, - byte[] value) { - - boolean updated = false; - final Versioned oldValue; - final Versioned newValue; - - Versioned old = map.getOrDefault(key, new Versioned(null, 0)); - if (valueMatch.matches(old.value()) && versionMatch.matches(old.version())) { - updated = true; - oldValue = old; - newValue = new Versioned<>(value, old.version() + 1); - map.put(key, newValue); - } else { - updated = false; - oldValue = old; - newValue = old; - } - return CompletableFuture.completedFuture( - Result.ok(new UpdateResult(updated, - mapName, key, oldValue, newValue))); - } - - @Override - public CompletableFuture> mapClear(String mapName) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture> mapKeySet(String mapName) { - return CompletableFuture.completedFuture(unmodifiableSet(map.keySet())); - } - - @Override - public CompletableFuture>> mapValues(String mapName) { - return CompletableFuture.completedFuture(unmodifiableCollection(map.values())); - } - - @Override - public CompletableFuture>>> mapEntrySet(String mapName) { - return CompletableFuture.completedFuture(unmodifiableSet(map.entrySet())); - } - - @Override - public CompletableFuture counterAddAndGet(String counterName, - long delta) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture counterGetAndAdd(String counterName, - long delta) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture counterSet(String counterName, - long value) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture counterCompareAndSet(String counterName, - long expectedValue, - long update) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture counterGet(String counterName) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture queueSize(String queueName) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture queuePush(String queueName, - byte[] entry) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture queuePop(String queueName) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture queuePeek(String queueName) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture prepareAndCommit(Transaction transaction) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture prepare(Transaction transaction) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture commit(Transaction transaction) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture rollback(Transaction transaction) { - throw new UnsupportedOperationException(); - } - - @Override - public String name() { - return "name"; - } - - @Override - public ResourceState state() { - return ResourceState.HEALTHY; - } - - @Override - public Cluster cluster() { - throw new UnsupportedOperationException(); - } - - @Override - public Database addStartupTask(Task> task) { - throw new UnsupportedOperationException(); - } - - @Override - public Database addShutdownTask(Task> task) { - throw new UnsupportedOperationException(); - } - - @Override - public CompletableFuture open() { - return CompletableFuture.completedFuture(this); - } - - @Override - public boolean isOpen() { - return true; - } - - @Override - public CompletableFuture close() { - return CompletableFuture.completedFuture(null); - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void registerConsumer(Consumer consumer) { - } - - @Override - public void unregisterConsumer(Consumer consumer) { - } - } - -} diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/ResultTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/ResultTest.java deleted file mode 100644 index 3624fa2945..0000000000 --- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/ResultTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertFalse; -import static junit.framework.TestCase.assertNull; -import static junit.framework.TestCase.assertTrue; - -import org.junit.Test; - -/** - * Unit tests for Result. - */ -public class ResultTest { - - @Test - public void testLocked() { - Result r = Result.locked(); - assertFalse(r.success()); - assertNull(r.value()); - assertEquals(Result.Status.LOCKED, r.status()); - } - - @Test - public void testOk() { - Result r = Result.ok("foo"); - assertTrue(r.success()); - assertEquals("foo", r.value()); - assertEquals(Result.Status.OK, r.status()); - } - - @Test - public void testEquality() { - Result r1 = Result.ok("foo"); - Result r2 = Result.locked(); - Result r3 = Result.ok("bar"); - Result r4 = Result.ok("foo"); - assertTrue(r1.equals(r4)); - assertFalse(r1.equals(r2)); - assertFalse(r1.equals(r3)); - assertFalse(r2.equals(r3)); - } -} diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/UpdateResultTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/UpdateResultTest.java deleted file mode 100644 index 3426315108..0000000000 --- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/UpdateResultTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.primitives.impl; - -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertNull; -import static junit.framework.TestCase.assertTrue; - -import org.junit.Test; -import org.onosproject.store.service.MapEvent; -import org.onosproject.store.service.Versioned; - -/** - * Unit tests for UpdateResult. - */ -public class UpdateResultTest { - - @Test - public void testGetters() { - Versioned oldValue = new Versioned<>("a", 1); - Versioned newValue = new Versioned<>("b", 2); - UpdateResult ur = - new UpdateResult<>(true, "foo", "k", oldValue, newValue); - - assertTrue(ur.updated()); - assertEquals("foo", ur.mapName()); - assertEquals("k", ur.key()); - assertEquals(oldValue, ur.oldValue()); - assertEquals(newValue, ur.newValue()); - } - - @Test - public void testToMapEvent() { - Versioned oldValue = new Versioned<>("a", 1); - Versioned newValue = new Versioned<>("b", 2); - UpdateResult ur1 = - new UpdateResult<>(true, "foo", "k", oldValue, newValue); - MapEvent event1 = ur1.toMapEvent(); - assertEquals(MapEvent.Type.UPDATE, event1.type()); - assertEquals("k", event1.key()); - assertEquals(newValue, event1.value()); - - UpdateResult ur2 = - new UpdateResult<>(true, "foo", "k", null, newValue); - MapEvent event2 = ur2.toMapEvent(); - assertEquals(MapEvent.Type.INSERT, event2.type()); - assertEquals("k", event2.key()); - assertEquals(newValue, event2.value()); - - UpdateResult ur3 = - new UpdateResult<>(true, "foo", "k", oldValue, null); - MapEvent event3 = ur3.toMapEvent(); - assertEquals(MapEvent.Type.REMOVE, event3.type()); - assertEquals("k", event3.key()); - assertEquals(oldValue, event3.value()); - - UpdateResult ur4 = - new UpdateResult<>(false, "foo", "k", oldValue, oldValue); - assertNull(ur4.toMapEvent()); - } - - @Test - public void testMap() { - Versioned oldValue = new Versioned<>("a", 1); - Versioned newValue = new Versioned<>("b", 2); - UpdateResult ur1 = - new UpdateResult<>(true, "foo", "k", oldValue, newValue); - UpdateResult ur2 = ur1.map(s -> s.length(), s -> s.length()); - - assertEquals(ur2.updated(), ur1.updated()); - assertEquals(ur1.mapName(), ur2.mapName()); - assertEquals(new Integer(1), ur2.key()); - assertEquals(oldValue.map(s -> s.length()), ur2.oldValue()); - assertEquals(newValue.map(s -> s.length()), ur2.newValue()); - - UpdateResult ur3 = - new UpdateResult<>(true, "foo", "k", null, newValue); - UpdateResult ur4 = ur3.map(s -> s.length(), s -> s.length()); - - assertEquals(ur3.updated(), ur4.updated()); - assertEquals(ur3.mapName(), ur4.mapName()); - assertEquals(new Integer(1), ur4.key()); - assertNull(ur4.oldValue()); - assertEquals(newValue.map(s -> s.length()), ur4.newValue()); - } -} diff --git a/pom.xml b/pom.xml index 8f03f9bc2e..15f6a26560 100644 --- a/pom.xml +++ b/pom.xml @@ -80,8 +80,7 @@ 1.0.6 1.0.0-rc3 - 1.0.0-rc6 - 0.5.1.onos + 1.0.0-rc6 0.9.3.onos-SNAPSHOT 1.9 4.3.1 diff --git a/utils/thirdparty/pom.xml b/utils/thirdparty/pom.xml index a1fbcd40ed..8381be7f09 100644 --- a/utils/thirdparty/pom.xml +++ b/utils/thirdparty/pom.xml @@ -53,28 +53,15 @@ io.atomix.copycat copycat-client - ${atomix.copycat.version} + ${copycat.version} io.atomix.copycat copycat-server - ${atomix.copycat.version} - - - - - org.onosproject - copycat-api ${copycat.version} - - - org.onosproject - copycat-core - ${copycat.version} - @@ -98,16 +85,6 @@ - - org.onosproject:copycat* - - ** - - - net/kuujo/copycat/** - - - io.atomix:atomix-all @@ -132,7 +109,7 @@ - net.kuujo.copycat.*;io.atomix.* + io.atomix.* !sun.nio.ch,!sun.misc,*