diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java index b00a7afff1..b42e5f3e6e 100644 --- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java +++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueOptions.java @@ -15,6 +15,8 @@ */ package org.onosproject.store.service; +import java.util.function.BiFunction; + import org.onosproject.store.primitives.DistributedPrimitiveOptions; /** @@ -23,7 +25,22 @@ import org.onosproject.store.primitives.DistributedPrimitiveOptions; * @param atomic value type */ public abstract class AtomicValueOptions, V> extends DistributedPrimitiveOptions { + protected BiFunction compatibilityFunction; + public AtomicValueOptions() { super(DistributedPrimitive.Type.VALUE); } + + /** + * Sets a compatibility function on the map. + * + * @param compatibilityFunction the compatibility function + * @return the consistent map builder + */ + @SuppressWarnings("unchecked") + public O withCompatibilityFunction( + BiFunction compatibilityFunction) { + this.compatibilityFunction = compatibilityFunction; + return (O) this; + } } diff --git a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java index 7be048086d..cc773933a0 100644 --- a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java +++ b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java @@ -113,6 +113,14 @@ public interface PrimitiveService { */ LeaderElectorBuilder leaderElectorBuilder(); + /** + * Creates a new TopicBuilder. + * + * @param topic value type + * @return topic builder + */ + TopicBuilder topicBuilder(); + /** * Creates a new transaction context builder. * diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java index 2dd42fa09f..aa2e890f32 100644 --- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java +++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java @@ -120,6 +120,14 @@ public interface StorageService extends PrimitiveService { */ LeaderElectorBuilder leaderElectorBuilder(); + /** + * Creates a new TopicBuilder. + * + * @param topic value type + * @return topic builder + */ + TopicBuilder topicBuilder(); + /** * Creates a new transaction context builder. * diff --git a/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java b/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java new file mode 100644 index 0000000000..59434320fd --- /dev/null +++ b/core/api/src/main/java/org/onosproject/store/service/TopicBuilder.java @@ -0,0 +1,28 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.service; + +import org.onosproject.store.primitives.DistributedPrimitiveBuilder; + +/** + * Builder for {@link Topic} instances. + * + * @param type for topic value + */ +public abstract class TopicBuilder + extends TopicOptions, T> + implements DistributedPrimitiveBuilder> { +} diff --git a/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java b/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java new file mode 100644 index 0000000000..7845b56f2b --- /dev/null +++ b/core/api/src/main/java/org/onosproject/store/service/TopicOptions.java @@ -0,0 +1,49 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.service; + +import java.util.function.BiFunction; + +import org.onosproject.store.primitives.DistributedPrimitiveOptions; + +/** + * Builder for {@link Topic} instances. + * + * @param type for topic value + */ +public abstract class TopicOptions, T> + extends DistributedPrimitiveOptions { + + protected BiFunction compatibilityFunction; + + public TopicOptions() { + super(DistributedPrimitive.Type.TOPIC); + } + + /** + * Sets a compatibility function on the map. + * + * @param compatibilityFunction the compatibility function + * @return the consistent map builder + */ + @SuppressWarnings("unchecked") + public O withCompatibilityFunction( + BiFunction compatibilityFunction) { + this.compatibilityFunction = compatibilityFunction; + return (O) this; + } + +} diff --git a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java index f05d9aab05..68cb5fd419 100644 --- a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java +++ b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java @@ -74,6 +74,11 @@ public class CoordinationServiceAdapter implements CoordinationService { return null; } + @Override + public TopicBuilder topicBuilder() { + return null; + } + @Override public TransactionContextBuilder transactionContextBuilder() { return null; diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java index 4d528e2dc3..ae65500022 100644 --- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java +++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java @@ -69,6 +69,11 @@ public class StorageServiceAdapter implements StorageService { return null; } + @Override + public TopicBuilder topicBuilder() { + return null; + } + @Override public WorkQueue getWorkQueue(String name, Serializer serializer) { return null; diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java index e03b645704..5976003576 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java @@ -177,8 +177,13 @@ public class DistributedApplicationStore extends ApplicationArchive .withCompatibilityFunction(this::convertApplication) .build(); - appActivationTopic = storageService.getTopic("onos-apps-activation-topic", - Serializer.using(KryoNamespaces.API)); + appActivationTopic = storageService.topicBuilder() + .withName("onos-apps-activation-topic") + .withSerializer(Serializer.using(KryoNamespaces.API)) + .withVersion(versionService.version()) + .withRevisionType(RevisionType.PROPAGATE) + .withCompatibilityFunction(this::convertApplication) + .build(); activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app", "app-activation", log)); @@ -206,8 +211,8 @@ public class DistributedApplicationStore extends ApplicationArchive // version, update the stored application with the new version. ApplicationDescription appDesc = getApplicationDescription(appHolder.app.id().name()); if (!appDesc.version().equals(appHolder.app().version())) { - Application newApplication = DefaultApplication.builder(appHolder.app()) - .withVersion(appDesc.version()) + Application newApplication = DefaultApplication.builder(appDesc) + .withAppId(appHolder.app.id()) .build(); return new InternalApplicationHolder( newApplication, appHolder.state, appHolder.permissions); @@ -215,6 +220,21 @@ public class DistributedApplicationStore extends ApplicationArchive return appHolder; } + /** + * Converts the versions of stored applications propagated from the prior version to the local application versions. + */ + private Application convertApplication(Application app, Version version) { + // Load the application description from disk. If the version doesn't match the persisted + // version, update the stored application with the new version. + ApplicationDescription appDesc = getApplicationDescription(app.id().name()); + if (!appDesc.version().equals(app.version())) { + return DefaultApplication.builder(appDesc) + .withAppId(app.id()) + .build(); + } + return app; + } + /** * Activates applications that should be activated according to the distributed store. */ diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java index f2965d9316..1f6310f708 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java @@ -29,7 +29,6 @@ import org.onosproject.cluster.ControllerNode; import org.onosproject.cluster.DefaultPartition; import org.onosproject.cluster.NodeId; import org.onosproject.cluster.PartitionId; -import org.onosproject.core.VersionService; import org.onosproject.persistence.PersistenceService; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.primitives.DistributedPrimitiveCreator; @@ -52,6 +51,7 @@ import org.onosproject.store.service.EventuallyConsistentMapBuilder; import org.onosproject.store.service.LeaderElectorBuilder; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.Topic; +import org.onosproject.store.service.TopicBuilder; import org.onosproject.store.service.TransactionContextBuilder; import org.onosproject.store.service.WorkQueue; import org.slf4j.Logger; @@ -81,9 +81,6 @@ public class CoordinationManager implements CoordinationService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected PersistenceService persistenceService; - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected VersionService versionService; - private StoragePartition partition; private DistributedPrimitiveCreator primitiveCreator; @@ -139,7 +136,7 @@ public class CoordinationManager implements CoordinationService { @Override public ConsistentMapBuilder consistentMapBuilder() { checkPermission(STORAGE_WRITE); - return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version()); + return new DefaultConsistentMapBuilder<>(primitiveCreator); } @Override @@ -204,6 +201,12 @@ public class CoordinationManager implements CoordinationService { return new DefaultLeaderElectorBuilder(primitiveCreator); } + @Override + public TopicBuilder topicBuilder() { + checkPermission(STORAGE_WRITE); + return new DefaultDistributedTopicBuilder<>(atomicValueBuilder()); + } + @Override public WorkQueue getWorkQueue(String name, Serializer serializer) { checkPermission(STORAGE_WRITE); diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java index fad39936dd..9058189e4e 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java @@ -17,9 +17,11 @@ package org.onosproject.store.primitives.impl; import java.util.function.Supplier; +import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.AsyncAtomicValue; import org.onosproject.store.service.AtomicValueBuilder; import org.onosproject.store.service.ConsistentMapBuilder; +import org.onosproject.store.service.Serializer; import static com.google.common.base.Preconditions.checkNotNull; @@ -38,8 +40,24 @@ public class DefaultAtomicValueBuilder extends AtomicValueBuilder { @Override public AsyncAtomicValue build() { - return new DefaultAsyncAtomicValue<>(checkNotNull(name()), - checkNotNull(serializer()), - mapBuilder.buildAsyncMap()); + if (compatibilityFunction != null) { + Serializer serializer = Serializer.using(KryoNamespaces.API, CompatibleValue.class); + + AsyncAtomicValue> rawValue = new DefaultAsyncAtomicValue<>( + checkNotNull(name()), serializer, mapBuilder.buildAsyncMap()); + + AsyncAtomicValue> compatibleValue = + DistributedPrimitives.newTranscodingAtomicValue( + rawValue, + value -> value == null ? null : + new CompatibleValue(serializer().encode(value.value()), value.version()), + value -> value == null ? null : + new CompatibleValue(serializer().decode(value.value()), value.version())); + return DistributedPrimitives.newCompatibleAtomicValue(compatibleValue, compatibilityFunction, version()); + } + return new DefaultAsyncAtomicValue<>( + checkNotNull(name()), + checkNotNull(serializer()), + mapBuilder.buildAsyncMap()); } } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java index 6d3d4dc5eb..d6b94418f1 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java @@ -15,7 +15,6 @@ */ package org.onosproject.store.primitives.impl; -import org.onosproject.core.Version; import org.onosproject.store.primitives.DistributedPrimitiveCreator; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.AsyncConsistentMap; @@ -32,11 +31,9 @@ import org.onosproject.store.service.Serializer; public class DefaultConsistentMapBuilder extends ConsistentMapBuilder { private final DistributedPrimitiveCreator primitiveCreator; - private final Version version; - public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) { + public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) { this.primitiveCreator = primitiveCreator; - this.version = version; } @Override diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java new file mode 100644 index 0000000000..21ca7a1574 --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedTopicBuilder.java @@ -0,0 +1,71 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import java.util.function.BiFunction; + +import org.onosproject.core.Version; +import org.onosproject.store.service.AtomicValueBuilder; +import org.onosproject.store.service.RevisionType; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.Topic; +import org.onosproject.store.service.TopicBuilder; + +/** + * Default topic builder. + */ +public class DefaultDistributedTopicBuilder extends TopicBuilder { + private final AtomicValueBuilder valueBuilder; + + public DefaultDistributedTopicBuilder(AtomicValueBuilder valueBuilder) { + this.valueBuilder = valueBuilder; + } + + @Override + public TopicBuilder withName(String name) { + valueBuilder.withName(name); + return this; + } + + @Override + public TopicBuilder withSerializer(Serializer serializer) { + valueBuilder.withSerializer(serializer); + return this; + } + + @Override + public TopicBuilder withVersion(Version version) { + valueBuilder.withVersion(version); + return this; + } + + @Override + public TopicBuilder withRevisionType(RevisionType revisionType) { + valueBuilder.withRevisionType(revisionType); + return this; + } + + @Override + public TopicBuilder withCompatibilityFunction(BiFunction compatibilityFunction) { + valueBuilder.withCompatibilityFunction(compatibilityFunction); + return this; + } + + @Override + public Topic build() { + return new DefaultDistributedTopic<>(valueBuilder.build()); + } +} diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java index 78c41e87c1..9b209ba543 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java @@ -15,17 +15,18 @@ */ package org.onosproject.store.primitives.impl; +import java.util.function.BiFunction; +import java.util.function.Function; + import org.onosproject.core.Version; import org.onosproject.store.service.AsyncAtomicCounterMap; +import org.onosproject.store.service.AsyncAtomicValue; import org.onosproject.store.service.AsyncConsistentMap; import org.onosproject.store.service.AsyncConsistentMultimap; import org.onosproject.store.service.AsyncConsistentTreeMap; import org.onosproject.store.service.AsyncDistributedSet; import org.onosproject.store.service.AsyncDocumentTree; -import java.util.function.BiFunction; -import java.util.function.Function; - /** * Misc utilities for working with {@code DistributedPrimitive}s. */ @@ -116,6 +117,45 @@ public final class DistributedPrimitives { return new TranscodingAsyncConsistentMap<>(map, k -> k, k -> k, encoder, decoder); } + /** + * Creates an instance of {@code AsyncAtomicValue} that transforms value types. + * + * @param value backing value + * @param valueEncoder transformer for value type of returned value to value type of input value + * @param valueDecoder transformer for value type of input value to value type of returned value + * @param returned value type + * @param input value type + * @return new counter map + */ + public static AsyncAtomicValue newTranscodingAtomicValue(AsyncAtomicValue value, + Function valueEncoder, + Function valueDecoder) { + return new TranscodingAsyncAtomicValue<>(value, valueEncoder, valueDecoder); + } + + /** + * Creates an instance of {@code AsyncAtomicValue} that converts values from other versions. + * + * @param atomicValue backing value + * @param compatibilityFunction the compatibility function + * @param version local node version + * @param value type + * @return compatible map + */ + public static AsyncAtomicValue newCompatibleAtomicValue( + AsyncAtomicValue> atomicValue, + BiFunction compatibilityFunction, + Version version) { + Function> encoder = value -> new CompatibleValue<>(value, version); + Function, V> decoder = value -> { + if (!value.version().equals(version)) { + return compatibilityFunction.apply(value.value(), value.version()); + } + return value.value(); + }; + return new TranscodingAsyncAtomicValue<>(atomicValue, encoder, decoder); + } + /** * Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types. * @@ -127,8 +167,8 @@ public final class DistributedPrimitives { * @return new counter map */ public static AsyncAtomicCounterMap newTranscodingAtomicCounterMap(AsyncAtomicCounterMap map, - Function keyEncoder, - Function keyDecoder) { + Function keyEncoder, + Function keyDecoder) { return new TranscodingAsyncAtomicCounterMap<>(map, keyEncoder, keyDecoder); } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java index a649ecf455..719a96ed4d 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java @@ -35,7 +35,6 @@ import org.onosproject.cluster.Member; import org.onosproject.cluster.MembershipService; import org.onosproject.cluster.NodeId; import org.onosproject.cluster.PartitionId; -import org.onosproject.core.VersionService; import org.onosproject.persistence.PersistenceService; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.primitives.DistributedPrimitiveCreator; @@ -66,6 +65,7 @@ import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageAdminService; import org.onosproject.store.service.StorageService; import org.onosproject.store.service.Topic; +import org.onosproject.store.service.TopicBuilder; import org.onosproject.store.service.TransactionContextBuilder; import org.onosproject.store.service.WorkQueue; import org.onosproject.store.service.WorkQueueStats; @@ -104,9 +104,6 @@ public class StorageManager implements StorageService, StorageAdminService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected MembershipService membershipService; - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected VersionService versionService; - private final Supplier transactionIdGenerator = () -> TransactionId.from(UUID.randomUUID().toString()); private DistributedPrimitiveCreator federatedPrimitiveCreator; @@ -177,7 +174,7 @@ public class StorageManager implements StorageService, StorageAdminService { @Override public ConsistentMapBuilder consistentMapBuilder() { checkPermission(STORAGE_WRITE); - return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator, versionService.version()); + return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator); } @Override @@ -251,6 +248,12 @@ public class StorageManager implements StorageService, StorageAdminService { return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator); } + @Override + public TopicBuilder topicBuilder() { + checkPermission(STORAGE_WRITE); + return new DefaultDistributedTopicBuilder<>(atomicValueBuilder()); + } + @Override public WorkQueue getWorkQueue(String name, Serializer serializer) { checkPermission(STORAGE_WRITE); diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java new file mode 100644 index 0000000000..24dd35da66 --- /dev/null +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncAtomicValue.java @@ -0,0 +1,122 @@ +/* + * Copyright 2018-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.primitives.impl; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import com.google.common.collect.Maps; +import org.onosproject.store.service.AsyncAtomicValue; +import org.onosproject.store.service.AtomicValueEvent; +import org.onosproject.store.service.AtomicValueEventListener; + +/** + * An {@code AsyncAtomicValue} that transcodes values. + */ +public class TranscodingAsyncAtomicValue implements AsyncAtomicValue { + private final AsyncAtomicValue backingValue; + private final Function valueEncoder; + private final Function valueDecoder; + private final Map, InternalValueEventListener> listeners = Maps.newIdentityHashMap(); + + public TranscodingAsyncAtomicValue( + AsyncAtomicValue backingValue, Function valueEncoder, Function valueDecoder) { + this.backingValue = backingValue; + this.valueEncoder = k -> k == null ? null : valueEncoder.apply(k); + this.valueDecoder = k -> k == null ? null : valueDecoder.apply(k); + } + + @Override + public String name() { + return backingValue.name(); + } + + @Override + public CompletableFuture compareAndSet(V1 expect, V1 update) { + return backingValue.compareAndSet(valueEncoder.apply(expect), valueEncoder.apply(update)); + } + + @Override + public CompletableFuture get() { + return backingValue.get().thenApply(valueDecoder); + } + + @Override + public CompletableFuture getAndSet(V1 value) { + return backingValue.getAndSet(valueEncoder.apply(value)).thenApply(valueDecoder); + } + + @Override + public CompletableFuture set(V1 value) { + return backingValue.set(valueEncoder.apply(value)); + } + + @Override + public CompletableFuture addListener(AtomicValueEventListener listener) { + synchronized (listeners) { + InternalValueEventListener backingMapListener = + listeners.computeIfAbsent(listener, k -> new InternalValueEventListener(listener)); + return backingValue.addListener(backingMapListener); + } + } + + @Override + public CompletableFuture removeListener(AtomicValueEventListener listener) { + synchronized (listeners) { + InternalValueEventListener backingMapListener = listeners.remove(listener); + if (backingMapListener != null) { + return backingValue.removeListener(backingMapListener); + } else { + return CompletableFuture.completedFuture(null); + } + } + } + + @Override + public void addStatusChangeListener(Consumer listener) { + backingValue.addStatusChangeListener(listener); + } + + @Override + public void removeStatusChangeListener(Consumer listener) { + backingValue.removeStatusChangeListener(listener); + } + + @Override + public Collection> statusChangeListeners() { + return backingValue.statusChangeListeners(); + } + + private class InternalValueEventListener implements AtomicValueEventListener { + private final AtomicValueEventListener listener; + + InternalValueEventListener(AtomicValueEventListener listener) { + this.listener = listener; + } + + @Override + public void event(AtomicValueEvent event) { + listener.event(new AtomicValueEvent<>( + event.name(), + event.newValue() != null ? valueDecoder.apply(event.newValue()) : null, + event.oldValue() != null ? valueDecoder.apply(event.oldValue()) : null)); + } + } + +} diff --git a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java index 4ba8a2cf5a..fed4cabc2e 100644 --- a/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java +++ b/protocols/pcep/server/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java @@ -33,6 +33,7 @@ import org.onosproject.store.service.LeaderElectorBuilder; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.onosproject.store.service.Topic; +import org.onosproject.store.service.TopicBuilder; import org.onosproject.store.service.TransactionContextBuilder; import org.onosproject.store.service.WorkQueue; @@ -90,6 +91,11 @@ public class StorageServiceAdapter implements StorageService { return null; } + @Override + public TopicBuilder topicBuilder() { + return null; + } + @Override public WorkQueue getWorkQueue(String name, Serializer serializer) { return null;