diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java index 6c864b016f..fd2e676634 100644 --- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java +++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java @@ -486,11 +486,28 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService { protected Versioned> replace( Commit commit) { - if (!backingMap.containsKey(commit.value().key())) { - backingMap.put(commit.value().key(), - new NonTransactionalCommit()); + String key = commit.value().key(); + if (!backingMap.containsKey(key)) { + backingMap.put(key, new NonTransactionalCommit()); } - return backingMap.get(commit.value().key()).addCommit(commit); + + Versioned> values = backingMap.get(commit.value().key()).addCommit(commit); + if (values != null) { + Set addedValues = Sets.newTreeSet(new ByteArrayComparator()); + addedValues.addAll(commit.value().values()); + + Set removedValues = Sets.newTreeSet(new ByteArrayComparator()); + removedValues.addAll(values.value()); + + List> events = Lists.newArrayList(); + Sets.difference(removedValues, addedValues) + .forEach(value -> events.add(new MultimapEvent<>("", key, null, value))); + Sets.difference(addedValues, removedValues) + .forEach(value -> events.add(new MultimapEvent<>("", key, value, null))); + + publish(events); + } + return values; } /** diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java index b14afb3bc8..44d09bf538 100644 --- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java +++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java @@ -23,6 +23,8 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; @@ -34,9 +36,13 @@ import org.apache.commons.collections.keyvalue.DefaultMapEntry; import org.junit.Test; import org.onlab.util.Tools; import org.onosproject.store.service.AsyncIterator; +import org.onosproject.store.service.MultimapEvent; +import org.onosproject.store.service.MultimapEventListener; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; /** @@ -441,6 +447,45 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase map.put("foo", value1)).join(); + MultimapEvent event = listener.event(); + assertNotNull(event); + assertEquals(MultimapEvent.Type.INSERT, event.type()); + assertTrue(Arrays.equals(value1, event.newValue())); + + // remove listener and verify listener is not notified. + map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join(); + assertFalse(listener.eventReceived()); + + // add listener; insert new value into map and verify an INSERT event is received. + map.addListener(listener) + .thenCompose(v -> map.replaceValues("foo", Arrays.asList(value2, value3))).join(); + event = listener.event(); + assertNotNull(event); + assertEquals(MultimapEvent.Type.REMOVE, event.type()); + assertArrayEquals(value1, event.oldValue()); + event = listener.event(); + assertNotNull(event); + assertEquals(MultimapEvent.Type.INSERT, event.type()); + assertArrayEquals(value3, event.newValue()); + + // remove listener and verify listener is not notified. + map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join(); + assertFalse(listener.eventReceived()); + + map.removeListener(listener).join(); + } + private AtomixConsistentSetMultimap createResource(String mapName) { try { AtomixConsistentSetMultimap map = newPrimitive(mapName); @@ -548,4 +593,26 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase { + + private final BlockingQueue> queue = new ArrayBlockingQueue<>(1); + + @Override + public void event(MultimapEvent event) { + try { + queue.put(event); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + public boolean eventReceived() { + return !queue.isEmpty(); + } + + public MultimapEvent event() throws InterruptedException { + return queue.take(); + } + } }