Refactor DistributedPacketStore to store packet requests in a ConsistentMultimap

Change-Id: Ia4a93c47fee726009673e99609b2f8800807e675
This commit is contained in:
Jordan Halterman 2018-06-04 14:53:06 -07:00 committed by Ray Milkey
parent b2f5795c68
commit 8c57a09e4d
12 changed files with 244 additions and 62 deletions

View File

@ -85,11 +85,21 @@ public class DefaultConsistentMultimap<K, V>
return complete(asyncMultimap.put(key, value)); return complete(asyncMultimap.put(key, value));
} }
@Override
public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
return complete(asyncMultimap.putAndGet(key, value));
}
@Override @Override
public boolean remove(K key, V value) { public boolean remove(K key, V value) {
return complete(asyncMultimap.remove(key, value)); return complete(asyncMultimap.remove(key, value));
} }
@Override
public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
return complete(asyncMultimap.removeAndGet(key, value));
}
@Override @Override
public boolean removeAll(K key, Collection<? extends V> values) { public boolean removeAll(K key, Collection<? extends V> values) {
return complete(asyncMultimap.removeAll(key, values)); return complete(asyncMultimap.removeAll(key, values));

View File

@ -103,6 +103,19 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive, Asy
*/ */
CompletableFuture<Boolean> put(K key, V value); CompletableFuture<Boolean> put(K key, V value);
/**
* If the key-value pair does not already exist adds either the key value
* pair or the value to the set of values associated with the key and
* returns the updated value, if the key-value pair already exists then behavior
* is implementation specific with some implementations allowing duplicates
* and others ignoring put requests for existing entries.
*
* @param key the key to add
* @param value the value to add
* @return a future to be completed with the updated values
*/
CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value);
/** /**
* Removes the key-value pair with the specified values if it exists. In * Removes the key-value pair with the specified values if it exists. In
* implementations that allow duplicates which matching entry will be * implementations that allow duplicates which matching entry will be
@ -115,6 +128,17 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive, Asy
*/ */
CompletableFuture<Boolean> remove(K key, V value); CompletableFuture<Boolean> remove(K key, V value);
/**
* Removes the key-value pair with the specified values if it exists. In
* implementations that allow duplicates which matching entry will be
* removed is undefined.
*
* @param key the key of the pair to be removed
* @param value the value of the pair to be removed
* @return a future to be completed with the updated values
*/
CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value);
/** /**
* Removes the key-value pairs with the specified key and values if they * Removes the key-value pairs with the specified key and values if they
* exist. In implementations that allow duplicates each instance of a key * exist. In implementations that allow duplicates each instance of a key

View File

@ -90,6 +90,19 @@ public interface ConsistentMultimap<K, V> extends DistributedPrimitive, Iterable
*/ */
boolean put(K key, V value); boolean put(K key, V value);
/**
* If the key-value pair does not already exist adds either the key value
* pair or the value to the set of values associated with the key and
* returns the updated value, if the key-value pair already exists then behavior
* is implementation specific with some implementations allowing duplicates
* and others ignoring put requests for existing entries.
*
* @param key the key to add
* @param value the value to add
* @return the updated values
*/
Versioned<Collection<? extends V>> putAndGet(K key, V value);
/** /**
* Removes the key-value pair with the specified values if it exists. In * Removes the key-value pair with the specified values if it exists. In
* implementations that allow duplicates which matching entry will be * implementations that allow duplicates which matching entry will be
@ -101,6 +114,17 @@ public interface ConsistentMultimap<K, V> extends DistributedPrimitive, Iterable
*/ */
boolean remove(K key, V value); boolean remove(K key, V value);
/**
* Removes the key-value pair with the specified values if it exists. In
* implementations that allow duplicates which matching entry will be
* removed is undefined.
*
* @param key the key of the pair to be removed
* @param value the value of the pair to be removed
* @return the updated values
*/
Versioned<Collection<? extends V>> removeAndGet(K key, V value);
/** /**
* Removes the key-value pairs with the specified key and values if they * Removes the key-value pairs with the specified key and values if they
* exist. In implementations that allow duplicates each instance of a key * exist. In implementations that allow duplicates each instance of a key

View File

@ -75,11 +75,23 @@ public class TestConsistentMultimap<K, V> implements ConsistentMultimap<K, V> {
return innermap.put(key, version(value)); return innermap.put(key, version(value));
} }
@Override
public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
innermap.put(key, version(value));
return (Versioned<Collection<? extends V>>) innermap.get(key);
}
@Override @Override
public boolean remove(K key, V value) { public boolean remove(K key, V value) {
return innermap.remove(key, value); return innermap.remove(key, value);
} }
@Override
public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
innermap.remove(key, value);
return (Versioned<Collection<? extends V>>) innermap.get(key);
}
@Override @Override
public boolean removeAll(K key, Collection<? extends V> values) { public boolean removeAll(K key, Collection<? extends V> values) {
return false; return false;

View File

@ -15,9 +15,7 @@
*/ */
package org.onosproject.store.packet.impl; package org.onosproject.store.packet.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
@ -43,20 +41,20 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService; import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject; import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap; import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer; import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService; import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.osgi.service.component.ComponentContext; import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collection;
import java.util.Dictionary; import java.util.Dictionary;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
@ -205,10 +203,10 @@ public class DistributedPacketStore
private final class PacketRequestTracker { private final class PacketRequestTracker {
private ConsistentMap<RequestKey, Set<PacketRequest>> requests; private ConsistentMultimap<RequestKey, PacketRequest> requests;
private PacketRequestTracker() { private PacketRequestTracker() {
requests = storageService.<RequestKey, Set<PacketRequest>>consistentMapBuilder() requests = storageService.<RequestKey, PacketRequest>consistentMultimapBuilder()
.withName("onos-packet-requests") .withName("onos-packet-requests")
.withSerializer(Serializer.using(KryoNamespace.newBuilder() .withSerializer(Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.API) .register(KryoNamespaces.API)
@ -218,68 +216,36 @@ public class DistributedPacketStore
} }
private void add(PacketRequest request) { private void add(PacketRequest request) {
AtomicBoolean firstRequest = addInternal(request); boolean firstRequest = addInternal(request);
if (firstRequest.get() && delegate != null) { if (firstRequest && delegate != null) {
// The instance that makes the first request will push to all devices // The instance that makes the first request will push to all devices
delegate.requestPackets(request); delegate.requestPackets(request);
} }
} }
private AtomicBoolean addInternal(PacketRequest request) { private boolean addInternal(PacketRequest request) {
AtomicBoolean firstRequest = new AtomicBoolean(false); Collection<? extends PacketRequest> values =
requests.compute(key(request), (s, existingRequests) -> { Versioned.valueOrNull(requests.putAndGet(key(request), request));
// Reset to false just in case this is a retry due to return values.size() == 1;
// ConcurrentModificationException
firstRequest.set(false);
if (existingRequests == null) {
firstRequest.set(true);
return ImmutableSet.of(request);
} else if (!existingRequests.contains(request)) {
firstRequest.set(true);
return ImmutableSet.<PacketRequest>builder()
.addAll(existingRequests)
.add(request)
.build();
} else {
return existingRequests;
}
});
return firstRequest;
} }
private void remove(PacketRequest request) { private void remove(PacketRequest request) {
AtomicBoolean removedLast = removeInternal(request); boolean removedLast = removeInternal(request);
if (removedLast.get() && delegate != null) { if (removedLast && delegate != null) {
// The instance that removes the last request will remove from all devices // The instance that removes the last request will remove from all devices
delegate.cancelPackets(request); delegate.cancelPackets(request);
} }
} }
private AtomicBoolean removeInternal(PacketRequest request) { private boolean removeInternal(PacketRequest request) {
AtomicBoolean removedLast = new AtomicBoolean(false); Collection<? extends PacketRequest> values =
requests.computeIfPresent(key(request), (s, existingRequests) -> { Versioned.valueOrNull(requests.removeAndGet(key(request), request));
// Reset to false just in case this is a retry due to return values == null || values.isEmpty();
// ConcurrentModificationException
removedLast.set(false);
if (existingRequests.contains(request)) {
Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
newRequests.remove(request);
if (newRequests.size() > 0) {
return ImmutableSet.copyOf(newRequests);
} else {
removedLast.set(true);
return null;
}
} else {
return existingRequests;
}
});
return removedLast;
} }
private List<PacketRequest> requests() { private List<PacketRequest> requests() {
List<PacketRequest> list = Lists.newArrayList(); List<PacketRequest> list = Lists.newArrayList();
requests.values().forEach(v -> list.addAll(v.value())); requests.values().forEach(v -> list.add(v));
list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
return list; return list;
} }

View File

@ -77,11 +77,21 @@ public class DelegatingAsyncConsistentMultimap<K, V>
return delegateMap.put(key, value); return delegateMap.put(key, value);
} }
@Override
public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
return delegateMap.putAndGet(key, value);
}
@Override @Override
public CompletableFuture<Boolean> remove(K key, V value) { public CompletableFuture<Boolean> remove(K key, V value) {
return delegateMap.remove(key, value); return delegateMap.remove(key, value);
} }
@Override
public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
return delegateMap.removeAndGet(key, value);
}
@Override @Override
public CompletableFuture<Boolean> removeAll( public CompletableFuture<Boolean> removeAll(
K key, Collection<? extends V> values) { K key, Collection<? extends V> values) {

View File

@ -110,6 +110,11 @@ public class PartitionedAsyncConsistentMultimap<K, V> implements AsyncConsistent
return getMultimap(key).put(key, value); return getMultimap(key).put(key, value);
} }
@Override
public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
return getMultimap(key).putAndGet(key, value);
}
@Override @Override
public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) { public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
return getMultimap(key).removeAll(key, values); return getMultimap(key).removeAll(key, values);
@ -173,6 +178,11 @@ public class PartitionedAsyncConsistentMultimap<K, V> implements AsyncConsistent
return getMultimap(key).remove(key, value); return getMultimap(key).remove(key, value);
} }
@Override
public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
return getMultimap(key).removeAndGet(key, value);
}
@Override @Override
public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() { public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() {
return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList())) return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))

