diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java index a0f6e66d1a..a8e5bc1bb9 100644 --- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java +++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java @@ -1,3 +1,4 @@ + /* * Copyright 2015-present Open Networking Laboratory * @@ -48,19 +49,25 @@ import org.onosproject.store.service.MapEvent; import org.onosproject.store.service.MapEventListener; import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; +import org.onosproject.store.service.DistributedPrimitive.Status; import org.slf4j.Logger; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static org.onlab.util.Tools.groupedThreads; import static org.onosproject.net.DefaultAnnotations.merge; import static org.onosproject.net.host.HostEvent.Type.*; import static org.slf4j.LoggerFactory.getLogger; @@ -81,6 +88,7 @@ public class DistributedHostStore private ConsistentMap hostsConsistentMap; private Map hosts; + private Map> hostsByIp; private final ConcurrentHashMap prevHosts = new ConcurrentHashMap<>(); @@ -88,6 +96,10 @@ public class DistributedHostStore private MapEventListener hostLocationTracker = new HostLocationTracker(); + private ScheduledExecutorService executor; + + private Consumer statusChangeListener; + @Activate public void activate() { KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder() @@ -105,6 +117,14 @@ public class DistributedHostStore hostsConsistentMap.addListener(hostLocationTracker); + executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log)); + statusChangeListener = status -> { + if (status == Status.ACTIVE) { + executor.execute(this::loadHostsByIp); + } + }; + hostsConsistentMap.addStatusChangeListener(statusChangeListener); + loadHostsByIp(); log.info("Started"); } @@ -116,6 +136,20 @@ public class DistributedHostStore log.info("Stopped"); } + private void loadHostsByIp() { + hostsByIp = new ConcurrentHashMap>(); + hostsConsistentMap.asJavaMap().values().forEach(host -> { + host.ipAddresses().forEach(ip -> { + Set existingHosts = hostsByIp.get(ip); + if (existingHosts == null) { + hostsByIp.put(ip, addHosts(host)); + } else { + existingHosts.add(host); + } + }); + }); + } + private boolean shouldUpdate(DefaultHost existingHost, ProviderId providerId, HostId hostId, @@ -216,6 +250,7 @@ public class DistributedHostStore if (addresses != null && addresses.contains(ipAddress)) { addresses = new HashSet<>(existingHost.ipAddresses()); addresses.remove(ipAddress); + removeIpFromHostsByIp(existingHost, ipAddress); return new DefaultHost(existingHost.providerId(), hostId, existingHost.mac(), @@ -259,7 +294,8 @@ public class DistributedHostStore @Override public Set getHosts(IpAddress ip) { - return filter(hosts.values(), host -> host.ipAddresses().contains(ip)); + Set hosts = hostsByIp.get(ip); + return hosts != null ? ImmutableSet.copyOf(hosts) : ImmutableSet.of(); } @Override @@ -284,6 +320,58 @@ public class DistributedHostStore return collection.stream().filter(predicate).collect(Collectors.toSet()); } + private Set addHosts(Host host) { + Set hosts = Sets.newConcurrentHashSet(); + hosts.add(host); + return hosts; + } + + private Set updateHosts(Set existingHosts, Host host) { + Iterator iterator = existingHosts.iterator(); + while (iterator.hasNext()) { + Host existingHost = iterator.next(); + if (existingHost.id().equals(host.id())) { + iterator.remove(); + } + } + existingHosts.add(host); + return existingHosts; + } + + private Set removeHosts(Set existingHosts, Host host) { + if (existingHosts != null) { + Iterator iterator = existingHosts.iterator(); + while (iterator.hasNext()) { + Host existingHost = iterator.next(); + if (existingHost.id().equals(host.id())) { + iterator.remove(); + } + } + } + + if (existingHosts.isEmpty()) { + return null; + } + return existingHosts; + } + + private void updateHostsByIp(DefaultHost host) { + host.ipAddresses().forEach(ip -> { + hostsByIp.compute(ip, (k, v) -> v == null ? addHosts(host) + : updateHosts(v, host)); + }); + } + + private void removeHostsByIp(DefaultHost host) { + host.ipAddresses().forEach(ip -> { + hostsByIp.computeIfPresent(ip, (k, v) -> removeHosts(v, host)); + }); + } + + private void removeIpFromHostsByIp(DefaultHost host, IpAddress ip) { + hostsByIp.computeIfPresent(ip, (k, v) -> removeHosts(v, host)); + } + private class HostLocationTracker implements MapEventListener { @Override public void event(MapEvent event) { @@ -291,9 +379,11 @@ public class DistributedHostStore Host prevHost = prevHosts.put(host.id(), host); switch (event.type()) { case INSERT: + updateHostsByIp(host); notifyDelegate(new HostEvent(HOST_ADDED, host)); break; case UPDATE: + updateHostsByIp(host); if (!Objects.equals(prevHost.location(), host.location())) { notifyDelegate(new HostEvent(HOST_MOVED, host, prevHost)); } else if (!Objects.equals(prevHost, host)) { @@ -301,6 +391,7 @@ public class DistributedHostStore } break; case REMOVE: + removeHostsByIp(host); if (prevHosts.remove(host.id()) != null) { notifyDelegate(new HostEvent(HOST_REMOVED, host)); } diff --git a/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java index 3f9348863f..a103800f53 100644 --- a/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java +++ b/core/store/dist/src/test/java/org/onosproject/store/host/impl/DistributedHostStoreTest.java @@ -29,6 +29,8 @@ import org.onosproject.net.host.HostDescription; import org.onosproject.net.provider.ProviderId; import org.onosproject.store.service.TestStorageService; +import com.google.common.collect.Sets; + import java.util.HashSet; import java.util.Set; @@ -40,6 +42,7 @@ public class DistributedHostStoreTest extends TestCase { private DistributedHostStore ecXHostStore; private static final HostId HOSTID = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1a")); + private static final HostId HOSTID1 = HostId.hostId(MacAddress.valueOf("1a:1a:1a:1a:1a:1b")); private static final IpAddress IP1 = IpAddress.valueOf("10.2.0.2"); private static final IpAddress IP2 = IpAddress.valueOf("10.2.0.3"); @@ -68,10 +71,7 @@ public class DistributedHostStoreTest extends TestCase { ips.add(IP1); ips.add(IP2); - HostDescription description = new DefaultHostDescription(HOSTID.mac(), - HOSTID.vlanId(), - HostLocation.NONE, - ips); + HostDescription description = createHostDesc(HOSTID, ips); ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false); ecXHostStore.removeIp(HOSTID, IP1); Host host = ecXHostStore.getHost(HOSTID); @@ -80,4 +80,47 @@ public class DistributedHostStoreTest extends TestCase { assertTrue(host.ipAddresses().contains(IP2)); } -} \ No newline at end of file + @Test + public void testAddHostByIp() { + Set ips = new HashSet<>(); + ips.add(IP1); + ips.add(IP2); + + HostDescription description = createHostDesc(HOSTID, ips); + ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false); + + Set hosts = ecXHostStore.getHosts(IP1); + + assertFalse(hosts.size() > 1); + assertTrue(hosts.size() == 1); + + HostDescription description1 = createHostDesc(HOSTID1, Sets.newHashSet(IP2)); + ecXHostStore.createOrUpdateHost(PID, HOSTID1, description1, false); + + Set hosts1 = ecXHostStore.getHosts(IP2); + + assertFalse(hosts1.size() < 1); + assertTrue(hosts1.size() == 2); + } + + @Test + public void testRemoveHostByIp() { + Set ips = new HashSet<>(); + ips.add(IP1); + ips.add(IP2); + + HostDescription description = createHostDesc(HOSTID, ips); + ecXHostStore.createOrUpdateHost(PID, HOSTID, description, false); + ecXHostStore.removeIp(HOSTID, IP1); + Set hosts = ecXHostStore.getHosts(IP1); + assertTrue(hosts.size() == 0); + } + + private HostDescription createHostDesc(HostId hostId, Set ips) { + return new DefaultHostDescription(hostId.mac(), + hostId.vlanId(), + HostLocation.NONE, + ips); + } + +}