CORD-1416 Implement multi-homing probing in HostLocationProvider

Also include following refactoring
    - Refactor the way we generate ARP probe
    - Remove some unused code

Change-Id: I96b1c47bd5731b7b38ef4d19a941d231b5d0054c
This commit is contained in:
Charles Chan 2017-08-14 11:42:11 -07:00 committed by Thomas Vachuska
parent 98c5bec26d
commit 35a323233b
8 changed files with 380 additions and 51 deletions

View File

@ -370,6 +370,8 @@ public class SegmentRoutingManager implements SegmentRoutingService {
"forceUnprovision", "true");
compCfgService.preSetProperty("org.onosproject.routeservice.store.RouteStoreImpl",
"distributed", "true");
compCfgService.preSetProperty("org.onosproject.provider.host.impl.HostLocationProvider",
"multihomingEnabled", "true");
processor = new InternalPacketProcessor();
linkListener = new InternalLinkListener();

View File

@ -16,6 +16,7 @@
package org.onosproject.net.host;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onosproject.net.HostId;
import org.onosproject.net.HostLocation;
import org.onosproject.net.provider.ProviderService;
@ -57,4 +58,23 @@ public interface HostProviderService extends ProviderService<HostProvider> {
* @param location location of host that vanished
*/
void removeLocationFromHost(HostId hostId, HostLocation location);
/**
* Notifies HostProviderService the beginning of pending host location verification and
* retrieves the unique MAC address for the probe.
*
* @param hostId ID of the host
* @param hostLocation the host location that is under verification
* @return probeMac, the source MAC address ONOS uses to probe the host
*/
default MacAddress addPendingHostLocation(HostId hostId, HostLocation hostLocation) {
return MacAddress.NONE;
}
/**
* Notifies HostProviderService the end of pending host location verification.
*
* @param probeMac the source MAC address ONOS uses to probe the host
*/
default void removePendingHostLocation(MacAddress probeMac) {}
}

View File

@ -134,4 +134,22 @@ public interface HostStore extends Store<HostEvent, HostStoreDelegate> {
*/
Set<Host> getConnectedHosts(DeviceId deviceId);
/**
* Notifies HostStore the beginning of pending host location verification and
* retrieves the unique MAC address for the probe.
*
* @param hostId ID of the host
* @param hostLocation the host location that is under verification
* @return probeMac, the source MAC address ONOS uses to probe the host
*/
default MacAddress addPendingHostLocation(HostId hostId, HostLocation hostLocation) {
return MacAddress.NONE;
}
/**
* Notifies HostStore the end of pending host location verification.
*
* @param probeMac the source MAC address ONOS uses to probe the host
*/
default void removePendingHostLocation(MacAddress probeMac) {}
}

View File

