mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-11-03 09:41:14 +01:00
Throw ConcurrentModification exception when ConsistentMap compute call conflicts with concurrent operation(s)
Change-Id: Id07868873929f4f413878961c154b62933f3a3f2
This commit is contained in:
parent
51ad7175fc
commit
70df7679dd
@ -29,6 +29,7 @@ import java.util.function.Predicate;
|
||||
|
||||
import io.atomix.protocols.raft.proxy.RaftProxy;
|
||||
import org.onlab.util.KryoNamespace;
|
||||
import org.onlab.util.Tools;
|
||||
import org.onosproject.store.primitives.MapUpdate;
|
||||
import org.onosproject.store.primitives.TransactionId;
|
||||
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey;
|
||||
@ -49,6 +50,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperat
|
||||
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback;
|
||||
import org.onosproject.store.serializers.KryoNamespaces;
|
||||
import org.onosproject.store.service.AsyncConsistentMap;
|
||||
import org.onosproject.store.service.ConsistentMapException;
|
||||
import org.onosproject.store.service.MapEvent;
|
||||
import org.onosproject.store.service.MapEventListener;
|
||||
import org.onosproject.store.service.Serializer;
|
||||
@ -296,9 +298,7 @@ public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncC
|
||||
try {
|
||||
computedValue = remappingFunction.apply(key, existingValue);
|
||||
} catch (Exception e) {
|
||||
CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
|
||||
future.completeExceptionally(e);
|
||||
return future;
|
||||
return Tools.exceptionalFuture(e);
|
||||
}
|
||||
|
||||
if (computedValue == null && r1 == null) {
|
||||
@ -312,9 +312,17 @@ public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncC
|
||||
new Put(key, computedValue),
|
||||
serializer()::decode)
|
||||
.whenComplete((r, e) -> throwIfLocked(r))
|
||||
.thenCompose(r -> checkLocked(r))
|
||||
.thenApply(result -> new Versioned<>(computedValue, result.version()));
|
||||
} else if (computedValue == null) {
|
||||
return remove(key, r1.version()).thenApply(v -> null);
|
||||
return proxy.<RemoveVersion, MapEntryUpdateResult<String, byte[]>>invoke(
|
||||
REMOVE_VERSION,
|
||||
serializer()::encode,
|
||||
new RemoveVersion(key, r1.version()),
|
||||
serializer()::decode)
|
||||
.whenComplete((r, e) -> throwIfLocked(r))
|
||||
.thenCompose(r -> checkLocked(r))
|
||||
.thenApply(v -> null);
|
||||
} else {
|
||||
return proxy.<ReplaceVersion, MapEntryUpdateResult<String, byte[]>>invoke(
|
||||
REPLACE_VERSION,
|
||||
@ -322,12 +330,22 @@ public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncC
|
||||
new ReplaceVersion(key, r1.version(), computedValue),
|
||||
serializer()::decode)
|
||||
.whenComplete((r, e) -> throwIfLocked(r))
|
||||
.thenCompose(r -> checkLocked(r))
|
||||
.thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK
|
||||
? new Versioned(computedValue, result.version()) : result.result());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private CompletableFuture<MapEntryUpdateResult<String, byte[]>> checkLocked(
|
||||
MapEntryUpdateResult<String, byte[]> result) {
|
||||
if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED ||
|
||||
result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) {
|
||||
return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification());
|
||||
}
|
||||
return CompletableFuture.completedFuture(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
|
||||
Executor executor) {
|
||||
|
||||
@ -337,7 +337,11 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
String key = commit.value().key();
|
||||
MapEntryValue oldValue = entries().get(key);
|
||||
MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
|
||||
|
||||
// If the value is null or a tombstone, this is an insert.
|
||||
// Otherwise, only update the value if it has changed to reduce the number of events.
|
||||
if (valueIsNull(oldValue)) {
|
||||
// If the key has been locked by a transaction, return a WRITE_LOCK error.
|
||||
if (preparedKeys.contains(key)) {
|
||||
return new MapEntryUpdateResult<>(
|
||||
MapEntryUpdateResult.Status.WRITE_LOCK,
|
||||
@ -351,6 +355,7 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result));
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
|
||||
} else if (!valuesEqual(oldValue, newValue)) {
|
||||
// If the key has been locked by a transaction, return a WRITE_LOCK error.
|
||||
if (preparedKeys.contains(key)) {
|
||||
return new MapEntryUpdateResult<>(
|
||||
MapEntryUpdateResult.Status.WRITE_LOCK,
|
||||
@ -364,6 +369,7 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
|
||||
}
|
||||
// If the value hasn't changed, return a NOOP result.
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
|
||||
}
|
||||
|
||||
@ -376,7 +382,10 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
protected MapEntryUpdateResult<String, byte[]> putIfAbsent(Commit<? extends Put> commit) {
|
||||
String key = commit.value().key();
|
||||
MapEntryValue oldValue = entries().get(key);
|
||||
|
||||
// If the value is null, this is an INSERT.
|
||||
if (valueIsNull(oldValue)) {
|
||||
// If the key has been locked by a transaction, return a WRITE_LOCK error.
|
||||
if (preparedKeys.contains(key)) {
|
||||
return new MapEntryUpdateResult<>(
|
||||
MapEntryUpdateResult.Status.WRITE_LOCK,
|
||||
@ -388,13 +397,16 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
MapEntryValue.Type.VALUE,
|
||||
commit.index(),
|
||||
commit.value().value());
|
||||
entries().put(commit.value().key(),
|
||||
new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value()));
|
||||
entries().put(commit.value().key(), newValue);
|
||||
Versioned<byte[]> result = toVersioned(newValue);
|
||||
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null);
|
||||
}
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue));
|
||||
return new MapEntryUpdateResult<>(
|
||||
MapEntryUpdateResult.Status.PRECONDITION_FAILED,
|
||||
commit.index(),
|
||||
key,
|
||||
toVersioned(oldValue));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -407,7 +419,11 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
String key = commit.value().key();
|
||||
MapEntryValue oldValue = entries().get(key);
|
||||
MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value());
|
||||
|
||||
// If the value is null or a tombstone, this is an insert.
|
||||
// Otherwise, only update the value if it has changed to reduce the number of events.
|
||||
if (valueIsNull(oldValue)) {
|
||||
// If the key has been locked by a transaction, return a WRITE_LOCK error.
|
||||
if (preparedKeys.contains(key)) {
|
||||
return new MapEntryUpdateResult<>(
|
||||
MapEntryUpdateResult.Status.WRITE_LOCK,
|
||||
@ -420,6 +436,7 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null));
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result);
|
||||
} else if (!valuesEqual(oldValue, newValue)) {
|
||||
// If the key has been locked by a transaction, return a WRITE_LOCK error.
|
||||
if (preparedKeys.contains(key)) {
|
||||
return new MapEntryUpdateResult<>(
|
||||
MapEntryUpdateResult.Status.WRITE_LOCK,
|
||||
@ -445,15 +462,26 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
*/
|
||||
private MapEntryUpdateResult<String, byte[]> removeIf(long index, String key, Predicate<MapEntryValue> predicate) {
|
||||
MapEntryValue value = entries().get(key);
|
||||
if (value == null || !predicate.test(value)) {
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
|
||||
|
||||
// If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error.
|
||||
if (valueIsNull(value) || !predicate.test(value)) {
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null);
|
||||
}
|
||||
|
||||
// If the key has been locked by a transaction, return a WRITE_LOCK error.
|
||||
if (preparedKeys.contains(key)) {
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
|
||||
}
|
||||
|
||||
// If no transactions are active, remove the key. Otherwise, replace it with a tombstone.
|
||||
if (activeTransactions.isEmpty()) {
|
||||
entries().remove(key);
|
||||
if (!activeTransactions.isEmpty()) {
|
||||
entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null));
|
||||
} else {
|
||||
entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null));
|
||||
}
|
||||
|
||||
Versioned<byte[]> result = toVersioned(value);
|
||||
publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)));
|
||||
publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result));
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
|
||||
}
|
||||
|
||||
@ -500,15 +528,24 @@ public class AtomixConsistentMapService extends AbstractRaftService {
|
||||
private MapEntryUpdateResult<String, byte[]> replaceIf(
|
||||
long index, String key, MapEntryValue newValue, Predicate<MapEntryValue> predicate) {
|
||||
MapEntryValue oldValue = entries().get(key);
|
||||
if (oldValue == null) {
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null);
|
||||
|
||||
// If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error.
|
||||
if (valueIsNull(oldValue) || !predicate.test(oldValue)) {
|
||||
return new MapEntryUpdateResult<>(
|
||||
MapEntryUpdateResult.Status.PRECONDITION_FAILED,
|
||||
index,
|
||||
key,
|
||||
toVersioned(oldValue));
|
||||
}
|
||||
if (!predicate.test(oldValue)) {
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue));
|
||||
|
||||
// If the key has been locked by a transaction, return a WRITE_LOCK error.
|
||||
if (preparedKeys.contains(key)) {
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null);
|
||||
}
|
||||
|
||||
entries().put(key, newValue);
|
||||
Versioned<byte[]> result = toVersioned(oldValue);
|
||||
publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)));
|
||||
publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result));
|
||||
return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result);
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user