mirror of
				https://github.com/opennetworkinglab/onos.git
				synced 2025-10-24 22:01:02 +02:00 
			
		
		
		
	Support caching in AsyncDocumentTree primitive
Change-Id: I659a5c374891ece7083fa8bad5b1c52c9fa5c8d8
This commit is contained in:
		
							parent
							
								
									cb1e02cf89
								
							
						
					
					
						commit
						8d8da598dd
					
				| @ -0,0 +1,132 @@ | ||||
| /* | ||||
|  * Copyright 2017-present Open Networking Foundation | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package org.onosproject.store.primitives.impl; | ||||
| 
 | ||||
| import java.util.concurrent.CompletableFuture; | ||||
| import java.util.function.Consumer; | ||||
| 
 | ||||
| import com.google.common.cache.CacheBuilder; | ||||
| import com.google.common.cache.CacheLoader; | ||||
| import com.google.common.cache.LoadingCache; | ||||
| import org.onosproject.store.service.AsyncDocumentTree; | ||||
| import org.onosproject.store.service.DocumentPath; | ||||
| import org.onosproject.store.service.DocumentTreeListener; | ||||
| import org.onosproject.store.service.Versioned; | ||||
| import org.slf4j.Logger; | ||||
| 
 | ||||
| import static org.onosproject.store.service.DistributedPrimitive.Status.INACTIVE; | ||||
| import static org.onosproject.store.service.DistributedPrimitive.Status.SUSPENDED; | ||||
| import static org.slf4j.LoggerFactory.getLogger; | ||||
| 
 | ||||
| /** | ||||
|  * Caching asynchronous document tree. | ||||
|  */ | ||||
| public class CachingAsyncDocumentTree<V> extends DelegatingAsyncDocumentTree<V> implements AsyncDocumentTree<V> { | ||||
|     private static final int DEFAULT_CACHE_SIZE = 10000; | ||||
|     private final Logger log = getLogger(getClass()); | ||||
| 
 | ||||
|     private final LoadingCache<DocumentPath, CompletableFuture<Versioned<V>>> cache; | ||||
|     private final DocumentTreeListener<V> cacheUpdater; | ||||
|     private final Consumer<Status> statusListener; | ||||
| 
 | ||||
|     /** | ||||
|      * Default constructor. | ||||
|      * | ||||
|      * @param backingTree a distributed, strongly consistent map for backing | ||||
|      */ | ||||
|     public CachingAsyncDocumentTree(AsyncDocumentTree<V> backingTree) { | ||||
|         this(backingTree, DEFAULT_CACHE_SIZE); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * Constructor to configure cache size. | ||||
|      * | ||||
|      * @param backingTree a distributed, strongly consistent map for backing | ||||
|      * @param cacheSize the maximum size of the cache | ||||
|      */ | ||||
|     public CachingAsyncDocumentTree(AsyncDocumentTree<V> backingTree, int cacheSize) { | ||||
|         super(backingTree); | ||||
|         cache = CacheBuilder.newBuilder() | ||||
|                 .maximumSize(cacheSize) | ||||
|                 .build(CacheLoader.from(CachingAsyncDocumentTree.super::get)); | ||||
|         cacheUpdater = event -> { | ||||
|             if (!event.newValue().isPresent()) { | ||||
|                 cache.invalidate(event.path()); | ||||
|             } else { | ||||
|                 cache.put(event.path(), CompletableFuture.completedFuture(event.newValue().get())); | ||||
|             } | ||||
|         }; | ||||
|         statusListener = status -> { | ||||
|             log.debug("{} status changed to {}", this.name(), status); | ||||
|             // If the status of the underlying map is SUSPENDED or INACTIVE | ||||
|             // we can no longer guarantee that the cache will be in sync. | ||||
|             if (status == SUSPENDED || status == INACTIVE) { | ||||
|                 cache.invalidateAll(); | ||||
|             } | ||||
|         }; | ||||
|         super.addListener(cacheUpdater); | ||||
|         super.addStatusChangeListener(statusListener); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Versioned<V>> get(DocumentPath path) { | ||||
|         return cache.getUnchecked(path); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) { | ||||
|         return super.set(path, value) | ||||
|                 .whenComplete((r, e) -> cache.invalidate(path)); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> create(DocumentPath path, V value) { | ||||
|         return super.create(path, value) | ||||
|                 .whenComplete((r, e) -> cache.invalidate(path)); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) { | ||||
|         return super.createRecursive(path, value) | ||||
|                 .whenComplete((r, e) -> cache.invalidate(path)); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) { | ||||
|         return super.replace(path, newValue, version) | ||||
|                 .whenComplete((r, e) -> { | ||||
|                     if (r) { | ||||
|                         cache.invalidate(path); | ||||
|                     } | ||||
|                 }); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) { | ||||
|         return super.replace(path, newValue, currentValue) | ||||
|                 .whenComplete((r, e) -> { | ||||
|                     if (r) { | ||||
|                         cache.invalidate(path); | ||||
|                     } | ||||
|                 }); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) { | ||||
|         return super.removeNode(path) | ||||
|                 .whenComplete((r, e) -> cache.invalidate(path)); | ||||
|     } | ||||
| } | ||||
| @ -47,6 +47,8 @@ public class DefaultDocumentTreeBuilder<V> extends DocumentTreeBuilder<V> { | ||||
|     @Deprecated | ||||
|     @Override | ||||
|     public AsyncDocumentTree<V> build() { | ||||
|         return primitiveCreator.newAsyncDocumentTree(name(), serializer(), ordering()); | ||||
|         AsyncDocumentTree<V> tree = primitiveCreator.newAsyncDocumentTree(name(), serializer(), ordering()); | ||||
|         tree = relaxedReadConsistency() ? DistributedPrimitives.newCachingDocumentTree(tree) : tree; | ||||
|         return tree; | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,143 @@ | ||||
| /* | ||||
|  * Copyright 2017-present Open Networking Foundation | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package org.onosproject.store.primitives.impl; | ||||
| 
 | ||||
| import java.util.Map; | ||||
| import java.util.Objects; | ||||
| import java.util.concurrent.CompletableFuture; | ||||
| 
 | ||||
| import com.google.common.base.MoreObjects; | ||||
| import org.onosproject.store.primitives.NodeUpdate; | ||||
| import org.onosproject.store.primitives.TransactionId; | ||||
| import org.onosproject.store.service.AsyncDocumentTree; | ||||
| import org.onosproject.store.service.DocumentPath; | ||||
| import org.onosproject.store.service.DocumentTreeListener; | ||||
| import org.onosproject.store.service.TransactionLog; | ||||
| import org.onosproject.store.service.Version; | ||||
| import org.onosproject.store.service.Versioned; | ||||
| 
 | ||||
| /** | ||||
|  * Document tree that delegates to an underlying instance. | ||||
|  */ | ||||
| public class DelegatingAsyncDocumentTree<V> extends DelegatingDistributedPrimitive implements AsyncDocumentTree<V> { | ||||
|     private final AsyncDocumentTree<V> delegateTree; | ||||
| 
 | ||||
|     public DelegatingAsyncDocumentTree(AsyncDocumentTree<V> delegateTree) { | ||||
|         super(delegateTree); | ||||
|         this.delegateTree = delegateTree; | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public DocumentPath root() { | ||||
|         return delegateTree.root(); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) { | ||||
|         return delegateTree.getChildren(path); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Versioned<V>> get(DocumentPath path) { | ||||
|         return delegateTree.get(path); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) { | ||||
|         return delegateTree.set(path, value); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> create(DocumentPath path, V value) { | ||||
|         return delegateTree.create(path, value); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) { | ||||
|         return delegateTree.createRecursive(path, value); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) { | ||||
|         return delegateTree.replace(path, newValue, version); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) { | ||||
|         return delegateTree.replace(path, newValue, currentValue); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) { | ||||
|         return delegateTree.removeNode(path); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) { | ||||
|         return delegateTree.addListener(path, listener); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) { | ||||
|         return delegateTree.removeListener(listener); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Version> begin(TransactionId transactionId) { | ||||
|         return delegateTree.begin(transactionId); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) { | ||||
|         return delegateTree.prepare(transactionLog); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) { | ||||
|         return delegateTree.prepareAndCommit(transactionLog); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Void> commit(TransactionId transactionId) { | ||||
|         return delegateTree.commit(transactionId); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public CompletableFuture<Void> rollback(TransactionId transactionId) { | ||||
|         return delegateTree.rollback(transactionId); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public String toString() { | ||||
|         return MoreObjects.toStringHelper(getClass()) | ||||
|                 .add("delegateTree", delegateTree) | ||||
|                 .toString(); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public int hashCode() { | ||||
|         return Objects.hash(delegateTree); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public boolean equals(Object other) { | ||||
|         if (other instanceof DelegatingAsyncDocumentTree) { | ||||
|             DelegatingAsyncDocumentTree<V> that = (DelegatingAsyncDocumentTree) other; | ||||
|             return this.delegateTree.equals(that.delegateTree); | ||||
|         } | ||||
|         return false; | ||||
|     } | ||||
| } | ||||
| @ -20,6 +20,7 @@ import org.onosproject.store.service.AsyncConsistentMap; | ||||
| import org.onosproject.store.service.AsyncConsistentMultimap; | ||||
| import org.onosproject.store.service.AsyncConsistentTreeMap; | ||||
| import org.onosproject.store.service.AsyncDistributedSet; | ||||
| import org.onosproject.store.service.AsyncDocumentTree; | ||||
| 
 | ||||
| import java.util.function.Function; | ||||
| 
 | ||||
| @ -186,4 +187,15 @@ public final class DistributedPrimitives { | ||||
|                                                         valueEncoder); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * Creates an instance of {@code AsyncDocumentTree} that caches values on get. | ||||
|      * | ||||
|      * @param tree backing tree | ||||
|      * @return caching tree | ||||
|      * @param <V> tree value type | ||||
|      */ | ||||
|     public static <V> AsyncDocumentTree<V> newCachingDocumentTree(AsyncDocumentTree<V> tree) { | ||||
|         return new CachingAsyncDocumentTree<V>(tree); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user