@ -455,6 +455,16 @@ public class HostManager
store.removeLocation(hostId, location);
}
@Override
public MacAddress addPendingHostLocation(HostId hostId, HostLocation hostLocation) {
return store.addPendingHostLocation(hostId, hostLocation);
}
@Override
public void removePendingHostLocation(MacAddress probeMac) {
store.removePendingHostLocation(probeMac);
}
private boolean allowedToChange(HostId hostId) {
// Disallow removing inexistent host or host provided by others
Host host = store.getHost(hostId);

View File

@ -15,6 +15,9 @@
*/
package org.onosproject.store.host.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
@ -49,6 +52,7 @@ import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Collection;
@ -58,7 +62,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -88,29 +95,67 @@ public class DistributedHostStore
private ConsistentMap<HostId, DefaultHost> hostsConsistentMap;
private Map<HostId, DefaultHost> hosts;
private Map<IpAddress, Set<Host>> hostsByIp;
private MapEventListener<HostId, DefaultHost> hostLocationTracker =
new HostLocationTracker();
private ConsistentMap<MacAddress, PendingHostLocation> pendingHostsConsistentMap;
private Map<MacAddress, PendingHostLocation> pendingHosts;
private MapEventListener<MacAddress, PendingHostLocation> pendingHostListener =
new PendingHostListener();
private ScheduledExecutorService executor;
private Consumer<Status> statusChangeListener;
// TODO make this configurable
private static final int PROBE_TIMEOUT_MS = 1000;
private Cache<MacAddress, PendingHostLocation> pendingHostsCache = CacheBuilder.newBuilder()
.expireAfterWrite(PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.removalListener((RemovalNotification<MacAddress, PendingHostLocation> notification) -> {
switch (notification.getCause()) {
case EXPIRED:
PendingHostLocation expired = notification.getValue();
if (expired != null) {
log.info("Evict timeout probe {} from pendingHostLocations", notification.getValue());
timeoutPendingHostLocation(notification.getKey());
}
break;
case EXPLICIT:
break;
default:
log.warn("Remove {} from pendingHostLocations for unexpected reason {}",
notification.getKey(), notification.getCause());
}
}).build();
private ScheduledExecutorService cacheCleaner = Executors.newSingleThreadScheduledExecutor();
@Activate
public void activate() {
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
hostsConsistentMap = storageService.<HostId, DefaultHost>consistentMapBuilder()
.withName("onos-hosts")
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(hostSerializer.build()))
.build();
hostsConsistentMap.addListener(hostLocationTracker);
hosts = hostsConsistentMap.asJavaMap();
KryoNamespace.Builder pendingHostSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(PendingHostLocation.class);
pendingHostsConsistentMap = storageService.<MacAddress, PendingHostLocation>consistentMapBuilder()
.withName("onos-hosts-pending")
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(pendingHostSerializer.build()))
.build();
pendingHostsConsistentMap.addListener(pendingHostListener);
pendingHosts = pendingHostsConsistentMap.asJavaMap();
hostsConsistentMap.addListener(hostLocationTracker);
cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0,
PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "store", log));
statusChangeListener = status -> {
@ -127,6 +172,8 @@ public class DistributedHostStore
public void deactivate() {
hostsConsistentMap.removeListener(hostLocationTracker);
cacheCleaner.shutdown();
log.info("Stopped");
}
@ -335,6 +382,32 @@ public class DistributedHostStore
return ImmutableSet.copyOf(filtered);
}
@Override
public MacAddress addPendingHostLocation(HostId hostId, HostLocation hostLocation) {
// Use ONLab OUI (3 bytes) + atomic counter (3 bytes) as the source MAC of the probe
long nextIndex = storageService.getAtomicCounter("onos-hosts-probe-index").getAndIncrement();
MacAddress probeMac = MacAddress.valueOf(MacAddress.NONE.toLong() + nextIndex);
PendingHostLocation phl = new PendingHostLocation(hostId, hostLocation);
pendingHostsCache.put(probeMac, phl);
pendingHosts.put(probeMac, phl);
return probeMac;
}
@Override
public void removePendingHostLocation(MacAddress probeMac) {
pendingHostsCache.invalidate(probeMac);
pendingHosts.remove(probeMac);
}
private void timeoutPendingHostLocation(MacAddress probeMac) {
pendingHosts.computeIfPresent(probeMac, (k, v) -> {
v.setExpired(true);
return v;
});
}
private Set<Host> filter(Collection<DefaultHost> collection, Predicate<DefaultHost> predicate) {
return collection.stream().filter(predicate).collect(Collectors.toSet());
}
@ -418,4 +491,28 @@ public class DistributedHostStore
}
}
}
private class PendingHostListener implements MapEventListener<MacAddress, PendingHostLocation> {
@Override
public void event(MapEvent<MacAddress, PendingHostLocation> event) {
Versioned<PendingHostLocation> newValue = event.newValue();
switch (event.type()) {
case INSERT:
break;
case UPDATE:
if (newValue.value().expired()) {
Executor locationRemover = Executors.newSingleThreadScheduledExecutor();
locationRemover.execute(() -> {
pendingHosts.remove(event.key());
removeLocation(newValue.value().hostId(), newValue.value().location());
});
}
break;
case REMOVE:
break;
default:
log.warn("Unknown map event type: {}", event.type());
}
}
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.host.impl;
import org.onosproject.net.HostId;
import org.onosproject.net.HostLocation;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Internal data structure to record the info of a host with location that is under verification.
*/
class PendingHostLocation {
private HostId hostId;
private HostLocation location;
private boolean expired;
/**
* Constructs PendingHostLocation.
*
* @param hostId Host ID
* @param location location to be verified
*/
PendingHostLocation(HostId hostId, HostLocation location) {
this.hostId = hostId;
this.location = location;
this.expired = false;
}
/**
* Gets HostId of this entry.
*
* @return host id
*/
HostId hostId() {
return hostId;
}
/**
* Gets HostLocation of this entry.
*
* @return host location
*/
HostLocation location() {
return location;
}
/**
* Determine whether this probe is expired or not.
*
* @return true if this entry is expired and waiting to be removed from the cache
*/
boolean expired() {
return expired;
}
/**
* Sets whether this probe is expired or not.
*
* @param expired true if this entry is expired and waiting to be removed from the cache
*/
void setExpired(boolean expired) {
this.expired = expired;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof PendingHostLocation)) {
return false;
}
PendingHostLocation that = (PendingHostLocation) o;
return (Objects.equals(this.hostId, that.hostId) &&
Objects.equals(this.location, that.location) &&
Objects.equals(this.expired, that.expired));
}
@Override
public int hashCode() {
return Objects.hash(hostId, location, expired);
}
@Override
public String toString() {
return toStringHelper(getClass())
.add("hostId", hostId)
.add("location", location)
.add("expired", expired)
.toString();
}
}

