ONOS-2280: Fix NPE in hosts EC map

Change-Id: I4cb74d7c9526dc0e836e1e2790748324f60183f5
This commit is contained in:
Madan Jampani 2015-07-01 17:37:50 -07:00 committed by Gerrit Code Review
parent 41ee2f0e5d
commit d13f3b8854
3 changed files with 44 additions and 37 deletions

View File

@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.Timer; import java.util.Timer;
@ -312,7 +313,9 @@ public class EventuallyConsistentMapImpl<K, V>
public V remove(K key) { public V remove(K key) {
checkState(!destroyed, destroyedMessage); checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY); checkNotNull(key, ERROR_NULL_KEY);
return removeInternal(key, Optional.empty()); MapValue<V> tombstone = new MapValue<>(null, timestampProvider.apply(key, null));
MapValue<V> previousValue = removeInternal(key, Optional.empty(), tombstone);
return previousValue != null ? previousValue.get() : null;
} }
@Override @Override
@ -320,35 +323,40 @@ public class EventuallyConsistentMapImpl<K, V>
checkState(!destroyed, destroyedMessage); checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY); checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE); checkNotNull(value, ERROR_NULL_VALUE);
removeInternal(key, Optional.of(value)); MapValue<V> tombstone = new MapValue<>(null, timestampProvider.apply(key, null));
removeInternal(key, Optional.of(value), tombstone);
} }
private V removeInternal(K key, Optional<V> value) { private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
checkState(!destroyed, destroyedMessage); checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY); checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE); checkNotNull(value, ERROR_NULL_VALUE);
MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null))); checkState(tombstone.isTombstone());
AtomicBoolean updated = new AtomicBoolean(false); AtomicBoolean updated = new AtomicBoolean(false);
AtomicReference<V> previousValue = new AtomicReference<>(); AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
items.compute(key, (k, existing) -> { items.compute(key, (k, existing) -> {
if (existing != null && existing.isAlive()) { boolean valueMatches = true;
updated.set(!value.isPresent() || value.get().equals(existing.get())); if (value.isPresent() && existing != null && existing.isAlive()) {
previousValue.set(existing.get()); valueMatches = Objects.equals(value.get(), existing.get());
} }
updated.set(existing == null || newValue.isNewerThan(existing)); updated.set(valueMatches && (existing == null || tombstone.isNewerThan(existing)));
return updated.get() ? newValue : existing; if (updated.get()) {
previousValue.set(existing);
}
return updated.get() ? tombstone : existing;
}); });
if (updated.get()) { if (updated.get()) {
notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get())); notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, null));
notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get())); if (previousValue.get() != null && previousValue.get().isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get().get()));
}
if (persistent) { if (persistent) {
persistentStore.update(key, newValue); persistentStore.update(key, tombstone);
}
} }
return previousValue.get(); return previousValue.get();
} }
return null;
}
@Override @Override
public void putAll(Map<? extends K, ? extends V> m) { public void putAll(Map<? extends K, ? extends V> m) {
@ -540,12 +548,13 @@ public class EventuallyConsistentMapImpl<K, V>
if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) { if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
// local value is more recent, push to sender // local value is more recent, push to sender
queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender)); queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
} else {
if (remoteValueDigest.isTombstone()
&& remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
} }
if (remoteValueDigest != null && remoteValueDigest.isTombstone()) {
MapValue<V> previousValue = removeInternal(key,
Optional.empty(),
new MapValue<>(null, remoteValueDigest.timestamp()));
if (previousValue != null && previousValue.isAlive()) {
externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
} }
} }
}); });
@ -559,10 +568,13 @@ public class EventuallyConsistentMapImpl<K, V>
updates.forEach(update -> { updates.forEach(update -> {
final K key = update.key(); final K key = update.key();
final MapValue<V> value = update.value(); final MapValue<V> value = update.value();
if (value.isTombstone()) {
if (updateInternal(key, value)) { MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT; if (previousValue != null && previousValue.get() != null) {
notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get())); notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
}
} else if (updateInternal(key, value)) {
notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value.get()));
} }
}); });
} }

View File

@ -1,5 +1,6 @@
package org.onosproject.store.host.impl; package org.onosproject.store.host.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.DefaultAnnotations.merge; import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED; import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED; import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
@ -11,7 +12,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -50,13 +50,9 @@ import org.slf4j.Logger;
import com.google.common.collect.HashMultimap; import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps; import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap; import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.collect.Sets.newConcurrentHashSet;
/** /**
* Manages the inventory of hosts using a {@code EventuallyConsistentMap}. * Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
@ -76,9 +72,10 @@ public class ECHostStore
protected LogicalClockService clockService; protected LogicalClockService clockService;
// Hosts tracked by their location // Hosts tracked by their location
private final Multimap<ConnectPoint, Host> locations private final SetMultimap<ConnectPoint, Host> locations =
= synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(), Multimaps.synchronizedSetMultimap(
() -> newConcurrentHashSet())); HashMultimap.<ConnectPoint, Host>create());
private final SetMultimap<ConnectPoint, PortAddresses> portAddresses = private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
Multimaps.synchronizedSetMultimap( Multimaps.synchronizedSetMultimap(
HashMultimap.<ConnectPoint, PortAddresses>create()); HashMultimap.<ConnectPoint, PortAddresses>create());
@ -252,7 +249,7 @@ public class ECHostStore
@Override @Override
public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) { public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
DefaultHost host = event.value(); DefaultHost host = checkNotNull(event.value());
if (event.type() == PUT) { if (event.type() == PUT) {
locations.put(host.location(), host); locations.put(host.location(), host);
} else if (event.type() == REMOVE) { } else if (event.type() == REMOVE) {

View File

@ -326,8 +326,6 @@ public class EventuallyConsistentMapImplTest {
= getListener(); = getListener();
listener.event(new EventuallyConsistentMapEvent<>( listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1)); EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
listener.event(new EventuallyConsistentMapEvent<>( listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1)); EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
listener.event(new EventuallyConsistentMapEvent<>( listener.event(new EventuallyConsistentMapEvent<>(