diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java index 600af28106..c14b187178 100644 --- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java +++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java @@ -85,11 +85,21 @@ public class DefaultConsistentMultimap return complete(asyncMultimap.put(key, value)); } + @Override + public Versioned> putAndGet(K key, V value) { + return complete(asyncMultimap.putAndGet(key, value)); + } + @Override public boolean remove(K key, V value) { return complete(asyncMultimap.remove(key, value)); } + @Override + public Versioned> removeAndGet(K key, V value) { + return complete(asyncMultimap.removeAndGet(key, value)); + } + @Override public boolean removeAll(K key, Collection values) { return complete(asyncMultimap.removeAll(key, values)); diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java index 12b4645254..94943b8d33 100644 --- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java +++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java @@ -103,6 +103,19 @@ public interface AsyncConsistentMultimap extends DistributedPrimitive, Asy */ CompletableFuture put(K key, V value); + /** + * If the key-value pair does not already exist adds either the key value + * pair or the value to the set of values associated with the key and + * returns the updated value, if the key-value pair already exists then behavior + * is implementation specific with some implementations allowing duplicates + * and others ignoring put requests for existing entries. + * + * @param key the key to add + * @param value the value to add + * @return a future to be completed with the updated values + */ + CompletableFuture>> putAndGet(K key, V value); + /** * Removes the key-value pair with the specified values if it exists. In * implementations that allow duplicates which matching entry will be @@ -115,6 +128,17 @@ public interface AsyncConsistentMultimap extends DistributedPrimitive, Asy */ CompletableFuture remove(K key, V value); + /** + * Removes the key-value pair with the specified values if it exists. In + * implementations that allow duplicates which matching entry will be + * removed is undefined. + * + * @param key the key of the pair to be removed + * @param value the value of the pair to be removed + * @return a future to be completed with the updated values + */ + CompletableFuture>> removeAndGet(K key, V value); + /** * Removes the key-value pairs with the specified key and values if they * exist. In implementations that allow duplicates each instance of a key diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java index bdbbee0ff5..99045a867c 100644 --- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java +++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java @@ -90,6 +90,19 @@ public interface ConsistentMultimap extends DistributedPrimitive, Iterable */ boolean put(K key, V value); + /** + * If the key-value pair does not already exist adds either the key value + * pair or the value to the set of values associated with the key and + * returns the updated value, if the key-value pair already exists then behavior + * is implementation specific with some implementations allowing duplicates + * and others ignoring put requests for existing entries. + * + * @param key the key to add + * @param value the value to add + * @return the updated values + */ + Versioned> putAndGet(K key, V value); + /** * Removes the key-value pair with the specified values if it exists. In * implementations that allow duplicates which matching entry will be @@ -101,6 +114,17 @@ public interface ConsistentMultimap extends DistributedPrimitive, Iterable */ boolean remove(K key, V value); + /** + * Removes the key-value pair with the specified values if it exists. In + * implementations that allow duplicates which matching entry will be + * removed is undefined. + * + * @param key the key of the pair to be removed + * @param value the value of the pair to be removed + * @return the updated values + */ + Versioned> removeAndGet(K key, V value); + /** * Removes the key-value pairs with the specified key and values if they * exist. In implementations that allow duplicates each instance of a key diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java index a97337e2a1..08be2e4b48 100644 --- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java +++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java @@ -75,11 +75,23 @@ public class TestConsistentMultimap implements ConsistentMultimap { return innermap.put(key, version(value)); } + @Override + public Versioned> putAndGet(K key, V value) { + innermap.put(key, version(value)); + return (Versioned>) innermap.get(key); + } + @Override public boolean remove(K key, V value) { return innermap.remove(key, value); } + @Override + public Versioned> removeAndGet(K key, V value) { + innermap.remove(key, value); + return (Versioned>) innermap.get(key); + } + @Override public boolean removeAll(K key, Collection values) { return false; diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java index 5fdea0540f..1c02da93a9 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java @@ -15,9 +15,7 @@ */ package org.onosproject.store.packet.impl; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -43,20 +41,20 @@ import org.onosproject.store.AbstractStore; import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.ConsistentMap; +import org.onosproject.store.service.ConsistentMultimap; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.Versioned; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; +import java.util.Collection; import java.util.Dictionary; import java.util.List; import java.util.Objects; import java.util.Properties; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; @@ -205,10 +203,10 @@ public class DistributedPacketStore private final class PacketRequestTracker { - private ConsistentMap> requests; + private ConsistentMultimap requests; private PacketRequestTracker() { - requests = storageService.>consistentMapBuilder() + requests = storageService.consistentMultimapBuilder() .withName("onos-packet-requests") .withSerializer(Serializer.using(KryoNamespace.newBuilder() .register(KryoNamespaces.API) @@ -218,68 +216,36 @@ public class DistributedPacketStore } private void add(PacketRequest request) { - AtomicBoolean firstRequest = addInternal(request); - if (firstRequest.get() && delegate != null) { + boolean firstRequest = addInternal(request); + if (firstRequest && delegate != null) { // The instance that makes the first request will push to all devices delegate.requestPackets(request); } } - private AtomicBoolean addInternal(PacketRequest request) { - AtomicBoolean firstRequest = new AtomicBoolean(false); - requests.compute(key(request), (s, existingRequests) -> { - // Reset to false just in case this is a retry due to - // ConcurrentModificationException - firstRequest.set(false); - if (existingRequests == null) { - firstRequest.set(true); - return ImmutableSet.of(request); - } else if (!existingRequests.contains(request)) { - firstRequest.set(true); - return ImmutableSet.builder() - .addAll(existingRequests) - .add(request) - .build(); - } else { - return existingRequests; - } - }); - return firstRequest; + private boolean addInternal(PacketRequest request) { + Collection values = + Versioned.valueOrNull(requests.putAndGet(key(request), request)); + return values.size() == 1; } private void remove(PacketRequest request) { - AtomicBoolean removedLast = removeInternal(request); - if (removedLast.get() && delegate != null) { + boolean removedLast = removeInternal(request); + if (removedLast && delegate != null) { // The instance that removes the last request will remove from all devices delegate.cancelPackets(request); } } - private AtomicBoolean removeInternal(PacketRequest request) { - AtomicBoolean removedLast = new AtomicBoolean(false); - requests.computeIfPresent(key(request), (s, existingRequests) -> { - // Reset to false just in case this is a retry due to - // ConcurrentModificationException - removedLast.set(false); - if (existingRequests.contains(request)) { - Set newRequests = Sets.newHashSet(existingRequests); - newRequests.remove(request); - if (newRequests.size() > 0) { - return ImmutableSet.copyOf(newRequests); - } else { - removedLast.set(true); - return null; - } - } else { - return existingRequests; - } - }); - return removedLast; + private boolean removeInternal(PacketRequest request) { + Collection values = + Versioned.valueOrNull(requests.removeAndGet(key(request), request)); + return values == null || values.isEmpty(); } private List requests() { List list = Lists.newArrayList(); - requests.values().forEach(v -> list.addAll(v.value())); + requests.values().forEach(v -> list.add(v)); list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); return list; } diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java index bb249e12a5..779bb7ac45 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java @@ -77,11 +77,21 @@ public class DelegatingAsyncConsistentMultimap return delegateMap.put(key, value); } + @Override + public CompletableFuture>> putAndGet(K key, V value) { + return delegateMap.putAndGet(key, value); + } + @Override public CompletableFuture remove(K key, V value) { return delegateMap.remove(key, value); } + @Override + public CompletableFuture>> removeAndGet(K key, V value) { + return delegateMap.removeAndGet(key, value); + } + @Override public CompletableFuture removeAll( K key, Collection values) { diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java index badbf05c10..a4e0bf95a1 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java @@ -110,6 +110,11 @@ public class PartitionedAsyncConsistentMultimap implements AsyncConsistent return getMultimap(key).put(key, value); } + @Override + public CompletableFuture>> putAndGet(K key, V value) { + return getMultimap(key).putAndGet(key, value); + } + @Override public CompletableFuture removeAll(K key, Collection values) { return getMultimap(key).removeAll(key, values); @@ -173,6 +178,11 @@ public class PartitionedAsyncConsistentMultimap implements AsyncConsistent return getMultimap(key).remove(key, value); } + @Override + public CompletableFuture>> removeAndGet(K key, V value) { + return getMultimap(key).removeAndGet(key, value); + } + @Override public CompletableFuture>> iterator() { return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList())) diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java index b00d06a8ba..6fab8c82bc 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java @@ -140,6 +140,16 @@ public class TranscodingAsyncConsistentMultimap } } + @Override + public CompletableFuture>> putAndGet(K1 key, V1 value) { + try { + return backingMap.putAndGet(keyEncoder.apply(key), valueEncoder.apply(value)) + .thenApply(versionedValueCollectionDecode); + } catch (Exception e) { + return Tools.exceptionalFuture(e); + } + } + @Override public CompletableFuture remove(K1 key, V1 value) { try { @@ -150,6 +160,16 @@ public class TranscodingAsyncConsistentMultimap } } + @Override + public CompletableFuture>> removeAndGet(K1 key, V1 value) { + try { + return backingMap.removeAndGet(keyEncoder.apply(key), valueEncoder.apply(value)) + .thenApply(versionedValueCollectionDecode); + } catch (Exception e) { + return Tools.exceptionalFuture(e); + } + } + @Override public CompletableFuture removeAll( K1 key, Collection values) { diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java index 44cc614b83..39d49fedd2 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java @@ -61,9 +61,11 @@ import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSe import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT; +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL; +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll; @@ -147,6 +149,15 @@ public class AtomixConsistentSetMultimap SERIALIZER::decode); } + @Override + public CompletableFuture>> putAndGet(String key, byte[] value) { + return proxy.invoke( + PUT_AND_GET, + SERIALIZER::encode, + new Put(key, Lists.newArrayList(value), null), + SERIALIZER::decode); + } + @Override public CompletableFuture remove(String key, byte[] value) { return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key, @@ -154,6 +165,13 @@ public class AtomixConsistentSetMultimap null), SERIALIZER::decode); } + @Override + public CompletableFuture>> removeAndGet(String key, byte[] value) { + return proxy.invoke(REMOVE_AND_GET, SERIALIZER::encode, new MultiRemove(key, + Lists.newArrayList(value), + null), SERIALIZER::decode); + } + @Override public CompletableFuture removeAll(String key, Collection values) { return proxy.invoke( diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java index 82372a2dcf..6ad93b65dc 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java @@ -45,7 +45,9 @@ public enum AtomixConsistentSetMultimapOperations implements OperationId { VALUES(OperationType.QUERY), ENTRIES(OperationType.QUERY), PUT(OperationType.COMMAND), + PUT_AND_GET(OperationType.COMMAND), REMOVE(OperationType.COMMAND), + REMOVE_AND_GET(OperationType.COMMAND), REMOVE_ALL(OperationType.COMMAND), REPLACE(OperationType.COMMAND), CLEAR(OperationType.COMMAND), diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java index 2312d8f4b7..5a93bef9b1 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java @@ -77,9 +77,11 @@ import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSe import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT; +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL; +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll; @@ -153,7 +155,9 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService { executor.register(GET, serializer::decode, this::get, serializer::encode); executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode); executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode); + executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode); executor.register(PUT, serializer::decode, this::put, serializer::encode); + executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode); executor.register(REPLACE, serializer::decode, this::replace, serializer::encode); executor.register(ADD_LISTENER, this::listen); executor.register(REMOVE_LISTENER, this::unlisten); @@ -352,8 +356,8 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService { } Versioned> removedValues = backingMap - .get(key) - .addCommit(commit); + .get(key) + .addCommit(commit); if (removedValues != null) { if (removedValues.value().isEmpty()) { @@ -361,15 +365,46 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService { } publish(removedValues.value().stream() - .map(value -> new MultimapEvent( - "", key, null, value)) - .collect(Collectors.toList())); + .map(value -> new MultimapEvent( + "", key, null, value)) + .collect(Collectors.toList())); return true; } return false; } + /** + * Handles a removeAndGet commit. + * + * @param commit multiRemove commit + * @return the updated values or null if the values are empty + */ + protected Versioned> removeAndGet(Commit commit) { + String key = commit.value().key(); + + if (!backingMap.containsKey(key)) { + return null; + } + + Versioned> removedValues = backingMap + .get(key) + .addCommit(commit); + + if (removedValues != null) { + if (removedValues.value().isEmpty()) { + backingMap.remove(key); + } + + publish(removedValues.value().stream() + .map(value -> new MultimapEvent( + "", key, null, value)) + .collect(Collectors.toList())); + } + + return toVersioned(backingMap.get(key)); + } + /** * Handles a put commit, returns true if any change results from this * commit. @@ -386,20 +421,49 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService { } Versioned> addedValues = backingMap - .get(key) - .addCommit(commit); + .get(key) + .addCommit(commit); if (addedValues != null) { publish(addedValues.value().stream() - .map(value -> new MultimapEvent( - "", key, value, null)) - .collect(Collectors.toList())); + .map(value -> new MultimapEvent( + "", key, value, null)) + .collect(Collectors.toList())); return true; } return false; } + /** + * Handles a putAndGet commit. + * + * @param commit a put commit + * @return the updated values + */ + protected Versioned> putAndGet(Commit commit) { + String key = commit.value().key(); + if (commit.value().values().isEmpty()) { + return null; + } + if (!backingMap.containsKey(key)) { + backingMap.put(key, new NonTransactionalCommit()); + } + + Versioned> addedValues = backingMap + .get(key) + .addCommit(commit); + + if (addedValues != null) { + publish(addedValues.value().stream() + .map(value -> new MultimapEvent( + "", key, value, null)) + .collect(Collectors.toList())); + } + + return toVersioned(backingMap.get(key)); + } + protected Versioned> replace( Commit commit) { if (!backingMap.containsKey(commit.value().key())) { diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java index 0247fca6b8..01f5d00c43 100644 --- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java +++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java @@ -238,6 +238,28 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase assertFalse(result)).join(); }); + allKeys.forEach(key -> { + map.putAndGet(key, valueOne) + .thenAccept(result -> assertEquals(1, result.value().size())); + map.putAndGet(key, valueTwo) + .thenAccept(result -> assertEquals(2, result.value().size())); + map.putAndGet(key, valueThree) + .thenAccept(result -> assertEquals(3, result.value().size())); + map.putAndGet(key, valueFour) + .thenAccept(result -> assertEquals(4, result.value().size())); + }); + + allKeys.forEach(key -> { + map.removeAndGet(key, valueOne) + .thenAccept(result -> assertEquals(3, result.value().size())); + map.removeAndGet(key, valueTwo) + .thenAccept(result -> assertEquals(2, result.value().size())); + map.removeAndGet(key, valueThree) + .thenAccept(result -> assertEquals(1, result.value().size())); + map.removeAndGet(key, valueFour) + .thenAccept(result -> assertEquals(0, result.value().size())); + }); + map.isEmpty().thenAccept(result -> assertTrue(result)).join(); //Repopulate for next test