Add compatibility functions to AtomicValue/Topic

Change-Id: I4a597cfa3effe0a62714ab12440a2fc41ac58aa9
This commit is contained in:
Jordan Halterman 2018-04-05 23:07:47 -07:00 committed by Thomas Vachuska
parent ca7660a289
commit 400bbe5782
16 changed files with 426 additions and 26 deletions

View File

@ -15,6 +15,8 @@
*/
package org.onosproject.store.service;
import java.util.function.BiFunction;
import org.onosproject.store.primitives.DistributedPrimitiveOptions;
/**
@ -23,7 +25,22 @@ import org.onosproject.store.primitives.DistributedPrimitiveOptions;
* @param <V> atomic value type
*/
public abstract class AtomicValueOptions<O extends AtomicValueOptions<O, V>, V> extends DistributedPrimitiveOptions<O> {
protected BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction;
public AtomicValueOptions() {
super(DistributedPrimitive.Type.VALUE);
}
/**
* Sets a compatibility function on the map.
*
* @param compatibilityFunction the compatibility function
* @return the consistent map builder
*/
@SuppressWarnings("unchecked")
public O withCompatibilityFunction(
BiFunction<V, org.onosproject.core.Version, V> compatibilityFunction) {
this.compatibilityFunction = compatibilityFunction;
return (O) this;
}
}

View File

@ -113,6 +113,14 @@ public interface PrimitiveService {
*/
LeaderElectorBuilder leaderElectorBuilder();
/**
* Creates a new TopicBuilder.
*
* @param <T> topic value type
* @return topic builder
*/
<T> TopicBuilder<T> topicBuilder();
/**
* Creates a new transaction context builder.
*

View File

@ -120,6 +120,14 @@ public interface StorageService extends PrimitiveService {
*/
LeaderElectorBuilder leaderElectorBuilder();
/**
* Creates a new TopicBuilder.
*
* @param <T> topic value type
* @return topic builder
*/
<T> TopicBuilder<T> topicBuilder();
/**
* Creates a new transaction context builder.
*

View File

@ -0,0 +1,28 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for {@link Topic} instances.
*
* @param <T> type for topic value
*/
public abstract class TopicBuilder<T>
extends TopicOptions<TopicBuilder<T>, T>
implements DistributedPrimitiveBuilder<Topic<T>> {
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.function.BiFunction;
import org.onosproject.store.primitives.DistributedPrimitiveOptions;
/**
* Builder for {@link Topic} instances.
*
* @param <T> type for topic value
*/
public abstract class TopicOptions<O extends TopicOptions<O, T>, T>
extends DistributedPrimitiveOptions<O> {
protected BiFunction<T, org.onosproject.core.Version, T> compatibilityFunction;
public TopicOptions() {
super(DistributedPrimitive.Type.TOPIC);
}
/**
* Sets a compatibility function on the map.
*
* @param compatibilityFunction the compatibility function
* @return the consistent map builder
*/
@SuppressWarnings("unchecked")
public O withCompatibilityFunction(
BiFunction<T, org.onosproject.core.Version, T> compatibilityFunction) {
this.compatibilityFunction = compatibilityFunction;
return (O) this;
}
}

View File

@ -74,6 +74,11 @@ public class CoordinationServiceAdapter implements CoordinationService {
return null;
}
@Override
public <T> TopicBuilder<T> topicBuilder() {
return null;
}
@Override
public TransactionContextBuilder transactionContextBuilder() {
return null;

View File

@ -69,6 +69,11 @@ public class StorageServiceAdapter implements StorageService {
return null;
}
@Override
public <T> TopicBuilder<T> topicBuilder() {
return null;
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
return null;

View File

@ -177,8 +177,13 @@ public class DistributedApplicationStore extends ApplicationArchive
.withCompatibilityFunction(this::convertApplication)
.build();
appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
Serializer.using(KryoNamespaces.API));
appActivationTopic = storageService.<Application>topicBuilder()
.withName("onos-apps-activation-topic")
.withSerializer(Serializer.using(KryoNamespaces.API))
.withVersion(versionService.version())
.withRevisionType(RevisionType.PROPAGATE)
.withCompatibilityFunction(this::convertApplication)
.build();
activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
"app-activation", log));
@ -206,8 +211,8 @@ public class DistributedApplicationStore extends ApplicationArchive
// version, update the stored application with the new version.
ApplicationDescription appDesc = getApplicationDescription(appHolder.app.id().name());
if (!appDesc.version().equals(appHolder.app().version())) {
Application newApplication = DefaultApplication.builder(appHolder.app())
.withVersion(appDesc.version())
Application newApplication = DefaultApplication.builder(appDesc)
.withAppId(appHolder.app.id())
.build();
return new InternalApplicationHolder(
newApplication, appHolder.state, appHolder.permissions);
@ -215,6 +220,21 @@ public class DistributedApplicationStore extends ApplicationArchive
return appHolder;
}
/**
* Converts the versions of stored applications propagated from the prior version to the local application versions.
*/
private Application convertApplication(Application app, Version version) {
// Load the application description from disk. If the version doesn't match the persisted
// version, update the stored application with the new version.
ApplicationDescription appDesc = getApplicationDescription(app.id().name());
if (!appDesc.version().equals(app.version())) {
return DefaultApplication.builder(appDesc)
.withAppId(app.id())
.build();
}
return app;
}
/**
* Activates applications that should be activated according to the distributed store.
*/

View File

@ -29,7 +29,6 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.VersionService;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@ -52,6 +51,7 @@ import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Topic;
import org.onosproject.store.service.TopicBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
@ -81,9 +81,6 @@ public class CoordinationManager implements CoordinationService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VersionService versionService;
private StoragePartition partition;
private DistributedPrimitiveCreator primitiveCreator;
@ -139,7 +136,7 @@ public class CoordinationManager implements CoordinationService {
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultConsistentMapBuilder<>(primitiveCreator, versionService.version());
return new DefaultConsistentMapBuilder<>(primitiveCreator);
}
@Override
@ -204,6 +201,12 @@ public class CoordinationManager implements CoordinationService {
return new DefaultLeaderElectorBuilder(primitiveCreator);
}
@Override
public <T> TopicBuilder<T> topicBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);

