From 00c3f57d35dae14948e2e8b9c9f7901ed3da7875 Mon Sep 17 00:00:00 2001 From: HIGUCHI Yuta Date: Wed, 25 Feb 2015 07:33:50 -0800 Subject: [PATCH] Concurrently update EventuallyConsistentMap - Removed synchronized block on Map updates which may result in anti-entropy AD sent to the peer containing out-of-sync update/remove, such as update and remove for the same key, but stale information will be ignored on the remote peer by timestamp if timestamps are properly generated. Change-Id: Id4f993eb44b7858d37486be0d4baaff1f9025efa --- .../ecmap/EventuallyConsistentMapImpl.java | 68 +++++++++++-------- 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java index d0e6733804..1069f63baa 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java +++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java @@ -16,6 +16,7 @@ package org.onosproject.store.ecmap; import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; import org.onlab.util.KryoNamespace; import org.onosproject.cluster.ClusterService; @@ -42,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -64,8 +66,8 @@ public class EventuallyConsistentMapImpl private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class); - private final Map> items; - private final Map removedItems; + private final ConcurrentMap> items; + private final ConcurrentMap removedItems; private final ClusterService clusterService; private final ClusterCommunicationService clusterCommunicator; @@ -267,16 +269,21 @@ public class EventuallyConsistentMapImpl return false; } - boolean success; - synchronized (this) { - Timestamped existing = items.get(key); + final MutableBoolean updated = new MutableBoolean(false); + + items.compute(key, (k, existing) -> { if (existing != null && existing.isNewerThan(timestamp)) { - log.debug("ecmap - existing was newer {}", value); - success = false; + updated.setFalse(); + return existing; } else { - items.put(key, new Timestamped<>(value, timestamp)); - success = true; + updated.setTrue(); + return new Timestamped<>(value, timestamp); } + }); + + boolean success = updated.booleanValue(); + if (!success) { + log.debug("ecmap - existing was newer {}", value); } if (success && removed != null) { @@ -303,13 +310,21 @@ public class EventuallyConsistentMapImpl } private boolean removeInternal(K key, Timestamp timestamp) { - Timestamped value = items.get(key); - if (value != null) { - if (value.isNewerThan(timestamp)) { - return false; + final MutableBoolean updated = new MutableBoolean(false); + + items.compute(key, (k, existing) -> { + if (existing != null && existing.isNewerThan(timestamp)) { + updated.setFalse(); + return existing; } else { - items.remove(key, value); + updated.setTrue(); + // remove from items map + return null; } + }); + + if (updated.isFalse()) { + return false; } Timestamp removedTimestamp = removedItems.get(key); @@ -554,23 +569,21 @@ public class EventuallyConsistentMapImpl List> externalEvents; boolean sync = false; - synchronized (this) { - externalEvents = antiEntropyCheckLocalItems(ad); + externalEvents = antiEntropyCheckLocalItems(ad); - antiEntropyCheckLocalRemoved(ad); + antiEntropyCheckLocalRemoved(ad); - externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad)); + externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad)); - // if remote ad has something unknown, actively sync - for (K key : ad.timestamps().keySet()) { - if (!items.containsKey(key)) { - sync = true; - break; - } + // if remote ad has something unknown, actively sync + for (K key : ad.timestamps().keySet()) { + if (!items.containsKey(key)) { + sync = true; + break; } - } // synchronized (this) + } - // Send the advertisement outside the synchronized block + // Send the advertisement back if this peer is out-of-sync if (sync) { final NodeId sender = ad.sender(); AntiEntropyAdvertisement myAd = createAdvertisement(); @@ -596,7 +609,6 @@ public class EventuallyConsistentMapImpl * @param ad remote anti-entropy advertisement * @return list of external events relating to local operations performed */ - // Guarded by synchronized (this) private List> antiEntropyCheckLocalItems( AntiEntropyAdvertisement ad) { final List> externalEvents @@ -652,7 +664,6 @@ public class EventuallyConsistentMapImpl * * @param ad remote anti-entropy advertisement */ - // Guarded by synchronized (this) private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement ad) { final NodeId sender = ad.sender(); @@ -690,7 +701,6 @@ public class EventuallyConsistentMapImpl * @param ad remote anti-entropy advertisement * @return list of external events relating to local operations performed */ - // Guarded by synchronized (this) private List> antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement ad) { final List> externalEvents