View File

@ -140,6 +140,16 @@ public class TranscodingAsyncConsistentMultimap<K1, V1, K2, V2>
} }
} }
@Override
public CompletableFuture<Versioned<Collection<? extends V1>>> putAndGet(K1 key, V1 value) {
try {
return backingMap.putAndGet(keyEncoder.apply(key), valueEncoder.apply(value))
.thenApply(versionedValueCollectionDecode);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
@Override @Override
public CompletableFuture<Boolean> remove(K1 key, V1 value) { public CompletableFuture<Boolean> remove(K1 key, V1 value) {
try { try {
@ -150,6 +160,16 @@ public class TranscodingAsyncConsistentMultimap<K1, V1, K2, V2>
} }
} }
@Override
public CompletableFuture<Versioned<Collection<? extends V1>>> removeAndGet(K1 key, V1 value) {
try {
return backingMap.removeAndGet(keyEncoder.apply(key), valueEncoder.apply(value))
.thenApply(versionedValueCollectionDecode);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
@Override @Override
public CompletableFuture<Boolean> removeAll( public CompletableFuture<Boolean> removeAll(
K1 key, Collection<? extends V1> values) { K1 key, Collection<? extends V1> values) {

View File

@ -61,9 +61,11 @@ import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSe
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@ -147,6 +149,15 @@ public class AtomixConsistentSetMultimap
SERIALIZER::decode); SERIALIZER::decode);
} }
@Override
public CompletableFuture<Versioned<Collection<? extends byte[]>>> putAndGet(String key, byte[] value) {
return proxy.invoke(
PUT_AND_GET,
SERIALIZER::encode,
new Put(key, Lists.newArrayList(value), null),
SERIALIZER::decode);
}
@Override @Override
public CompletableFuture<Boolean> remove(String key, byte[] value) { public CompletableFuture<Boolean> remove(String key, byte[] value) {
return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key, return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key,
@ -154,6 +165,13 @@ public class AtomixConsistentSetMultimap
null), SERIALIZER::decode); null), SERIALIZER::decode);
} }
@Override
public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAndGet(String key, byte[] value) {
return proxy.invoke(REMOVE_AND_GET, SERIALIZER::encode, new MultiRemove(key,
Lists.newArrayList(value),
null), SERIALIZER::decode);
}
@Override @Override
public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) { public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
return proxy.invoke( return proxy.invoke(

View File

@ -45,7 +45,9 @@ public enum AtomixConsistentSetMultimapOperations implements OperationId {
VALUES(OperationType.QUERY), VALUES(OperationType.QUERY),
ENTRIES(OperationType.QUERY), ENTRIES(OperationType.QUERY),
PUT(OperationType.COMMAND), PUT(OperationType.COMMAND),
PUT_AND_GET(OperationType.COMMAND),
REMOVE(OperationType.COMMAND), REMOVE(OperationType.COMMAND),
REMOVE_AND_GET(OperationType.COMMAND),
REMOVE_ALL(OperationType.COMMAND), REMOVE_ALL(OperationType.COMMAND),
REPLACE(OperationType.COMMAND), REPLACE(OperationType.COMMAND),
CLEAR(OperationType.COMMAND), CLEAR(OperationType.COMMAND),

View File

@ -77,9 +77,11 @@ import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSe
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll; import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@ -153,7 +155,9 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService {
executor.register(GET, serializer::decode, this::get, serializer::encode); executor.register(GET, serializer::decode, this::get, serializer::encode);
executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode); executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode); executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
executor.register(PUT, serializer::decode, this::put, serializer::encode); executor.register(PUT, serializer::decode, this::put, serializer::encode);
executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode); executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
executor.register(ADD_LISTENER, this::listen); executor.register(ADD_LISTENER, this::listen);
executor.register(REMOVE_LISTENER, this::unlisten); executor.register(REMOVE_LISTENER, this::unlisten);
@ -352,8 +356,8 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService {
} }
Versioned<Collection<? extends byte[]>> removedValues = backingMap Versioned<Collection<? extends byte[]>> removedValues = backingMap
.get(key) .get(key)
.addCommit(commit); .addCommit(commit);
if (removedValues != null) { if (removedValues != null) {
if (removedValues.value().isEmpty()) { if (removedValues.value().isEmpty()) {
@ -361,15 +365,46 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService {
} }
publish(removedValues.value().stream() publish(removedValues.value().stream()
.map(value -> new MultimapEvent<String, byte[]>( .map(value -> new MultimapEvent<String, byte[]>(
"", key, null, value)) "", key, null, value))
.collect(Collectors.toList())); .collect(Collectors.toList()));
return true; return true;
} }
return false; return false;
} }
/**
* Handles a removeAndGet commit.
*
* @param commit multiRemove commit
* @return the updated values or null if the values are empty
*/
protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
String key = commit.value().key();
if (!backingMap.containsKey(key)) {
return null;
}
Versioned<Collection<? extends byte[]>> removedValues = backingMap
.get(key)
.addCommit(commit);
if (removedValues != null) {
if (removedValues.value().isEmpty()) {
backingMap.remove(key);
}
publish(removedValues.value().stream()
.map(value -> new MultimapEvent<String, byte[]>(
"", key, null, value))
.collect(Collectors.toList()));
}
return toVersioned(backingMap.get(key));
}
/** /**
* Handles a put commit, returns true if any change results from this * Handles a put commit, returns true if any change results from this
* commit. * commit.
@ -386,20 +421,49 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService {
} }
Versioned<Collection<? extends byte[]>> addedValues = backingMap Versioned<Collection<? extends byte[]>> addedValues = backingMap
.get(key) .get(key)
.addCommit(commit); .addCommit(commit);
if (addedValues != null) { if (addedValues != null) {
publish(addedValues.value().stream() publish(addedValues.value().stream()
.map(value -> new MultimapEvent<String, byte[]>( .map(value -> new MultimapEvent<String, byte[]>(
"", key, value, null)) "", key, value, null))
.collect(Collectors.toList())); .collect(Collectors.toList()));
return true; return true;
} }
return false; return false;
} }
/**
* Handles a putAndGet commit.
*
* @param commit a put commit
* @return the updated values
*/
protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
String key = commit.value().key();
if (commit.value().values().isEmpty()) {
return null;
}
if (!backingMap.containsKey(key)) {
backingMap.put(key, new NonTransactionalCommit());
}
Versioned<Collection<? extends byte[]>> addedValues = backingMap
.get(key)
.addCommit(commit);
if (addedValues != null) {
publish(addedValues.value().stream()
.map(value -> new MultimapEvent<String, byte[]>(
"", key, value, null))
.collect(Collectors.toList()));
}
return toVersioned(backingMap.get(key));
}
protected Versioned<Collection<? extends byte[]>> replace( protected Versioned<Collection<? extends byte[]>> replace(
Commit<? extends Replace> commit) { Commit<? extends Replace> commit) {
if (!backingMap.containsKey(commit.value().key())) { if (!backingMap.containsKey(commit.value().key())) {

View File

@ -238,6 +238,28 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase<AtomixConsis
.thenAccept(result -> assertFalse(result)).join(); .thenAccept(result -> assertFalse(result)).join();
}); });
allKeys.forEach(key -> {
map.putAndGet(key, valueOne)
.thenAccept(result -> assertEquals(1, result.value().size()));
map.putAndGet(key, valueTwo)
.thenAccept(result -> assertEquals(2, result.value().size()));
map.putAndGet(key, valueThree)
.thenAccept(result -> assertEquals(3, result.value().size()));
map.putAndGet(key, valueFour)
.thenAccept(result -> assertEquals(4, result.value().size()));
});
allKeys.forEach(key -> {
map.removeAndGet(key, valueOne)
.thenAccept(result -> assertEquals(3, result.value().size()));
map.removeAndGet(key, valueTwo)
.thenAccept(result -> assertEquals(2, result.value().size()));
map.removeAndGet(key, valueThree)
.thenAccept(result -> assertEquals(1, result.value().size()));
map.removeAndGet(key, valueFour)
.thenAccept(result -> assertEquals(0, result.value().size()));
});
map.isEmpty().thenAccept(result -> assertTrue(result)).join(); map.isEmpty().thenAccept(result -> assertTrue(result)).join();
//Repopulate for next test //Repopulate for next test