diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java index 2f5bc0a00d..b45e1b2138 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java @@ -339,8 +339,11 @@ public class EventuallyConsistentMapImpl checkNotNull(value, ERROR_NULL_VALUE); MapValue newValue = new MapValue<>(value, timestampProvider.apply(key, value)); + // Before mutating local map, ensure the update can be serialized without errors. + // This prevents replica divergence due to serialization failures. + UpdateEntry update = serializer.copy(new UpdateEntry(key, newValue)); if (putInternal(key, newValue)) { - notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value)); + notifyPeers(update, peerUpdateFunction.apply(key, value)); notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value)); } } @@ -417,13 +420,15 @@ public class EventuallyConsistentMapImpl AtomicBoolean updated = new AtomicBoolean(false); AtomicReference> previousValue = new AtomicReference<>(); - MapValue computedValue = items.compute(key, (k, mv) -> { + MapValue computedValue = items.compute(serializer.copy(key), (k, mv) -> { previousValue.set(mv); V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get()); MapValue newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue)); if (mv == null || newValue.isNewerThan(mv)) { updated.set(true); - return newValue; + // We return a copy to ensure updates to peers can be serialized. + // This prevents replica divergence due to serialization failures. + return serializer.copy(newValue); } else { return mv; } diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java index cdc70581a3..e621f97197 100644 --- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java +++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoSerializer.java @@ -77,6 +77,11 @@ public class KryoSerializer implements StoreSerializer { return serializerPool.deserialize(stream); } + @Override + public T copy(T object) { + return decode(encode(object)); + } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java index 128a2ea61c..e605791f1e 100644 --- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java +++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/StoreSerializer.java @@ -75,4 +75,13 @@ public interface StoreSerializer { * @param decoded type */ T decode(final InputStream stream); + + /** + * Returns a copy of the specfied object. + * + * @param object object to copy + * @return a copy of the object + * @param object type + */ + T copy(final T object); }