View File

@ -17,9 +17,11 @@ package org.onosproject.store.primitives.impl;
import java.util.function.Supplier;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.Preconditions.checkNotNull;
@ -38,8 +40,24 @@ public class DefaultAtomicValueBuilder<V> extends AtomicValueBuilder<V> {
@Override
public AsyncAtomicValue<V> build() {
return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
checkNotNull(serializer()),
mapBuilder.buildAsyncMap());
if (compatibilityFunction != null) {
Serializer serializer = Serializer.using(KryoNamespaces.API, CompatibleValue.class);
AsyncAtomicValue<CompatibleValue<byte[]>> rawValue = new DefaultAsyncAtomicValue<>(
checkNotNull(name()), serializer, mapBuilder.buildAsyncMap());
AsyncAtomicValue<CompatibleValue<V>> compatibleValue =
DistributedPrimitives.newTranscodingAtomicValue(
rawValue,
value -> value == null ? null :
new CompatibleValue<byte[]>(serializer().encode(value.value()), value.version()),
value -> value == null ? null :
new CompatibleValue<V>(serializer().decode(value.value()), value.version()));
return DistributedPrimitives.newCompatibleAtomicValue(compatibleValue, compatibilityFunction, version());
}
return new DefaultAsyncAtomicValue<>(
checkNotNull(name()),
checkNotNull(serializer()),
mapBuilder.buildAsyncMap());
}
}

View File

@ -15,7 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.core.Version;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
@ -32,11 +31,9 @@ import org.onosproject.store.service.Serializer;
public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
private final DistributedPrimitiveCreator primitiveCreator;
private final Version version;
public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator, Version version) {
public DefaultConsistentMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
this.primitiveCreator = primitiveCreator;
this.version = version;
}
@Override

View File

