diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncDocumentTree.java new file mode 100644 index 0000000000..d71f965a52 --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncDocumentTree.java @@ -0,0 +1,132 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.onosproject.store.service.AsyncDocumentTree; +import org.onosproject.store.service.DocumentPath; +import org.onosproject.store.service.DocumentTreeListener; +import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; + +import static org.onosproject.store.service.DistributedPrimitive.Status.INACTIVE; +import static org.onosproject.store.service.DistributedPrimitive.Status.SUSPENDED; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Caching asynchronous document tree. + */ +public class CachingAsyncDocumentTree extends DelegatingAsyncDocumentTree implements AsyncDocumentTree { + private static final int DEFAULT_CACHE_SIZE = 10000; + private final Logger log = getLogger(getClass()); + + private final LoadingCache>> cache; + private final DocumentTreeListener cacheUpdater; + private final Consumer statusListener; + + /** + * Default constructor. + * + * @param backingTree a distributed, strongly consistent map for backing + */ + public CachingAsyncDocumentTree(AsyncDocumentTree backingTree) { + this(backingTree, DEFAULT_CACHE_SIZE); + } + + /** + * Constructor to configure cache size. + * + * @param backingTree a distributed, strongly consistent map for backing + * @param cacheSize the maximum size of the cache + */ + public CachingAsyncDocumentTree(AsyncDocumentTree backingTree, int cacheSize) { + super(backingTree); + cache = CacheBuilder.newBuilder() + .maximumSize(cacheSize) + .build(CacheLoader.from(CachingAsyncDocumentTree.super::get)); + cacheUpdater = event -> { + if (!event.newValue().isPresent()) { + cache.invalidate(event.path()); + } else { + cache.put(event.path(), CompletableFuture.completedFuture(event.newValue().get())); + } + }; + statusListener = status -> { + log.debug("{} status changed to {}", this.name(), status); + // If the status of the underlying map is SUSPENDED or INACTIVE + // we can no longer guarantee that the cache will be in sync. + if (status == SUSPENDED || status == INACTIVE) { + cache.invalidateAll(); + } + }; + super.addListener(cacheUpdater); + super.addStatusChangeListener(statusListener); + } + + @Override + public CompletableFuture> get(DocumentPath path) { + return cache.getUnchecked(path); + } + + @Override + public CompletableFuture> set(DocumentPath path, V value) { + return super.set(path, value) + .whenComplete((r, e) -> cache.invalidate(path)); + } + + @Override + public CompletableFuture create(DocumentPath path, V value) { + return super.create(path, value) + .whenComplete((r, e) -> cache.invalidate(path)); + } + + @Override + public CompletableFuture createRecursive(DocumentPath path, V value) { + return super.createRecursive(path, value) + .whenComplete((r, e) -> cache.invalidate(path)); + } + + @Override + public CompletableFuture replace(DocumentPath path, V newValue, long version) { + return super.replace(path, newValue, version) + .whenComplete((r, e) -> { + if (r) { + cache.invalidate(path); + } + }); + } + + @Override + public CompletableFuture replace(DocumentPath path, V newValue, V currentValue) { + return super.replace(path, newValue, currentValue) + .whenComplete((r, e) -> { + if (r) { + cache.invalidate(path); + } + }); + } + + @Override + public CompletableFuture> removeNode(DocumentPath path) { + return super.removeNode(path) + .whenComplete((r, e) -> cache.invalidate(path)); + } +} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java index 9db48b7028..1aeb0a0867 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java @@ -47,6 +47,8 @@ public class DefaultDocumentTreeBuilder extends DocumentTreeBuilder { @Deprecated @Override public AsyncDocumentTree build() { - return primitiveCreator.newAsyncDocumentTree(name(), serializer(), ordering()); + AsyncDocumentTree tree = primitiveCreator.newAsyncDocumentTree(name(), serializer(), ordering()); + tree = relaxedReadConsistency() ? DistributedPrimitives.newCachingDocumentTree(tree) : tree; + return tree; } } \ No newline at end of file diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncDocumentTree.java new file mode 100644 index 0000000000..6182c48e79 --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncDocumentTree.java @@ -0,0 +1,143 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +import com.google.common.base.MoreObjects; +import org.onosproject.store.primitives.NodeUpdate; +import org.onosproject.store.primitives.TransactionId; +import org.onosproject.store.service.AsyncDocumentTree; +import org.onosproject.store.service.DocumentPath; +import org.onosproject.store.service.DocumentTreeListener; +import org.onosproject.store.service.TransactionLog; +import org.onosproject.store.service.Version; +import org.onosproject.store.service.Versioned; + +/** + * Document tree that delegates to an underlying instance. + */ +public class DelegatingAsyncDocumentTree extends DelegatingDistributedPrimitive implements AsyncDocumentTree { + private final AsyncDocumentTree delegateTree; + + public DelegatingAsyncDocumentTree(AsyncDocumentTree delegateTree) { + super(delegateTree); + this.delegateTree = delegateTree; + } + + @Override + public DocumentPath root() { + return delegateTree.root(); + } + + @Override + public CompletableFuture>> getChildren(DocumentPath path) { + return delegateTree.getChildren(path); + } + + @Override + public CompletableFuture> get(DocumentPath path) { + return delegateTree.get(path); + } + + @Override + public CompletableFuture> set(DocumentPath path, V value) { + return delegateTree.set(path, value); + } + + @Override + public CompletableFuture create(DocumentPath path, V value) { + return delegateTree.create(path, value); + } + + @Override + public CompletableFuture createRecursive(DocumentPath path, V value) { + return delegateTree.createRecursive(path, value); + } + + @Override + public CompletableFuture replace(DocumentPath path, V newValue, long version) { + return delegateTree.replace(path, newValue, version); + } + + @Override + public CompletableFuture replace(DocumentPath path, V newValue, V currentValue) { + return delegateTree.replace(path, newValue, currentValue); + } + + @Override + public CompletableFuture> removeNode(DocumentPath path) { + return delegateTree.removeNode(path); + } + + @Override + public CompletableFuture addListener(DocumentPath path, DocumentTreeListener listener) { + return delegateTree.addListener(path, listener); + } + + @Override + public CompletableFuture removeListener(DocumentTreeListener listener) { + return delegateTree.removeListener(listener); + } + + @Override + public CompletableFuture begin(TransactionId transactionId) { + return delegateTree.begin(transactionId); + } + + @Override + public CompletableFuture prepare(TransactionLog> transactionLog) { + return delegateTree.prepare(transactionLog); + } + + @Override + public CompletableFuture prepareAndCommit(TransactionLog> transactionLog) { + return delegateTree.prepareAndCommit(transactionLog); + } + + @Override + public CompletableFuture commit(TransactionId transactionId) { + return delegateTree.commit(transactionId); + } + + @Override + public CompletableFuture rollback(TransactionId transactionId) { + return delegateTree.rollback(transactionId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("delegateTree", delegateTree) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(delegateTree); + } + + @Override + public boolean equals(Object other) { + if (other instanceof DelegatingAsyncDocumentTree) { + DelegatingAsyncDocumentTree that = (DelegatingAsyncDocumentTree) other; + return this.delegateTree.equals(that.delegateTree); + } + return false; + } +} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java index 33b72a4b3e..d116988852 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java @@ -20,6 +20,7 @@ import org.onosproject.store.service.AsyncConsistentMap; import org.onosproject.store.service.AsyncConsistentMultimap; import org.onosproject.store.service.AsyncConsistentTreeMap; import org.onosproject.store.service.AsyncDistributedSet; +import org.onosproject.store.service.AsyncDocumentTree; import java.util.function.Function; @@ -186,4 +187,15 @@ public final class DistributedPrimitives { valueEncoder); } + /** + * Creates an instance of {@code AsyncDocumentTree} that caches values on get. + * + * @param tree backing tree + * @return caching tree + * @param tree value type + */ + public static AsyncDocumentTree newCachingDocumentTree(AsyncDocumentTree tree) { + return new CachingAsyncDocumentTree(tree); + } + }