mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-10-16 18:02:05 +02:00
Ensure multimap events are published on replaceValues
Change-Id: Ie4fb007ae70e618f0adfd16fd506e326b24580bc
This commit is contained in:
parent
57a9af9df8
commit
c8b591ea1b
@ -486,11 +486,28 @@ public class AtomixConsistentSetMultimapService extends AbstractRaftService {
|
|||||||
|
|
||||||
protected Versioned<Collection<? extends byte[]>> replace(
|
protected Versioned<Collection<? extends byte[]>> replace(
|
||||||
Commit<? extends Replace> commit) {
|
Commit<? extends Replace> commit) {
|
||||||
if (!backingMap.containsKey(commit.value().key())) {
|
String key = commit.value().key();
|
||||||
backingMap.put(commit.value().key(),
|
if (!backingMap.containsKey(key)) {
|
||||||
new NonTransactionalCommit());
|
backingMap.put(key, new NonTransactionalCommit());
|
||||||
}
|
}
|
||||||
return backingMap.get(commit.value().key()).addCommit(commit);
|
|
||||||
|
Versioned<Collection<? extends byte[]>> values = backingMap.get(commit.value().key()).addCommit(commit);
|
||||||
|
if (values != null) {
|
||||||
|
Set<byte[]> addedValues = Sets.newTreeSet(new ByteArrayComparator());
|
||||||
|
addedValues.addAll(commit.value().values());
|
||||||
|
|
||||||
|
Set<byte[]> removedValues = Sets.newTreeSet(new ByteArrayComparator());
|
||||||
|
removedValues.addAll(values.value());
|
||||||
|
|
||||||
|
List<MultimapEvent<String, byte[]>> events = Lists.newArrayList();
|
||||||
|
Sets.difference(removedValues, addedValues)
|
||||||
|
.forEach(value -> events.add(new MultimapEvent<>("", key, null, value)));
|
||||||
|
Sets.difference(addedValues, removedValues)
|
||||||
|
.forEach(value -> events.add(new MultimapEvent<>("", key, value, null)));
|
||||||
|
|
||||||
|
publish(events);
|
||||||
|
}
|
||||||
|
return values;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -23,6 +23,8 @@ import java.util.Comparator;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
@ -34,9 +36,13 @@ import org.apache.commons.collections.keyvalue.DefaultMapEntry;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.onlab.util.Tools;
|
import org.onlab.util.Tools;
|
||||||
import org.onosproject.store.service.AsyncIterator;
|
import org.onosproject.store.service.AsyncIterator;
|
||||||
|
import org.onosproject.store.service.MultimapEvent;
|
||||||
|
import org.onosproject.store.service.MultimapEventListener;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -441,6 +447,45 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase<AtomixConsis
|
|||||||
map.destroy().join();
|
map.destroy().join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultimapEvents() throws Throwable {
|
||||||
|
final byte[] value1 = Tools.getBytesUtf8("value1");
|
||||||
|
final byte[] value2 = Tools.getBytesUtf8("value2");
|
||||||
|
final byte[] value3 = Tools.getBytesUtf8("value3");
|
||||||
|
|
||||||
|
AtomixConsistentSetMultimap map = createResource("testFourMap");
|
||||||
|
TestMultimapEventListener listener = new TestMultimapEventListener();
|
||||||
|
|
||||||
|
// add listener; insert new value into map and verify an INSERT event is received.
|
||||||
|
map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
|
||||||
|
MultimapEvent<String, byte[]> event = listener.event();
|
||||||
|
assertNotNull(event);
|
||||||
|
assertEquals(MultimapEvent.Type.INSERT, event.type());
|
||||||
|
assertTrue(Arrays.equals(value1, event.newValue()));
|
||||||
|
|
||||||
|
// remove listener and verify listener is not notified.
|
||||||
|
map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
|
||||||
|
assertFalse(listener.eventReceived());
|
||||||
|
|
||||||
|
// add listener; insert new value into map and verify an INSERT event is received.
|
||||||
|
map.addListener(listener)
|
||||||
|
.thenCompose(v -> map.replaceValues("foo", Arrays.asList(value2, value3))).join();
|
||||||
|
event = listener.event();
|
||||||
|
assertNotNull(event);
|
||||||
|
assertEquals(MultimapEvent.Type.REMOVE, event.type());
|
||||||
|
assertArrayEquals(value1, event.oldValue());
|
||||||
|
event = listener.event();
|
||||||
|
assertNotNull(event);
|
||||||
|
assertEquals(MultimapEvent.Type.INSERT, event.type());
|
||||||
|
assertArrayEquals(value3, event.newValue());
|
||||||
|
|
||||||
|
// remove listener and verify listener is not notified.
|
||||||
|
map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
|
||||||
|
assertFalse(listener.eventReceived());
|
||||||
|
|
||||||
|
map.removeListener(listener).join();
|
||||||
|
}
|
||||||
|
|
||||||
private AtomixConsistentSetMultimap createResource(String mapName) {
|
private AtomixConsistentSetMultimap createResource(String mapName) {
|
||||||
try {
|
try {
|
||||||
AtomixConsistentSetMultimap map = newPrimitive(mapName);
|
AtomixConsistentSetMultimap map = newPrimitive(mapName);
|
||||||
@ -548,4 +593,26 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase<AtomixConsis
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class TestMultimapEventListener implements MultimapEventListener<String, byte[]> {
|
||||||
|
|
||||||
|
private final BlockingQueue<MultimapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void event(MultimapEvent<String, byte[]> event) {
|
||||||
|
try {
|
||||||
|
queue.put(event);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IllegalStateException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean eventReceived() {
|
||||||
|
return !queue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public MultimapEvent<String, byte[]> event() throws InterruptedException {
|
||||||
|
return queue.take();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user