@ -0,0 +1,71 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.function.BiFunction;
import org.onosproject.core.Version;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.RevisionType;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Topic;
import org.onosproject.store.service.TopicBuilder;
/**
* Default topic builder.
*/
public class DefaultDistributedTopicBuilder<T> extends TopicBuilder<T> {
private final AtomicValueBuilder<T> valueBuilder;
public DefaultDistributedTopicBuilder(AtomicValueBuilder<T> valueBuilder) {
this.valueBuilder = valueBuilder;
}
@Override
public TopicBuilder<T> withName(String name) {
valueBuilder.withName(name);
return this;
}
@Override
public TopicBuilder<T> withSerializer(Serializer serializer) {
valueBuilder.withSerializer(serializer);
return this;
}
@Override
public TopicBuilder<T> withVersion(Version version) {
valueBuilder.withVersion(version);
return this;
}
@Override
public TopicBuilder<T> withRevisionType(RevisionType revisionType) {
valueBuilder.withRevisionType(revisionType);
return this;
}
@Override
public TopicBuilder<T> withCompatibilityFunction(BiFunction<T, Version, T> compatibilityFunction) {
valueBuilder.withCompatibilityFunction(compatibilityFunction);
return this;
}
@Override
public Topic<T> build() {
return new DefaultDistributedTopic<>(valueBuilder.build());
}
}

View File

@ -15,17 +15,18 @@
*/
package org.onosproject.store.primitives.impl;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.onosproject.core.Version;
import org.onosproject.store.service.AsyncAtomicCounterMap;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* Misc utilities for working with {@code DistributedPrimitive}s.
*/
@ -116,6 +117,45 @@ public final class DistributedPrimitives {
return new TranscodingAsyncConsistentMap<>(map, k -> k, k -> k, encoder, decoder);
}
/**
* Creates an instance of {@code AsyncAtomicValue} that transforms value types.
*
* @param value backing value
* @param valueEncoder transformer for value type of returned value to value type of input value
* @param valueDecoder transformer for value type of input value to value type of returned value
* @param <V1> returned value type
* @param <V2> input value type
* @return new counter map
*/
public static <V1, V2> AsyncAtomicValue<V1> newTranscodingAtomicValue(AsyncAtomicValue<V2> value,
Function<V1, V2> valueEncoder,
Function<V2, V1> valueDecoder) {
return new TranscodingAsyncAtomicValue<>(value, valueEncoder, valueDecoder);
}
/**
* Creates an instance of {@code AsyncAtomicValue} that converts values from other versions.
*
* @param atomicValue backing value
* @param compatibilityFunction the compatibility function
* @param version local node version
* @param <V> value type
* @return compatible map
*/
public static <V> AsyncAtomicValue<V> newCompatibleAtomicValue(
AsyncAtomicValue<CompatibleValue<V>> atomicValue,
BiFunction<V, Version, V> compatibilityFunction,
Version version) {
Function<V, CompatibleValue<V>> encoder = value -> new CompatibleValue<>(value, version);
Function<CompatibleValue<V>, V> decoder = value -> {
if (!value.version().equals(version)) {
return compatibilityFunction.apply(value.value(), value.version());
}
return value.value();
};
return new TranscodingAsyncAtomicValue<>(atomicValue, encoder, decoder);
}
/**
* Creates an instance of {@code AsyncAtomicCounterMap} that transforms key types.
*
@ -127,8 +167,8 @@ public final class DistributedPrimitives {
* @return new counter map
*/
public static <K1, K2> AsyncAtomicCounterMap<K1> newTranscodingAtomicCounterMap(AsyncAtomicCounterMap<K2> map,
Function<K1, K2> keyEncoder,
Function<K2, K1> keyDecoder) {
Function<K1, K2> keyEncoder,
Function<K2, K1> keyDecoder) {
return new TranscodingAsyncAtomicCounterMap<>(map, keyEncoder, keyDecoder);
}

View File

