diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java index 5b59fb2049..ad3bbb63f5 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java @@ -29,6 +29,7 @@ import java.util.function.Predicate; import io.atomix.protocols.raft.proxy.RaftProxy; import org.onlab.util.KryoNamespace; +import org.onlab.util.Tools; import org.onosproject.store.primitives.MapUpdate; import org.onosproject.store.primitives.TransactionId; import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.ContainsKey; @@ -49,6 +50,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperat import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations.TransactionRollback; import org.onosproject.store.serializers.KryoNamespaces; import org.onosproject.store.service.AsyncConsistentMap; +import org.onosproject.store.service.ConsistentMapException; import org.onosproject.store.service.MapEvent; import org.onosproject.store.service.MapEventListener; import org.onosproject.store.service.Serializer; @@ -296,9 +298,7 @@ public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncC try { computedValue = remappingFunction.apply(key, existingValue); } catch (Exception e) { - CompletableFuture> future = new CompletableFuture<>(); - future.completeExceptionally(e); - return future; + return Tools.exceptionalFuture(e); } if (computedValue == null && r1 == null) { @@ -312,9 +312,17 @@ public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncC new Put(key, computedValue), serializer()::decode) .whenComplete((r, e) -> throwIfLocked(r)) + .thenCompose(r -> checkLocked(r)) .thenApply(result -> new Versioned<>(computedValue, result.version())); } else if (computedValue == null) { - return remove(key, r1.version()).thenApply(v -> null); + return proxy.>invoke( + REMOVE_VERSION, + serializer()::encode, + new RemoveVersion(key, r1.version()), + serializer()::decode) + .whenComplete((r, e) -> throwIfLocked(r)) + .thenCompose(r -> checkLocked(r)) + .thenApply(v -> null); } else { return proxy.>invoke( REPLACE_VERSION, @@ -322,12 +330,22 @@ public class AtomixConsistentMap extends AbstractRaftPrimitive implements AsyncC new ReplaceVersion(key, r1.version(), computedValue), serializer()::decode) .whenComplete((r, e) -> throwIfLocked(r)) + .thenCompose(r -> checkLocked(r)) .thenApply(result -> result.status() == MapEntryUpdateResult.Status.OK ? new Versioned(computedValue, result.version()) : result.result()); } }); } + private CompletableFuture> checkLocked( + MapEntryUpdateResult result) { + if (result.status() == MapEntryUpdateResult.Status.PRECONDITION_FAILED || + result.status() == MapEntryUpdateResult.Status.WRITE_LOCK) { + return Tools.exceptionalFuture(new ConsistentMapException.ConcurrentModification()); + } + return CompletableFuture.completedFuture(result); + } + @Override public synchronized CompletableFuture addListener(MapEventListener listener, Executor executor) { diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java index fe5a2a9448..9d72be5e5b 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java @@ -337,7 +337,11 @@ public class AtomixConsistentMapService extends AbstractRaftService { String key = commit.value().key(); MapEntryValue oldValue = entries().get(key); MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value()); + + // If the value is null or a tombstone, this is an insert. + // Otherwise, only update the value if it has changed to reduce the number of events. if (valueIsNull(oldValue)) { + // If the key has been locked by a transaction, return a WRITE_LOCK error. if (preparedKeys.contains(key)) { return new MapEntryUpdateResult<>( MapEntryUpdateResult.Status.WRITE_LOCK, @@ -351,6 +355,7 @@ public class AtomixConsistentMapService extends AbstractRaftService { publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, toVersioned(newValue), result)); return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result); } else if (!valuesEqual(oldValue, newValue)) { + // If the key has been locked by a transaction, return a WRITE_LOCK error. if (preparedKeys.contains(key)) { return new MapEntryUpdateResult<>( MapEntryUpdateResult.Status.WRITE_LOCK, @@ -364,6 +369,7 @@ public class AtomixConsistentMapService extends AbstractRaftService { publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)); return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result); } + // If the value hasn't changed, return a NOOP result. return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue)); } @@ -376,7 +382,10 @@ public class AtomixConsistentMapService extends AbstractRaftService { protected MapEntryUpdateResult putIfAbsent(Commit commit) { String key = commit.value().key(); MapEntryValue oldValue = entries().get(key); + + // If the value is null, this is an INSERT. if (valueIsNull(oldValue)) { + // If the key has been locked by a transaction, return a WRITE_LOCK error. if (preparedKeys.contains(key)) { return new MapEntryUpdateResult<>( MapEntryUpdateResult.Status.WRITE_LOCK, @@ -388,13 +397,16 @@ public class AtomixConsistentMapService extends AbstractRaftService { MapEntryValue.Type.VALUE, commit.index(), commit.value().value()); - entries().put(commit.value().key(), - new MapEntryValue(MapEntryValue.Type.VALUE, newValue.version(), newValue.value())); + entries().put(commit.value().key(), newValue); Versioned result = toVersioned(newValue); publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null)); return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, null); } - return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, commit.index(), key, toVersioned(oldValue)); + return new MapEntryUpdateResult<>( + MapEntryUpdateResult.Status.PRECONDITION_FAILED, + commit.index(), + key, + toVersioned(oldValue)); } /** @@ -407,7 +419,11 @@ public class AtomixConsistentMapService extends AbstractRaftService { String key = commit.value().key(); MapEntryValue oldValue = entries().get(key); MapEntryValue newValue = new MapEntryValue(MapEntryValue.Type.VALUE, commit.index(), commit.value().value()); + + // If the value is null or a tombstone, this is an insert. + // Otherwise, only update the value if it has changed to reduce the number of events. if (valueIsNull(oldValue)) { + // If the key has been locked by a transaction, return a WRITE_LOCK error. if (preparedKeys.contains(key)) { return new MapEntryUpdateResult<>( MapEntryUpdateResult.Status.WRITE_LOCK, @@ -420,6 +436,7 @@ public class AtomixConsistentMapService extends AbstractRaftService { publish(new MapEvent<>(MapEvent.Type.INSERT, "", key, result, null)); return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, commit.index(), key, result); } else if (!valuesEqual(oldValue, newValue)) { + // If the key has been locked by a transaction, return a WRITE_LOCK error. if (preparedKeys.contains(key)) { return new MapEntryUpdateResult<>( MapEntryUpdateResult.Status.WRITE_LOCK, @@ -445,15 +462,26 @@ public class AtomixConsistentMapService extends AbstractRaftService { */ private MapEntryUpdateResult removeIf(long index, String key, Predicate predicate) { MapEntryValue value = entries().get(key); - if (value == null || !predicate.test(value)) { - return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null); + + // If the value does not exist or doesn't match the predicate, return a PRECONDITION_FAILED error. + if (valueIsNull(value) || !predicate.test(value)) { + return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.PRECONDITION_FAILED, index, key, null); } - entries().remove(key); - if (!activeTransactions.isEmpty()) { - entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, value.version, null)); + + // If the key has been locked by a transaction, return a WRITE_LOCK error. + if (preparedKeys.contains(key)) { + return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null); } + + // If no transactions are active, remove the key. Otherwise, replace it with a tombstone. + if (activeTransactions.isEmpty()) { + entries().remove(key); + } else { + entries().put(key, new MapEntryValue(MapEntryValue.Type.TOMBSTONE, index, null)); + } + Versioned result = toVersioned(value); - publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result))); + publish(new MapEvent<>(MapEvent.Type.REMOVE, "", key, null, result)); return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result); } @@ -500,15 +528,24 @@ public class AtomixConsistentMapService extends AbstractRaftService { private MapEntryUpdateResult replaceIf( long index, String key, MapEntryValue newValue, Predicate predicate) { MapEntryValue oldValue = entries().get(key); - if (oldValue == null) { - return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, null); + + // If the key is not set or the current value doesn't match the predicate, return a PRECONDITION_FAILED error. + if (valueIsNull(oldValue) || !predicate.test(oldValue)) { + return new MapEntryUpdateResult<>( + MapEntryUpdateResult.Status.PRECONDITION_FAILED, + index, + key, + toVersioned(oldValue)); } - if (!predicate.test(oldValue)) { - return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.NOOP, index, key, toVersioned(oldValue)); + + // If the key has been locked by a transaction, return a WRITE_LOCK error. + if (preparedKeys.contains(key)) { + return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.WRITE_LOCK, index, key, null); } + entries().put(key, newValue); Versioned result = toVersioned(oldValue); - publish(Lists.newArrayList(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result))); + publish(new MapEvent<>(MapEvent.Type.UPDATE, "", key, toVersioned(newValue), result)); return new MapEntryUpdateResult<>(MapEntryUpdateResult.Status.OK, index, key, result); }