View File

@ -15,6 +15,7 @@
*/
package org.onosproject.provider.host.impl;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@ -31,10 +32,10 @@ import org.onlab.packet.ICMP6;
import org.onlab.packet.IPacket;
import org.onlab.packet.IPv4;
import org.onlab.packet.IPv6;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.TpPort;
import org.onlab.packet.UDP;
import org.onlab.packet.VlanId;
import org.onlab.packet.dhcp.Dhcp6ClientIdOption;
@ -90,6 +91,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
import java.util.Set;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
@ -162,9 +164,11 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
label = "Enable requesting packet intercepts")
private boolean requestInterceptsEnabled = true;
protected ExecutorService eventHandler;
@Property(name = "multihomingEnabled", boolValue = false,
label = "Allow hosts to be multihomed")
private boolean multihomingEnabled = false;
private static final byte[] SENDER_ADDRESS = IpAddress.valueOf("0.0.0.0").toOctets();
protected ExecutorService eventHandler;
/**
* Creates an OpenFlow host provider.
@ -245,18 +249,6 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
packetService.cancelPackets(ipv6NsSelector, PacketPriority.CONTROL, appId);
packetService.cancelPackets(ipv6NaSelector, PacketPriority.CONTROL, appId);
}
// Use DHCP
TrafficSelector dhcpServerSelector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPProtocol(IPv4.PROTOCOL_UDP)
.matchUdpSrc(TpPort.tpPort(UDP.DHCP_SERVER_PORT))
.build();
TrafficSelector dhcpClientSelector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPProtocol(IPv4.PROTOCOL_UDP)
.matchUdpSrc(TpPort.tpPort(UDP.DHCP_CLIENT_PORT))
.build();
}
/**
@ -336,6 +328,17 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
log.info("Configured. Request intercepts is {}",
requestInterceptsEnabled ? "enabled" : "disabled");
}
flag = Tools.isPropertyEnabled(properties, "multihomingEnabled");
if (flag == null) {
log.info("Multihoming is not configured, " +
"using current value of {}", multihomingEnabled);
} else {
multihomingEnabled = flag;
log.info("Configured. Multihoming is {}",
multihomingEnabled ? "enabled" : "disabled");
}
}
@Override
@ -378,26 +381,9 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
// This method is using source ip as 0.0.0.0 , to receive the reply even from the sub net hosts.
private Ethernet buildArpRequest(IpAddress targetIp, Host host) {
ARP arp = new ARP();
arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
.setProtocolType(ARP.PROTO_TYPE_IP)
.setProtocolAddressLength((byte) IpAddress.INET_BYTE_LENGTH)
.setOpCode(ARP.OP_REQUEST);
arp.setSenderHardwareAddress(MacAddress.BROADCAST.toBytes())
.setSenderProtocolAddress(SENDER_ADDRESS)
.setTargetHardwareAddress(MacAddress.BROADCAST.toBytes())
.setTargetProtocolAddress(targetIp.toOctets());
Ethernet ethernet = new Ethernet();
ethernet.setEtherType(Ethernet.TYPE_ARP)
.setDestinationMACAddress(MacAddress.BROADCAST)
.setSourceMACAddress(MacAddress.BROADCAST).setPayload(arp);
ethernet.setPad(true);
return ethernet;
return ARP.buildArpRequest(MacAddress.BROADCAST.toBytes(), Ip4Address.ZERO.toOctets(),
MacAddress.BROADCAST.toBytes(), targetIp.toOctets(),
MacAddress.BROADCAST.toBytes(), VlanId.NONE.toShort());
}
private class InternalHostProvider implements PacketProcessor {
@ -414,9 +400,23 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
private void createOrUpdateHost(HostId hid, MacAddress mac,
VlanId vlan, HostLocation hloc,
IpAddress ip) {
Set<HostLocation> newLocations = Sets.newHashSet(hloc);
if (multihomingEnabled) {
Host existingHost = hostService.getHost(hid);
if (existingHost != null) {
Set<HostLocation> prevLocations = existingHost.locations();
newLocations.addAll(prevLocations);
if (!existingHost.locations().contains(hloc)) {
probeLocations(existingHost);
}
}
}
HostDescription desc = ip == null || ip.isZero() || ip.isSelfAssigned() ?
new DefaultHostDescription(mac, vlan, hloc) :
new DefaultHostDescription(mac, vlan, hloc, ip);
new DefaultHostDescription(mac, vlan, newLocations, Sets.newHashSet(), false) :
new DefaultHostDescription(mac, vlan, newLocations, Sets.newHashSet(ip), false);
try {
providerService.hostDetected(hid, desc, false);
} catch (IllegalStateException e) {
@ -424,6 +424,50 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
}
}
/**
* Start verification procedure of all previous locations by sending probes.
*
* @param host Host to be probed
*/
private void probeLocations(Host host) {
host.locations().forEach(location -> {
MacAddress probeMac = providerService.addPendingHostLocation(host.id(), location);
host.ipAddresses().stream().findFirst().ifPresent(ip -> {
Ethernet probe;
if (ip.isIp4()) {
probe = ARP.buildArpRequest(probeMac.toBytes(), Ip4Address.ZERO.toOctets(),
host.id().mac().toBytes(), ip.toOctets(),
host.id().mac().toBytes(), host.id().vlanId().toShort());
} else {
probe = NeighborSolicitation.buildNdpSolicit(
ip.getIp6Address().toOctets(),
IPv6.getLinkLocalAddress(probeMac.toBytes()),
IPv6.getSolicitNodeAddress(ip.getIp6Address().toOctets()),
probeMac.toBytes(),
IPv6.getMCastMacAddress(ip.getIp6Address().toOctets()),
host.id().vlanId());
}
sendProbe(probe, location);
});
});
}
/**
* Send the probe packet on given port.
*
* @param probe the probe packet
* @param connectPoint the port we want to probe
*/
private void sendProbe(Ethernet probe, ConnectPoint connectPoint) {
log.info("Probing host {} on location {} with probeMac {}",
probe.getDestinationMAC(), connectPoint, probe.getSourceMAC());
TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(connectPoint.port()).build();
OutboundPacket outboundPacket = new DefaultOutboundPacket(connectPoint.deviceId(),
treatment, ByteBuffer.wrap(probe.serialize()));
packetService.emit(outboundPacket);
}
/**
* Updates IP address for an existing host.
*
@ -433,12 +477,12 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
private void updateHostIp(HostId hid, IpAddress ip) {
Host host = hostService.getHost(hid);
if (host == null) {
log.debug("Fail to update IP for {}. Host does not exist");
log.warn("Fail to update IP for {}. Host does not exist", hid);
return;
}
HostDescription desc = new DefaultHostDescription(hid.mac(), hid.vlanId(),
host.location(), ip);
host.locations(), Sets.newHashSet(ip), false);
try {
providerService.hostDetected(hid, desc, false);
} catch (IllegalStateException e) {
@ -478,6 +522,14 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
HostLocation hloc = new HostLocation(heardOn, System.currentTimeMillis());
HostId hid = HostId.hostId(eth.getSourceMAC(), vlan);
MacAddress destMac = eth.getDestinationMAC();
// Receives a location probe. Invalid entry from the cache
if (multihomingEnabled && destMac.isOnos() && !MacAddress.NONE.equals(destMac)) {
log.info("Receives probe for {}/{} on {}", srcMac, vlan, heardOn);
providerService.removePendingHostLocation(destMac);
return;
}
// ARP: possible new hosts, update both location and IP
if (eth.getEtherType() == Ethernet.TYPE_ARP) {

View File

@ -345,42 +345,64 @@ public class ARP extends BasePacket {
*
* @param senderMacAddress the mac address of the sender
* @param senderIpAddress the ip address of the sender
* @param targetAddress the address to resolve
* @param targetMacAddress the mac address of the target
* @param targetIpAddress the ip address to resolve
* @param destinationMacAddress the mac address put in Ethernet header
* @param vlanId the vlan id
* @return the Ethernet frame containing the ARP request
*/
public static Ethernet buildArpRequest(byte[] senderMacAddress,
byte[] senderIpAddress,
byte[] targetAddress,
byte[] targetMacAddress,
byte[] targetIpAddress,
byte[] destinationMacAddress,
short vlanId) {
if (senderMacAddress.length != MacAddress.MAC_ADDRESS_LENGTH ||
senderIpAddress.length != Ip4Address.BYTE_LENGTH ||
targetAddress.length != Ip4Address.BYTE_LENGTH) {
targetIpAddress.length != Ip4Address.BYTE_LENGTH) {
return null;
}
ARP arpRequest = new ARP();
arpRequest.setHardwareType(ARP.HW_TYPE_ETHERNET)
.setProtocolType(ARP.PROTO_TYPE_IP)
.setHardwareAddressLength(
(byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
.setProtocolAddressLength((byte) Ip4Address.BYTE_LENGTH)
.setOpCode(ARP.OP_REQUEST)
.setSenderHardwareAddress(senderMacAddress)
.setTargetHardwareAddress(MacAddress.ZERO.toBytes())
.setTargetHardwareAddress(targetMacAddress)
.setSenderProtocolAddress(senderIpAddress)
.setTargetProtocolAddress(targetAddress);
.setTargetProtocolAddress(targetIpAddress);
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(MacAddress.BROADCAST.toBytes())
eth.setDestinationMACAddress(destinationMacAddress)
.setSourceMACAddress(senderMacAddress)
.setEtherType(Ethernet.TYPE_ARP)
.setVlanID(vlanId)
.setPad(true)
.setPayload(arpRequest);
return eth;
}
/**
* Builds an ARP request using the supplied parameters.
*
* @param senderMacAddress the mac address of the sender
* @param senderIpAddress the ip address of the sender
* @param targetIpAddress the ip address to resolve
* @param vlanId the vlan id
* @return the Ethernet frame containing the ARP request
*/
public static Ethernet buildArpRequest(byte[] senderMacAddress,
byte[] senderIpAddress,
byte[] targetIpAddress,
short vlanId) {
return buildArpRequest(senderMacAddress, senderIpAddress,
MacAddress.ZERO.toBytes(), targetIpAddress,
MacAddress.BROADCAST.toBytes(), vlanId);
}
/**
* Builds an ARP reply based on a request.
*