From caad26b86f260db2a929ea7d44b6ed010f73297c Mon Sep 17 00:00:00 2001 From: HIGUCHI Yuta Date: Sat, 16 Apr 2016 16:06:11 -0700 Subject: [PATCH] Fix for ONOS-4315 - Additional log on error - Allow count=0 using CountDownCompleter - test case to detect the issue (@Ignored by default right now) - other bug fixes found along the way Based on patch by Madan@China Change-Id: I7d6cb8c214052859900ef7ee0337a7e1c8a9d295 --- .../impl/AtomixConsistentMapState.java | 17 +++++-- .../impl/AtomixConsistentMapTest.java | 44 ++++++++++++++++++- .../org/onlab/util/CountDownCompleter.java | 5 ++- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java index 4823902cee..48fd4a21c1 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java @@ -18,6 +18,7 @@ package org.onosproject.store.primitives.resources.impl; import static org.onosproject.store.service.MapEvent.Type.INSERT; import static org.onosproject.store.service.MapEvent.Type.REMOVE; import static org.onosproject.store.service.MapEvent.Type.UPDATE; +import static org.slf4j.LoggerFactory.getLogger; import io.atomix.copycat.server.session.ServerSession; import io.atomix.copycat.server.Commit; import io.atomix.copycat.server.Snapshottable; @@ -60,7 +61,9 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapComman import org.onosproject.store.service.MapEvent; import org.onosproject.store.service.MapTransaction; import org.onosproject.store.service.Versioned; +import org.slf4j.Logger; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -72,6 +75,7 @@ import static com.google.common.base.Preconditions.checkState; */ public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable { + private final Logger log = getLogger(getClass()); private final Map> listeners = new HashMap<>(); private final Map mapEntries = new HashMap<>(); private final Set preparedKeys = Sets.newHashSet(); @@ -384,7 +388,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se } MapEntryValue existingValue = mapEntries.get(key); if (existingValue == null) { - if (update.currentValue() != null) { + if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) { return PrepareResult.OPTIMISTIC_LOCK_FAILURE; } } else { @@ -399,6 +403,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se transaction.updates().forEach(u -> preparedKeys.add(u.key())); ok = true; return PrepareResult.OK; + } catch (Exception e) { + log.warn("Failure applying {}", commit, e); + throw Throwables.propagate(e); } finally { if (!ok) { commit.close(); @@ -416,6 +423,9 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se TransactionId transactionId = commit.operation().transactionId(); try { return commitInternal(transactionId); + } catch (Exception e) { + log.warn("Failure applying {}", commit, e); + throw Throwables.propagate(e); } finally { commit.close(); } @@ -438,12 +448,11 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se List> eventsToPublish = Lists.newArrayList(); for (MapUpdate update : transaction.updates()) { String key = update.key(); + checkState(preparedKeys.remove(key), "key is not prepared"); MapEntryValue previousValue = mapEntries.remove(key); MapEntryValue newValue = null; - checkState(preparedKeys.remove(key), "key is not prepared"); if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) { - newValue = new TransactionalCommit(key, - versionCounter.incrementAndGet(), completer); + newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer); } eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue))); if (newValue != null) { diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java index f7c44c2023..a5fdb4948a 100644 --- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java +++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java @@ -16,6 +16,8 @@ package org.onosproject.store.primitives.resources.impl; import io.atomix.resource.ResourceType; + +import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; import java.util.Arrays; @@ -348,6 +350,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase { map.addListener(listener).join(); + // PUT_IF_ABSENT MapUpdate update1 = MapUpdate.newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT) .withKey("foo") @@ -359,6 +362,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase { map.prepare(tx).thenAccept(result -> { assertEquals(true, result); }).join(); + // verify changes in Tx is not visible yet until commit assertFalse(listener.eventReceived()); map.size().thenAccept(result -> { @@ -371,7 +375,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase { try { map.put("foo", value2).join(); - assertTrue(false); + fail("update to map entry in open tx should fail with Exception"); } catch (CompletionException e) { assertEquals(ConcurrentModificationException.class, e.getCause().getClass()); } @@ -384,6 +388,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase { assertEquals(MapEvent.Type.INSERT, event.type()); assertTrue(Arrays.equals(value1, event.newValue().value())); + // map should be update-able after commit map.put("foo", value2).thenAccept(result -> { assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1)); }).join(); @@ -391,6 +396,43 @@ public class AtomixConsistentMapTest extends AtomixTestBase { assertNotNull(event); assertEquals(MapEvent.Type.UPDATE, event.type()); assertTrue(Arrays.equals(value2, event.newValue().value())); + + + // REMOVE_IF_VERSION_MATCH + byte[] currFoo = map.get("foo").get().value(); + long currFooVersion = map.get("foo").get().version(); + MapUpdate remove1 = + MapUpdate.newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH) + .withKey("foo") + .withCurrentVersion(currFooVersion) + .build(); + + tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1)); + + map.prepare(tx).thenAccept(result -> { + assertTrue("prepare should succeed", result); + }).join(); + // verify changes in Tx is not visible yet until commit + assertFalse(listener.eventReceived()); + + map.size().thenAccept(size -> { + assertThat(size, is(1)); + }).join(); + + map.get("foo").thenAccept(result -> { + assertThat(result.value(), is(currFoo)); + }).join(); + + map.commit(tx.transactionId()).join(); + event = listener.event(); + assertNotNull(event); + assertEquals(MapEvent.Type.REMOVE, event.type()); + assertArrayEquals(currFoo, event.oldValue().value()); + + map.size().thenAccept(size -> { + assertThat(size, is(0)); + }).join(); + } protected void transactionRollbackTests(int clusterSize) throws Throwable { diff --git a/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java b/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java index e3ef9bea0e..ef17bc41d0 100644 --- a/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java +++ b/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java @@ -45,10 +45,13 @@ public final class CountDownCompleter { * @param onCompleteCallback callback to invoke when completer is completed */ public CountDownCompleter(T object, long count, Consumer onCompleteCallback) { - checkState(count > 0, "count must be positive"); + checkState(count >= 0, "count must be non-negative"); this.counter = new AtomicLong(count); this.object = checkNotNull(object); this.onCompleteCallback = checkNotNull(onCompleteCallback); + if (count == 0) { + onCompleteCallback.accept(object); + } } /**