@ -35,7 +35,6 @@ import org.onosproject.cluster.Member;
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.VersionService;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@ -66,6 +65,7 @@ import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Topic;
import org.onosproject.store.service.TopicBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
@ -104,9 +104,6 @@ public class StorageManager implements StorageService, StorageAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MembershipService membershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VersionService versionService;
private final Supplier<TransactionId> transactionIdGenerator =
() -> TransactionId.from(UUID.randomUUID().toString());
private DistributedPrimitiveCreator federatedPrimitiveCreator;
@ -177,7 +174,7 @@ public class StorageManager implements StorageService, StorageAdminService {
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator, versionService.version());
return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
}
@Override
@ -251,6 +248,12 @@ public class StorageManager implements StorageService, StorageAdminService {
return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
}
@Override
public <T> TopicBuilder<T> topicBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);

View File

@ -0,0 +1,122 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import com.google.common.collect.Maps;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValueEvent;
import org.onosproject.store.service.AtomicValueEventListener;
/**
* An {@code AsyncAtomicValue} that transcodes values.
*/
public class TranscodingAsyncAtomicValue<V1, V2> implements AsyncAtomicValue<V1> {
private final AsyncAtomicValue<V2> backingValue;
private final Function<V1, V2> valueEncoder;
private final Function<V2, V1> valueDecoder;
private final Map<AtomicValueEventListener<V1>, InternalValueEventListener> listeners = Maps.newIdentityHashMap();
public TranscodingAsyncAtomicValue(
AsyncAtomicValue<V2> backingValue, Function<V1, V2> valueEncoder, Function<V2, V1> valueDecoder) {
this.backingValue = backingValue;
this.valueEncoder = k -> k == null ? null : valueEncoder.apply(k);
this.valueDecoder = k -> k == null ? null : valueDecoder.apply(k);
}
@Override
public String name() {
return backingValue.name();
}
@Override
public CompletableFuture<Boolean> compareAndSet(V1 expect, V1 update) {
return backingValue.compareAndSet(valueEncoder.apply(expect), valueEncoder.apply(update));
}
@Override
public CompletableFuture<V1> get() {
return backingValue.get().thenApply(valueDecoder);
}
@Override
public CompletableFuture<V1> getAndSet(V1 value) {
return backingValue.getAndSet(valueEncoder.apply(value)).thenApply(valueDecoder);
}
@Override
public CompletableFuture<Void> set(V1 value) {
return backingValue.set(valueEncoder.apply(value));
}
@Override
public CompletableFuture<Void> addListener(AtomicValueEventListener<V1> listener) {
synchronized (listeners) {
InternalValueEventListener backingMapListener =
listeners.computeIfAbsent(listener, k -> new InternalValueEventListener(listener));
return backingValue.addListener(backingMapListener);
}
}
@Override
public CompletableFuture<Void> removeListener(AtomicValueEventListener<V1> listener) {
synchronized (listeners) {
InternalValueEventListener backingMapListener = listeners.remove(listener);
if (backingMapListener != null) {
return backingValue.removeListener(backingMapListener);
} else {
return CompletableFuture.completedFuture(null);
}
}
}
@Override
public void addStatusChangeListener(Consumer<Status> listener) {
backingValue.addStatusChangeListener(listener);
}
@Override
public void removeStatusChangeListener(Consumer<Status> listener) {
backingValue.removeStatusChangeListener(listener);
}
@Override
public Collection<Consumer<Status>> statusChangeListeners() {
return backingValue.statusChangeListeners();
}
private class InternalValueEventListener implements AtomicValueEventListener<V2> {
private final AtomicValueEventListener<V1> listener;
InternalValueEventListener(AtomicValueEventListener<V1> listener) {
this.listener = listener;
}
@Override
public void event(AtomicValueEvent<V2> event) {
listener.event(new AtomicValueEvent<>(
event.name(),
event.newValue() != null ? valueDecoder.apply(event.newValue()) : null,
event.oldValue() != null ? valueDecoder.apply(event.oldValue()) : null));
}
}
}

View File

@ -33,6 +33,7 @@ import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Topic;
import org.onosproject.store.service.TopicBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
@ -90,6 +91,11 @@ public class StorageServiceAdapter implements StorageService {
return null;
}
@Override
public <T> TopicBuilder<T> topicBuilder() {
return null;
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
return null;