From 762246d50e352f14d1c652301751d75fdd3b173f Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Tue, 21 Jul 2015 15:40:59 -0700 Subject: [PATCH] Added a AtomicValue distributed primitive. Change-Id: I00ff165cbd9c6e4f2610af9877ff262527b7b048 --- .../store/service/AtomicValue.java | 69 +++++++++++ .../store/service/AtomicValueBuilder.java | 70 +++++++++++ .../store/service/AtomicValueEvent.java | 109 +++++++++++++++++ .../service/AtomicValueEventListener.java | 28 +++++ .../store/service/StorageService.java | 8 ++ .../consistent/impl/DatabaseManager.java | 6 + .../consistent/impl/DefaultAtomicValue.java | 115 ++++++++++++++++++ .../impl/DefaultAtomicValueBuilder.java | 63 ++++++++++ 8 files changed, 468 insertions(+) create mode 100644 core/api/src/main/java/org/onosproject/store/service/AtomicValue.java create mode 100644 core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java create mode 100644 core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java create mode 100644 core/api/src/main/java/org/onosproject/store/service/AtomicValueEventListener.java create mode 100644 core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java create mode 100644 core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java new file mode 100644 index 0000000000..dfa0fb3c88 --- /dev/null +++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java @@ -0,0 +1,69 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.service; + +/** + * Distributed version of java.util.concurrent.atomic.AtomicReference. + * + * @param value type + */ +public interface AtomicValue { + + /** + * Atomically sets the value to the given updated value if the current value is equal to the expected value. + *

+ * IMPORTANT: Equality is based on the equality of the serialized byte[] representations. + *

+ * @param expect the expected value + * @param update the new value + * @return true if successful. false return indicates that the actual value was not equal to the expected value. + */ + boolean compareAndSet(V expect, V update); + + /** + * Gets the current value. + * @return current value + */ + V get(); + + /** + * Atomically sets to the given value and returns the old value. + * @param value the new value + * @return previous value + */ + V getAndSet(V value); + + /** + * Sets to the given value. + * @param value new value + */ + void set(V value); + + /** + * Registers the specified listener to be notified whenever the atomic value is updated. + * + * @param listener listener to notify about events + */ + void addListener(AtomicValueEventListener listener); + + /** + * Unregisters the specified listener such that it will no longer + * receive atomic value update notifications. + * + * @param listener listener to unregister + */ + void removeListener(AtomicValueEventListener listener); +} diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java new file mode 100644 index 0000000000..6527b9c69f --- /dev/null +++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java @@ -0,0 +1,70 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.service; + +/** + * Builder for constructing new AtomicValue instances. + * + * @param atomic value type + */ +public interface AtomicValueBuilder { + /** + * Sets the name for the atomic value. + *

+ * Each atomic value is identified by a unique name. + *

+ *

+ * Note: This is a mandatory parameter. + *

+ * + * @param name name of the atomic value + * @return this AtomicValueBuilder for method chaining + */ + AtomicValueBuilder withName(String name); + + /** + * Sets a serializer that can be used to serialize the value. + *

+ * Note: This is a mandatory parameter. + *

+ * + * @param serializer serializer + * @return this AtomicValueBuilder for method chaining + */ + AtomicValueBuilder withSerializer(Serializer serializer); + + /** + * Creates this atomic value on the partition that spans the entire cluster. + *

+ * When partitioning is disabled, the value state will be + * ephemeral and does not survive a full cluster restart. + *

+ *

+ * Note: By default partitions are enabled. + *

+ * @return this AtomicValueBuilder for method chaining + */ + AtomicValueBuilder withPartitionsDisabled(); + + /** + * Builds a AtomicValue based on the configuration options + * supplied to this builder. + * + * @return new AtomicValue + * @throws java.lang.RuntimeException if a mandatory parameter is missing + */ + AtomicValue build(); +} diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java new file mode 100644 index 0000000000..226b16469e --- /dev/null +++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java @@ -0,0 +1,109 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.service; + +import java.util.Objects; + +import com.google.common.base.MoreObjects; + +/** + * Representation of a AtomicValue update notification. + * + * @param atomic value type + */ +public class AtomicValueEvent { + + /** + * AtomicValueEvent type. + */ + public enum Type { + + /** + * Value was updated. + */ + UPDATE, + } + + private final String name; + private final Type type; + private final V value; + + /** + * Creates a new event object. + * + * @param name AtomicValue name + * @param type the type of the event + * @param value the new value + */ + public AtomicValueEvent(String name, Type type, V value) { + this.name = name; + this.type = type; + this.value = value; + } + + /** + * Returns the AtomicValue name. + * + * @return name of atomic value + */ + public String name() { + return name; + } + + /** + * Returns the type of the event. + * + * @return the type of the event + */ + public Type type() { + return type; + } + + /** + * Returns the new updated value. + * + * @return the value + */ + public V value() { + return value; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AtomicValueEvent)) { + return false; + } + + AtomicValueEvent that = (AtomicValueEvent) o; + return Objects.equals(this.name, that.name) && + Objects.equals(this.type, that.type) && + Objects.equals(this.value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, value); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("name", name) + .add("type", type) + .add("value", value) + .toString(); + } +} \ No newline at end of file diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueEventListener.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEventListener.java new file mode 100644 index 0000000000..b29d903b41 --- /dev/null +++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEventListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.service; + +/** + * Listener to be notified about updates to a AtomicValue. + */ +public interface AtomicValueEventListener { + /** + * Reacts to the specified event. + * + * @param event the event + */ + void event(AtomicValueEvent event); +} diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java index 58ac3908b9..9ba8780741 100644 --- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java +++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java @@ -70,6 +70,14 @@ public interface StorageService { */ AtomicCounterBuilder atomicCounterBuilder(); + /** + * Creates a new AtomicValueBuilder. + * + * @param atomic value type + * @return atomic value builder + */ + AtomicValueBuilder atomicValueBuilder(); + /** * Creates a new transaction context builder. * diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java index 86f7103070..d04f5c7aaf 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java @@ -63,6 +63,7 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; import org.onosproject.store.service.AtomicCounterBuilder; +import org.onosproject.store.service.AtomicValueBuilder; import org.onosproject.store.service.ConsistentMapBuilder; import org.onosproject.store.service.ConsistentMapException; import org.onosproject.store.service.DistributedQueueBuilder; @@ -382,6 +383,11 @@ public class DatabaseManager implements StorageService, StorageAdminService { return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase); } + @Override + public AtomicValueBuilder atomicValueBuilder() { + return new DefaultAtomicValueBuilder<>(this); + } + @Override public List getMapInfo() { List maps = Lists.newArrayList(); diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java new file mode 100644 index 0000000000..c13a3b85d9 --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java @@ -0,0 +1,115 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.onosproject.store.service.AtomicValue; +import org.onosproject.store.service.AtomicValueEvent; +import org.onosproject.store.service.AtomicValueEventListener; +import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.MapEvent; +import org.onosproject.store.service.MapEventListener; +import org.onosproject.store.service.Serializer; +import org.onosproject.store.service.Versioned; + +/** + * Default implementation of AtomicValue. + * + * @param value type + */ +public class DefaultAtomicValue implements AtomicValue { + + private final Set> listeners = new CopyOnWriteArraySet<>(); + private final ConsistentMap valueMap; + private final String name; + private final Serializer serializer; + private final MapEventListener mapEventListener = new InternalMapEventListener(); + + public DefaultAtomicValue(ConsistentMap valueMap, + String name, + Serializer serializer) { + this.valueMap = valueMap; + this.name = name; + this.serializer = serializer; + } + + @Override + public boolean compareAndSet(V expect, V update) { + if (expect == null) { + if (update == null) { + return true; + } + return valueMap.putIfAbsent(name, serializer.encode(update)) == null; + } else { + if (update == null) { + return valueMap.remove(name, serializer.encode(expect)); + } + return valueMap.replace(name, serializer.encode(expect), serializer.encode(update)); + } + } + + @Override + public V get() { + Versioned rawValue = valueMap.get(name); + return rawValue == null ? null : serializer.decode(rawValue.value()); + } + + @Override + public V getAndSet(V value) { + Versioned previousValue = value == null ? + valueMap.remove(name) : valueMap.put(name, serializer.encode(value)); + return previousValue == null ? null : serializer.decode(previousValue.value()); + } + + @Override + public void set(V value) { + getAndSet(value); + } + + @Override + public void addListener(AtomicValueEventListener listener) { + synchronized (listeners) { + if (listeners.add(listener)) { + if (listeners.size() == 1) { + valueMap.addListener(mapEventListener); + } + } + } + } + + @Override + public void removeListener(AtomicValueEventListener listener) { + synchronized (listeners) { + if (listeners.remove(listener)) { + if (listeners.size() == 0) { + valueMap.removeListener(mapEventListener); + } + } + } + } + + private class InternalMapEventListener implements MapEventListener { + + @Override + public void event(MapEvent mapEvent) { + V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : serializer.decode(mapEvent.value().value()); + AtomicValueEvent atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue); + listeners.forEach(l -> l.event(atomicValueEvent)); + } + } +} diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java new file mode 100644 index 0000000000..a267de097b --- /dev/null +++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValueBuilder.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onosproject.store.consistent.impl; + +import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.AtomicValue; +import org.onosproject.store.service.AtomicValueBuilder; +import org.onosproject.store.service.ConsistentMapBuilder; +import org.onosproject.store.service.Serializer; + +/** + * Default implementation of AtomicValueBuilder. + * + * @param value type + */ +public class DefaultAtomicValueBuilder implements AtomicValueBuilder { + + private Serializer serializer; + private String name; + private ConsistentMapBuilder mapBuilder; + + public DefaultAtomicValueBuilder(DatabaseManager manager) { + mapBuilder = manager.consistentMapBuilder() + .withName("onos-atomic-values") + .withSerializer(Serializer.using(KryoNamespaces.BASIC)); + } + + @Override + public AtomicValueBuilder withName(String name) { + this.name = name; + return this; + } + + @Override + public AtomicValueBuilder withSerializer(Serializer serializer) { + this.serializer = serializer; + return this; + } + + @Override + public AtomicValueBuilder withPartitionsDisabled() { + mapBuilder.withPartitionsDisabled(); + return this; + } + + @Override + public AtomicValue build() { + return new DefaultAtomicValue<>(mapBuilder.build(), name, serializer); + } +} \ No newline at end of file