mirror of
https://github.com/opennetworkinglab/onos.git
synced 2026-05-13 16:56:14 +02:00
Fix for ONOS-5035
Change-Id: I0a8edb0c77d2803070dba10c06f83390e1a09832
This commit is contained in:
parent
97abf7e5e4
commit
f1621f163d
@ -1,3 +1,4 @@
|
||||
|
||||
/*
|
||||
* Copyright 2015-present Open Networking Laboratory
|
||||
*
|
||||
@ -48,19 +49,25 @@ import org.onosproject.store.service.MapEvent;
|
||||
import org.onosproject.store.service.MapEventListener;
|
||||
import org.onosproject.store.service.Serializer;
|
||||
import org.onosproject.store.service.StorageService;
|
||||
import org.onosproject.store.service.DistributedPrimitive.Status;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
|
||||
import static org.onlab.util.Tools.groupedThreads;
|
||||
import static org.onosproject.net.DefaultAnnotations.merge;
|
||||
import static org.onosproject.net.host.HostEvent.Type.*;
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
@ -81,6 +88,7 @@ public class DistributedHostStore
|
||||
|
||||
private ConsistentMap<HostId, DefaultHost> hostsConsistentMap;
|
||||
private Map<HostId, DefaultHost> hosts;
|
||||
private Map<IpAddress, Set<Host>> hostsByIp;
|
||||
|
||||
private final ConcurrentHashMap<HostId, DefaultHost> prevHosts =
|
||||
new ConcurrentHashMap<>();
|
||||
@ -88,6 +96,10 @@ public class DistributedHostStore
|
||||
private MapEventListener<HostId, DefaultHost> hostLocationTracker =
|
||||
new HostLocationTracker();
|
||||
|
||||
private ScheduledExecutorService executor;
|
||||
|
||||
private Consumer<Status> statusChangeListener;
|
||||
|
||||
@Activate
|
||||
public void activate() {
|
||||
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
|
||||
@ -105,6 +117,14 @@ public class DistributedHostStore
|
||||
|
||||
hostsConsistentMap.addListener(hostLocationTracker);
|
||||
|
||||
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
|
||||
statusChangeListener = status -> {
|
||||
if (status == Status.ACTIVE) {
|
||||
executor.execute(this::loadHostsByIp);
|
||||
}
|
||||
};
|
||||
hostsConsistentMap.addStatusChangeListener(statusChangeListener);
|
||||
loadHostsByIp();
|
||||
log.info("Started");
|
||||
}
|
||||
|
||||
@ -116,6 +136,20 @@ public class DistributedHostStore
|
||||
log.info("Stopped");
|
||||
}
|
||||
|
||||
private void loadHostsByIp() {
|
||||
hostsByIp = new ConcurrentHashMap<IpAddress, Set<Host>>();
|
||||
hostsConsistentMap.asJavaMap().values().forEach(host -> {
|
||||
host.ipAddresses().forEach(ip -> {
|
||||
Set<Host> existingHosts = hostsByIp.get(ip);
|
||||
if (existingHosts == null) {
|
||||
hostsByIp.put(ip, addHosts(host));
|
||||
} else {
|
||||
existingHosts.add(host);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private boolean shouldUpdate(DefaultHost existingHost,
|
||||
ProviderId providerId,
|
||||
HostId hostId,
|
||||
@ -216,6 +250,7 @@ public class DistributedHostStore
|
||||
if (addresses != null && addresses.contains(ipAddress)) {
|
||||
addresses = new HashSet<>(existingHost.ipAddresses());
|
||||
addresses.remove(ipAddress);
|
||||
removeIpFromHostsByIp(existingHost, ipAddress);
|
||||
return new DefaultHost(existingHost.providerId(),
|
||||
hostId,
|
||||
existingHost.mac(),
|
||||
@ -259,7 +294,8 @@ public class DistributedHostStore
|
||||
|
||||
@Override
|
||||
public Set<Host> getHosts(IpAddress ip) {
|
||||
return filter(hosts.values(), host -> host.ipAddresses().contains(ip));
|
||||
Set<Host> hosts = hostsByIp.get(ip);
|
||||
return hosts != null ? ImmutableSet.copyOf(hosts) : ImmutableSet.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -284,6 +320,58 @@ public class DistributedHostStore
|
||||
return collection.stream().filter(predicate).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private Set<Host> addHosts(Host host) {
|
||||
Set<Host> hosts = Sets.newConcurrentHashSet();
|
||||
hosts.add(host);
|
||||
return hosts;
|
||||
}
|
||||
|
||||
private Set<Host> updateHosts(Set<Host> existingHosts, Host host) {
|
||||
Iterator<Host> iterator = existingHosts.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Host existingHost = iterator.next();
|
||||
if (existingHost.id().equals(host.id())) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
existingHosts.add(host);
|
||||
return existingHosts;
|
||||
}
|
||||
|
||||
private Set<Host> removeHosts(Set<Host> existingHosts, Host host) {
|
||||
if (existingHosts != null) {
|
||||
Iterator<Host> iterator = existingHosts.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Host existingHost = iterator.next();
|
||||
if (existingHost.id().equals(host.id())) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (existingHosts.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return existingHosts;
|
||||
}
|
||||
|
||||
private void updateHostsByIp(DefaultHost host) {
|
||||
host.ipAddresses().forEach(ip -> {
|
||||
hostsByIp.compute(ip, (k, v) -> v == null ? addHosts(host)
|
||||
: updateHosts(v, host));
|
||||
});
|
||||
}
|
||||
|
||||
private void removeHostsByIp(DefaultHost host) {
|
||||
host.ipAddresses().forEach(ip -> {
|
||||
hostsByIp.computeIfPresent(ip, (k, v) -> removeHosts(v, host));
|
||||
});
|
||||
}
|
||||
|
||||
private void removeIpFromHostsByIp(DefaultHost host, IpAddress ip) {
|
||||
hostsByIp.computeIfPresent(ip, (k, v) -> removeHosts(v, host));
|
||||
}
|
||||
|
||||
private class HostLocationTracker implements MapEventListener<HostId, DefaultHost> {
|
||||
@Override
|
||||
public void event(MapEvent<HostId, DefaultHost> event) {
|
||||
@ -291,9 +379,11 @@ public class DistributedHostStore
|
||||
Host prevHost = prevHosts.put(host.id(), host);
|
||||
switch (event.type()) {
|
||||
case INSERT:
|
||||
updateHostsByIp(host);
|
||||
notifyDelegate(new HostEvent(HOST_ADDED, host));
|
||||
break;
|
||||
case UPDATE:
|
||||
updateHostsByIp(host);
|
||||
if (!Objects.equals(prevHost.location(), host.location())) {
|
||||
notifyDelegate(new HostEvent(HOST_MOVED, host, prevHost));
|
||||
} else if (!Objects.equals(prevHost, host)) {
|
||||
@ -301,6 +391,7 @@ public class DistributedHostStore
|
||||
}
|
||||
break;
|
||||
case REMOVE:
|
||||
removeHostsByIp(host);
|
||||
if (prevHosts.remove(host.id()) != null) {
|
||||
notifyDelegate(new HostEvent(HOST_REMOVED, host));
|
||||
}
|
||||
|
||||
@ -29,6 +29,8 @@ import org.onosproject.net.host.HostDescription;
|
||||
import org.onosproject.net.provider.ProviderId;
|
||||
import org.onosproject.store.service.TestStorageService;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
@ -40,6 +42,7 @@ public class DistributedHostStoreTest extends TestCase {
|
||||
private DistributedHostStore ecXHostStore;
|
||||
|
||||
private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a"));
|
||||
private static final HostId HOSTID1 = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1b"));
|
||||
|
||||
private static final IpAddress IP1 = IpAddress.valueOf("10.2.0.2");
|
||||
private static final IpAddress IP2 = IpAddress.valueOf("10.2.0.3");
|
||||
@ -68,10 +71,7 @@ public class DistributedHostStoreTest extends TestCase {
|
||||
ips.add(IP1);
|
||||
ips.add(IP2);
|
||||
|
||||
HostDescription description = new DefaultHostDescription(HOSTID.mac(),
|
||||
HOSTID.vlanId(),
|
||||
HostLocation.NONE,
|
||||
ips);
|
||||
HostDescription description = createHostDesc(HOSTID, ips);
|
||||
ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false);
|
||||
ecXHostStore.removeIp(HOSTID, IP1);
|
||||
Host host = ecXHostStore.getHost(HOSTID);
|
||||
@ -80,4 +80,47 @@ public class DistributedHostStoreTest extends TestCase {
|
||||
assertTrue(host.ipAddresses().contains(IP2));
|
||||
}
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testAddHostByIp() {
|
||||
Set<IpAddress> ips = new HashSet<>();
|
||||
ips.add(IP1);
|
||||
ips.add(IP2);
|
||||
|
||||
HostDescription description = createHostDesc(HOSTID, ips);
|
||||
ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false);
|
||||
|
||||
Set<Host> hosts = ecXHostStore.getHosts(IP1);
|
||||
|
||||
assertFalse(hosts.size() > 1);
|
||||
assertTrue(hosts.size() == 1);
|
||||
|
||||
HostDescription description1 = createHostDesc(HOSTID1, Sets.newHashSet(IP2));
|
||||
ecXHostStore.createOrUpdateHost(PID, HOSTID1, description1, false);
|
||||
|
||||
Set<Host> hosts1 = ecXHostStore.getHosts(IP2);
|
||||
|
||||
assertFalse(hosts1.size() < 1);
|
||||
assertTrue(hosts1.size() == 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveHostByIp() {
|
||||
Set<IpAddress> ips = new HashSet<>();
|
||||
ips.add(IP1);
|
||||
ips.add(IP2);
|
||||
|
||||
HostDescription description = createHostDesc(HOSTID, ips);
|
||||
ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false);
|
||||
ecXHostStore.removeIp(HOSTID, IP1);
|
||||
Set<Host> hosts = ecXHostStore.getHosts(IP1);
|
||||
assertTrue(hosts.size() == 0);
|
||||
}
|
||||
|
||||
private HostDescription createHostDesc(HostId hostId, Set<IpAddress> ips) {
|
||||
return new DefaultHostDescription(hostId.mac(),
|
||||
hostId.vlanId(),
|
||||
HostLocation.NONE,
|
||||
ips);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user