diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AsyncCachingConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AsyncCachingConsistentMap.java
deleted file mode 100644
index 9df1b3bd26..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/AsyncCachingConsistentMap.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.concurrent.CompletableFuture;
-
-import org.onosproject.core.ApplicationId;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-/**
- * Extension of {@link DefaultAsyncConsistentMap} that provides a weaker read consistency
- * guarantee in return for better read performance.
- *
- * For read/write operations that are local to a node this map implementation provides
- * guarantees similar to a ConsistentMap. However for read/write operations executed
- * across multiple nodes this implementation only provides eventual consistency.
- *
- * @param key type
- * @param value type
- */
-public class AsyncCachingConsistentMap extends DefaultAsyncConsistentMap {
-
- private final LoadingCache>> cache =
- CacheBuilder.newBuilder()
- .maximumSize(10000) // TODO: make configurable
- .build(new CacheLoader>>() {
- @Override
- public CompletableFuture> load(K key)
- throws Exception {
- return AsyncCachingConsistentMap.super.get(key);
- }
- });
-
- public AsyncCachingConsistentMap(String name,
- ApplicationId applicationId,
- Database database,
- Serializer serializer,
- boolean readOnly,
- boolean purgeOnUninstall,
- boolean meteringEnabled) {
- super(name, applicationId, database, serializer, readOnly, purgeOnUninstall, meteringEnabled);
- addListener(event -> cache.invalidate(event.key()));
- }
-
- @Override
- public CompletableFuture> get(K key) {
- CompletableFuture> cachedValue = cache.getIfPresent(key);
- if (cachedValue != null) {
- if (cachedValue.isCompletedExceptionally()) {
- cache.invalidate(key);
- } else {
- return cachedValue;
- }
- }
- return cache.getUnchecked(key);
- }
-
- @Override
- protected void beforeUpdate(K key) {
- super.beforeUpdate(key);
- cache.invalidate(key);
- }
-}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CommitResponse.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CommitResponse.java
deleted file mode 100644
index a18ade4896..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CommitResponse.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Result of a Transaction commit operation.
- */
-public final class CommitResponse {
-
- private boolean success;
- private List> updates;
-
- public static CommitResponse success(List> updates) {
- return new CommitResponse(true, updates);
- }
-
- public static CommitResponse failure() {
- return new CommitResponse(false, Collections.emptyList());
- }
-
- private CommitResponse(boolean success, List> updates) {
- this.success = success;
- this.updates = ImmutableList.copyOf(updates);
- }
-
- public boolean success() {
- return success;
- }
-
- public List> updates() {
- return updates;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("success", success)
- .add("udpates", updates)
- .toString();
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatCommunicationProtocol.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatCommunicationProtocol.java
deleted file mode 100644
index e40665f27c..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatCommunicationProtocol.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-import net.kuujo.copycat.protocol.AbstractProtocol;
-import net.kuujo.copycat.protocol.ProtocolClient;
-import net.kuujo.copycat.protocol.ProtocolHandler;
-import net.kuujo.copycat.protocol.ProtocolServer;
-import net.kuujo.copycat.util.Configurable;
-
-/**
- * Protocol for Copycat communication that employs
- * {@code ClusterCommunicationService}.
- */
-public class CopycatCommunicationProtocol extends AbstractProtocol {
-
- private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
- new MessageSubject("onos-copycat-message");
-
- protected ClusterService clusterService;
- protected ClusterCommunicationService clusterCommunicator;
-
- public CopycatCommunicationProtocol(ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator) {
- this.clusterService = clusterService;
- this.clusterCommunicator = clusterCommunicator;
- }
-
- @Override
- public Configurable copy() {
- return this;
- }
-
- @Override
- public ProtocolClient createClient(URI uri) {
- NodeId nodeId = uriToNodeId(uri);
- if (nodeId == null) {
- throw new IllegalStateException("Unknown peer " + uri);
- }
- return new Client(nodeId);
- }
-
- @Override
- public ProtocolServer createServer(URI uri) {
- return new Server();
- }
-
- private class Server implements ProtocolServer {
-
- @Override
- public void handler(ProtocolHandler handler) {
- if (handler == null) {
- clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
- } else {
- clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
- ByteBuffer::wrap,
- handler,
- Tools::byteBuffertoArray);
- // FIXME: Tools::byteBuffertoArray involves a array copy.
- }
- }
-
- @Override
- public CompletableFuture listen() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture close() {
- clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
- return CompletableFuture.completedFuture(null);
- }
- }
-
- private class Client implements ProtocolClient {
- private final NodeId peer;
-
- public Client(NodeId peer) {
- this.peer = peer;
- }
-
- @Override
- public CompletableFuture write(ByteBuffer request) {
- return clusterCommunicator.sendAndReceive(request,
- COPYCAT_MESSAGE_SUBJECT,
- Tools::byteBuffertoArray,
- ByteBuffer::wrap,
- peer);
- }
-
- @Override
- public CompletableFuture connect() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture close() {
- return CompletableFuture.completedFuture(null);
- }
- }
-
- private NodeId uriToNodeId(URI uri) {
- return clusterService.getNodes()
- .stream()
- .filter(node -> uri.getHost().equals(node.ip().toString()))
- .map(ControllerNode::id)
- .findAny()
- .orElse(null);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Database.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Database.java
deleted file mode 100644
index bacf9f692c..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Database.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-
-import java.util.function.Consumer;
-
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
-import net.kuujo.copycat.resource.Resource;
-
-/**
- * Database.
- */
-public interface Database extends DatabaseProxy, Resource {
-
- /**
- * Creates a new database with the default cluster configuration.
- *
- * The database will be constructed with the default cluster configuration. The default cluster configuration
- * searches for two resources on the classpath - {@code cluster} and {cluster-defaults} - in that order. Configuration
- * options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.
- *
- * Additionally, the database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
- * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
- * as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
- * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
- *
- * @param name The database name.
- * @return The database.
- */
- static Database create(String name) {
- return create(name, new ClusterConfig(), new DatabaseConfig());
- }
-
- /**
- * Creates a new database.
- *
- * The database will be constructed with an database configuration that searches the classpath for
- * three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
- * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
- * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
- * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
- *
- * @param name The database name.
- * @param cluster The cluster configuration.
- * @return The database.
- */
- static Database create(String name, ClusterConfig cluster) {
- return create(name, cluster, new DatabaseConfig());
- }
-
- /**
- * Creates a new database.
- *
- * @param name The database name.
- * @param cluster The cluster configuration.
- * @param config The database configuration.
-
- * @return The database.
- */
- static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
- ClusterCoordinator coordinator =
- new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
- return coordinator.getResource(name, config.resolve(cluster))
- .addStartupTask(() -> coordinator.open().thenApply(v -> null))
- .addShutdownTask(coordinator::close);
- }
-
- /**
- * Tells whether the database supports change notifications.
- * @return true if notifications are supported; false otherwise
- */
- default boolean hasChangeNotificationSupport() {
- return true;
- }
-
- /**
- * Registers a new consumer of StateMachineUpdates.
- * @param consumer consumer to register
- */
- void registerConsumer(Consumer consumer);
-
- /**
- * Unregisters a consumer of StateMachineUpdates.
- * @param consumer consumer to unregister
- */
- void unregisterConsumer(Consumer consumer);
-}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseConfig.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseConfig.java
deleted file mode 100644
index 57dd31c302..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseConfig.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-import com.typesafe.config.ConfigValueFactory;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
-import net.kuujo.copycat.protocol.Consistency;
-import net.kuujo.copycat.resource.ResourceConfig;
-import net.kuujo.copycat.state.StateLogConfig;
-import net.kuujo.copycat.util.internal.Assert;
-
-import java.util.Map;
-
-/**
- * Database configuration.
- *
- */
-public class DatabaseConfig extends ResourceConfig {
- private static final String DATABASE_CONSISTENCY = "consistency";
-
- private static final String DEFAULT_CONFIGURATION = "database-defaults";
- private static final String CONFIGURATION = "database";
-
- private String name;
-
- public DatabaseConfig() {
- super(CONFIGURATION, DEFAULT_CONFIGURATION);
- }
-
- public DatabaseConfig(Map config) {
- super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
- }
-
- public DatabaseConfig(String resource) {
- super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
- }
-
- protected DatabaseConfig(DatabaseConfig config) {
- super(config);
- }
-
- @Override
- public DatabaseConfig copy() {
- return new DatabaseConfig(this);
- }
-
- /**
- * Sets the database read consistency.
- *
- * @param consistency The database read consistency.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public void setConsistency(String consistency) {
- this.config = config.withValue(DATABASE_CONSISTENCY,
- ConfigValueFactory.fromAnyRef(
- Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString()));
- }
-
- /**
- * Sets the database read consistency.
- *
- * @param consistency The database read consistency.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public void setConsistency(Consistency consistency) {
- this.config = config.withValue(DATABASE_CONSISTENCY,
- ConfigValueFactory.fromAnyRef(
- Assert.isNotNull(consistency, "consistency").toString()));
- }
-
- /**
- * Returns the database read consistency.
- *
- * @return The database read consistency.
- */
- public Consistency getConsistency() {
- return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
- }
-
- /**
- * Sets the database read consistency, returning the configuration for method chaining.
- *
- * @param consistency The database read consistency.
- * @return The database configuration.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public DatabaseConfig withConsistency(String consistency) {
- setConsistency(consistency);
- return this;
- }
-
- /**
- * Sets the database read consistency, returning the configuration for method chaining.
- *
- * @param consistency The database read consistency.
- * @return The database configuration.
- * @throws java.lang.NullPointerException If the consistency is {@code null}
- */
- public DatabaseConfig withConsistency(Consistency consistency) {
- setConsistency(consistency);
- return this;
- }
-
- /**
- * Returns the database name.
- *
- * @return The database name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Sets the database name, returning the configuration for method chaining.
- *
- * @param name The database name
- * @return The database configuration
- * @throws java.lang.NullPointerException If the name is {@code null}
- */
- public DatabaseConfig withName(String name) {
- setName(Assert.isNotNull(name, "name"));
- return this;
- }
-
- /**
- * Sets the database name.
- *
- * @param name The database name
- * @throws java.lang.NullPointerException If the name is {@code null}
- */
- public void setName(String name) {
- this.name = Assert.isNotNull(name, "name");
- }
-
- @Override
- public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
- return new StateLogConfig(toMap())
- .resolve(cluster)
- .withResourceType(DefaultDatabase.class);
- }
-
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
deleted file mode 100644
index 0671ab4bf0..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseManager.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
-import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import net.kuujo.copycat.CopycatConfig;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.Member;
-import net.kuujo.copycat.cluster.Member.Type;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
-import net.kuujo.copycat.log.BufferedLog;
-import net.kuujo.copycat.log.FileLog;
-import net.kuujo.copycat.log.Log;
-import net.kuujo.copycat.protocol.Consistency;
-import net.kuujo.copycat.protocol.Protocol;
-import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.app.ApplicationEvent;
-import org.onosproject.app.ApplicationListener;
-import org.onosproject.app.ApplicationService;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.AtomicCounterBuilder;
-import org.onosproject.store.service.AtomicValueBuilder;
-import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.DistributedQueueBuilder;
-import org.onosproject.store.service.DistributedSetBuilder;
-import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-import org.onosproject.store.service.LeaderElectorBuilder;
-import org.onosproject.store.service.MapInfo;
-import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageAdminService;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.TransactionContextBuilder;
-import org.slf4j.Logger;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
-/**
- * Database manager.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class DatabaseManager implements StorageService, StorageAdminService {
-
- private final Logger log = getLogger(getClass());
-
- public static final String BASE_PARTITION_NAME = "p0";
-
- private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
- private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
-
- private ClusterCoordinator coordinator;
- protected PartitionedDatabase partitionedDatabase;
- protected Database inMemoryDatabase;
- protected NodeId localNodeId;
-
- private TransactionManager transactionManager;
- private final Supplier transactionIdGenerator =
- () -> TransactionId.from(UUID.randomUUID().toString());
-
- private ApplicationListener appListener = new InternalApplicationListener();
-
- private final Multimap maps =
- Multimaps.synchronizedMultimap(ArrayListMultimap.create());
- private final Multimap mapsByApplication =
- Multimaps.synchronizedMultimap(ArrayListMultimap.create());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
- protected ApplicationService applicationService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected PersistenceService persistenceService;
-
- protected String nodeIdToUri(NodeId nodeId) {
- ControllerNode node = clusterService.getNode(nodeId);
- return String.format("onos://%s:%d", node.ip(), node.tcpPort());
- }
-
- protected void bindApplicationService(ApplicationService service) {
- applicationService = service;
- applicationService.addListener(appListener);
- }
-
- protected void unbindApplicationService(ApplicationService service) {
- applicationService.removeListener(appListener);
- this.applicationService = null;
- }
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
-
- Map> partitionMap = Maps.newHashMap();
- clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> {
- partitionMap.put(p.getId(), Sets.newHashSet(p.getMembers()));
- });
-
- String[] activeNodeUris = partitionMap.values()
- .stream()
- .reduce((s1, s2) -> Sets.union(s1, s2))
- .get()
- .stream()
- .map(this::nodeIdToUri)
- .toArray(String[]::new);
-
- String localNodeUri = nodeIdToUri(clusterMetadataService.getLocalNode().id());
- Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
-
- ClusterConfig clusterConfig = new ClusterConfig()
- .withProtocol(protocol)
- .withElectionTimeout(electionTimeoutMillis(activeNodeUris))
- .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
- .withMembers(activeNodeUris)
- .withLocalMember(localNodeUri);
-
- CopycatConfig copycatConfig = new CopycatConfig()
- .withName("onos")
- .withClusterConfig(clusterConfig)
- .withDefaultSerializer(new DatabaseSerializer())
- .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
-
- coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
-
- Function logFunction = id -> id.asInt() == 0 ? newInMemoryLog() : newPersistentLog();
-
- Map databases = Maps.transformEntries(partitionMap, (k, v) -> {
- String[] replicas = v.stream().map(this::nodeIdToUri).toArray(String[]::new);
- DatabaseConfig config = newDatabaseConfig(String.format("p%s", k), logFunction.apply(k), replicas);
- return coordinator.getResource(config.getName(), config.resolve(clusterConfig)
- .withSerializer(copycatConfig.getDefaultSerializer())
- .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
- });
-
- inMemoryDatabase = databases.remove(PartitionId.from(0));
-
- partitionedDatabase = new PartitionedDatabase("onos-store", databases.values());
-
- CompletableFuture status = coordinator.open()
- .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open())
- .whenComplete((db, error) -> {
- if (error != null) {
- log.error("Failed to initialize database.", error);
- } else {
- log.info("Successfully initialized database.");
- }
- }));
-
- Futures.getUnchecked(status);
-
- AsyncConsistentMap transactions =
- this.consistentMapBuilder()
- .withName("onos-transactions")
- .withSerializer(Serializer.using(KryoNamespaces.API,
- MapUpdate.class,
- MapUpdate.Type.class,
- Transaction.class,
- Transaction.State.class))
- .buildAsyncMap();
-
- transactionManager = new TransactionManager(partitionedDatabase, transactions);
- partitionedDatabase.setTransactionManager(transactionManager);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())
- .thenCompose(v -> coordinator.close())
- .whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to cleanly close databases.", error);
- } else {
- log.info("Successfully closed databases.");
- }
- });
- ImmutableList.copyOf(maps.values()).forEach(this::unregisterMap);
- if (applicationService != null) {
- applicationService.removeListener(appListener);
- }
- log.info("Stopped");
- }
-
- @Override
- public TransactionContextBuilder transactionContextBuilder() {
- return new DefaultTransactionContextBuilder(this::consistentMapBuilder,
- transactionManager::execute,
- transactionIdGenerator.get());
- }
-
- @Override
- public List getPartitionInfo() {
- return Lists.asList(
- inMemoryDatabase,
- partitionedDatabase.getPartitions().toArray(new Database[]{}))
- .stream()
- .map(DatabaseManager::toPartitionInfo)
- .collect(Collectors.toList());
- }
-
- private Log newPersistentLog() {
- String logDir = System.getProperty("karaf.data", "./data");
- return new FileLog()
- .withDirectory(logDir)
- .withSegmentSize(1073741824) // 1GB
- .withFlushOnWrite(true)
- .withSegmentInterval(Long.MAX_VALUE);
- }
-
- private Log newInMemoryLog() {
- return new BufferedLog()
- .withFlushOnWrite(false)
- .withFlushInterval(Long.MAX_VALUE)
- .withSegmentSize(10485760) // 10MB
- .withSegmentInterval(Long.MAX_VALUE);
- }
-
- private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) {
- return new DatabaseConfig()
- .withName(name)
- .withElectionTimeout(electionTimeoutMillis(replicas))
- .withHeartbeatInterval(heartbeatTimeoutMillis(replicas))
- .withConsistency(Consistency.DEFAULT)
- .withLog(log)
- .withDefaultSerializer(new DatabaseSerializer())
- .withReplicas(replicas);
- }
-
- private long electionTimeoutMillis(String[] replicas) {
- return replicas.length == 1 ? 10L : RAFT_ELECTION_TIMEOUT_MILLIS;
- }
-
- private long heartbeatTimeoutMillis(String[] replicas) {
- return electionTimeoutMillis(replicas) / 2;
- }
-
- /**
- * Maps a Raft Database object to a PartitionInfo object.
- *
- * @param database database containing input data
- * @return PartitionInfo object
- */
- private static PartitionInfo toPartitionInfo(Database database) {
- return new PartitionInfo(database.name(),
- database.cluster().term(),
- database.cluster().members()
- .stream()
- .filter(member -> Type.ACTIVE.equals(member.type()))
- .map(Member::uri)
- .sorted()
- .collect(Collectors.toList()),
- database.cluster().leader() != null ?
- database.cluster().leader().uri() : null);
- }
-
-
- @Override
- public EventuallyConsistentMapBuilder eventuallyConsistentMapBuilder() {
- return new EventuallyConsistentMapBuilderImpl<>(clusterService,
- clusterCommunicator,
- persistenceService);
- }
-
- @Override
- public ConsistentMapBuilder consistentMapBuilder() {
- return new DefaultConsistentMapBuilder<>(this);
- }
-
- @Override
- public DistributedSetBuilder setBuilder() {
- return new DefaultDistributedSetBuilder<>(() -> this.consistentMapBuilder());
- }
-
-
- @Override
- public DistributedQueueBuilder queueBuilder() {
- return new DefaultDistributedQueueBuilder<>(this);
- }
-
- @Override
- public AtomicCounterBuilder atomicCounterBuilder() {
- return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase);
- }
-
- @Override
- public AtomicValueBuilder atomicValueBuilder() {
- Supplier> mapBuilderSupplier =
- () -> this.consistentMapBuilder()
- .withName("onos-atomic-values")
- .withMeteringDisabled()
- .withSerializer(Serializer.using(KryoNamespaces.BASIC));
- return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
- }
-
- @Override
- public LeaderElectorBuilder leaderElectorBuilder() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List getMapInfo() {
- List maps = Lists.newArrayList();
- maps.addAll(getMapInfo(inMemoryDatabase));
- maps.addAll(getMapInfo(partitionedDatabase));
- return maps;
- }
-
- private List getMapInfo(Database database) {
- return complete(database.maps())
- .stream()
- .map(name -> new MapInfo(name, complete(database.mapSize(name))))
- .filter(info -> info.size() > 0)
- .collect(Collectors.toList());
- }
-
-
- @Override
- public Map getCounters() {
- Map counters = Maps.newHashMap();
- counters.putAll(complete(inMemoryDatabase.counters()));
- counters.putAll(complete(partitionedDatabase.counters()));
- return counters;
- }
-
- @Override
- public Map getPartitionedDatabaseCounters() {
- Map counters = Maps.newHashMap();
- counters.putAll(complete(partitionedDatabase.counters()));
- return counters;
- }
-
- @Override
- public Map getInMemoryDatabaseCounters() {
- Map counters = Maps.newHashMap();
- counters.putAll(complete(inMemoryDatabase.counters()));
- return counters;
- }
-
- @Override
- public Collection getPendingTransactions() {
- return complete(transactionManager.getPendingTransactionIds());
- }
-
- private static T complete(CompletableFuture future) {
- try {
- return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ConsistentMapException.Interrupted();
- } catch (TimeoutException e) {
- throw new ConsistentMapException.Timeout();
- } catch (ExecutionException e) {
- throw new ConsistentMapException(e.getCause());
- }
- }
-
- protected DefaultAsyncConsistentMap registerMap(DefaultAsyncConsistentMap map) {
- maps.put(map.name(), map);
- if (map.applicationId() != null) {
- mapsByApplication.put(map.applicationId(), map);
- }
- return map;
- }
-
- protected void unregisterMap(DefaultAsyncConsistentMap map) {
- maps.remove(map.name(), map);
- if (map.applicationId() != null) {
- mapsByApplication.remove(map.applicationId(), map);
- }
- }
-
- private class InternalApplicationListener implements ApplicationListener {
- @Override
- public void event(ApplicationEvent event) {
- if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
- ApplicationId appId = event.subject().id();
- List mapsToRemove;
- synchronized (mapsByApplication) {
- mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
- }
- mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
- if (event.type() == APP_UNINSTALLED) {
- mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
- }
- }
- }
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabasePartitioner.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabasePartitioner.java
deleted file mode 100644
index f5ccddae33..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabasePartitioner.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.List;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.Hashing;
-
-/**
- * Partitioner for mapping map entries to individual database partitions.
- *
- * By default a md5 hash of the hash key (key or map name) is used to pick a
- * partition.
- */
-public abstract class DatabasePartitioner implements Partitioner {
- // Database partitions sorted by their partition name.
- protected final List partitions;
-
- public DatabasePartitioner(List partitions) {
- checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty");
- this.partitions = ImmutableList.copyOf(partitions);
- }
-
- protected int hash(String key) {
- return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt());
- }
-
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java
deleted file mode 100644
index 83525284b2..0000000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseProxy.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-import org.onlab.util.Match;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Database proxy.
- */
-public interface DatabaseProxy {
-
- /**
- * Returns a set of all map names.
- *
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture> maps();
-
- /**
- * Returns a mapping from counter name to next value.
- *
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture