Implement an option in CPRM to reprogram the flows when device reconnecting

Also remove unused AsyncDeviceFetcher in FibInstaller

Change-Id: I52e778a51854efd6bfe47c56569efa5c27d7c7fb
This commit is contained in:
Charles Chan 2017-02-28 15:15:17 -08:00 committed by Ray Milkey
parent 5deaab5fdc
commit c6d227e242
8 changed files with 135 additions and 61 deletions

View File

@ -22,7 +22,7 @@ import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService; import org.onosproject.net.device.DeviceService;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
@ -36,7 +36,8 @@ public final class AsyncDeviceFetcher {
private DeviceListener listener = new InternalDeviceListener(); private DeviceListener listener = new InternalDeviceListener();
private Map<DeviceId, CompletableFuture<DeviceId>> devices = new ConcurrentHashMap(); private Map<DeviceId, Runnable> onConnect = new ConcurrentHashMap<>();
private Map<DeviceId, Runnable> onDisconnect = new ConcurrentHashMap<>();
private AsyncDeviceFetcher(DeviceService deviceService) { private AsyncDeviceFetcher(DeviceService deviceService) {
this.deviceService = checkNotNull(deviceService); this.deviceService = checkNotNull(deviceService);
@ -48,24 +49,27 @@ public final class AsyncDeviceFetcher {
*/ */
public void shutdown() { public void shutdown() {
deviceService.removeListener(listener); deviceService.removeListener(listener);
devices.clear(); onConnect.clear();
onDisconnect.clear();
} }
/** /**
* Returns a completable future that completes when the device is available * Executes provided callback when given device connects/disconnects.
* for the first time. * @param deviceId device ID
* * @param onConnect callback that will be executed immediately if the device
* @param deviceId ID of the device * is currently online, or when the device becomes online
* @return completable future * @param onDisconnect callback that will be executed when the device becomes offline
*/ */
public CompletableFuture<DeviceId> getDevice(DeviceId deviceId) { void registerCallback(DeviceId deviceId, Runnable onConnect, Runnable onDisconnect) {
CompletableFuture<DeviceId> future = new CompletableFuture<>(); if (onConnect != null) {
return devices.computeIfAbsent(deviceId, deviceId1 -> {
if (deviceService.isAvailable(deviceId)) { if (deviceService.isAvailable(deviceId)) {
future.complete(deviceId); onConnect.run();
} }
return future; this.onConnect.put(deviceId, onConnect);
}); }
if (onDisconnect != null) {
this.onDisconnect.put(deviceId, onDisconnect);
}
} }
/** /**
@ -82,24 +86,23 @@ public final class AsyncDeviceFetcher {
@Override @Override
public void event(DeviceEvent event) { public void event(DeviceEvent event) {
switch (event.type()) { switch (event.type()) {
case DEVICE_ADDED: case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED: case DEVICE_AVAILABILITY_CHANGED:
if (deviceService.isAvailable(event.subject().id())) {
DeviceId deviceId = event.subject().id(); DeviceId deviceId = event.subject().id();
CompletableFuture<DeviceId> future = devices.get(deviceId); if (deviceService.isAvailable(deviceId)) {
if (future != null) { Optional.ofNullable(onConnect.get(deviceId)).ifPresent(Runnable::run);
future.complete(deviceId); } else {
Optional.ofNullable(onDisconnect.get(deviceId)).ifPresent(Runnable::run);
} }
} break;
break; case DEVICE_UPDATED:
case DEVICE_UPDATED: case DEVICE_REMOVED:
case DEVICE_REMOVED: case DEVICE_SUSPENDED:
case DEVICE_SUSPENDED: case PORT_ADDED:
case PORT_ADDED: case PORT_UPDATED:
case PORT_UPDATED: case PORT_REMOVED:
case PORT_REMOVED: default:
default: break;
break;
} }
} }
} }

View File

@ -53,9 +53,9 @@ public class Router {
private InterfaceService interfaceService; private InterfaceService interfaceService;
private InterfaceListener listener = new InternalInterfaceListener(); private InterfaceListener listener = new InternalInterfaceListener();
private AsyncDeviceFetcher asyncDeviceFetcher; private DeviceService deviceService;
private volatile boolean deviceAvailable = false; private AsyncDeviceFetcher asyncDeviceFetcher;
/** /**
* Creates a new router interface manager. * Creates a new router interface manager.
@ -65,27 +65,26 @@ public class Router {
* @param deviceService device service * @param deviceService device service
* @param provisioner consumer that will provision new interfaces * @param provisioner consumer that will provision new interfaces
* @param unprovisioner consumer that will unprovision old interfaces * @param unprovisioner consumer that will unprovision old interfaces
* @param forceUnprovision force unprovision when the device goes offline
*/ */
public Router(RouterInfo info, public Router(RouterInfo info,
InterfaceService interfaceService, InterfaceService interfaceService,
DeviceService deviceService, DeviceService deviceService,
Consumer<InterfaceProvisionRequest> provisioner, Consumer<InterfaceProvisionRequest> provisioner,
Consumer<InterfaceProvisionRequest> unprovisioner) { Consumer<InterfaceProvisionRequest> unprovisioner,
boolean forceUnprovision) {
this.info = checkNotNull(info); this.info = checkNotNull(info);
this.provisioner = checkNotNull(provisioner); this.provisioner = checkNotNull(provisioner);
this.unprovisioner = checkNotNull(unprovisioner); this.unprovisioner = checkNotNull(unprovisioner);
this.interfaceService = checkNotNull(interfaceService); this.interfaceService = checkNotNull(interfaceService);
this.deviceService = checkNotNull(deviceService);
this.asyncDeviceFetcher = AsyncDeviceFetcher.create(deviceService); this.asyncDeviceFetcher = AsyncDeviceFetcher.create(deviceService);
asyncDeviceFetcher.getDevice(info.deviceId()) if (forceUnprovision) {
.thenAccept(deviceId1 -> { asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, this::forceUnprovision);
deviceAvailable = true; } else {
provision(); asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, null);
}).whenComplete((v, t) -> { }
if (t != null) {
log.error("Error provisioning: ", t);
}
});
interfaceService.addListener(listener); interfaceService.addListener(listener);
} }
@ -94,6 +93,8 @@ public class Router {
* Cleans up the router and unprovisions all interfaces. * Cleans up the router and unprovisions all interfaces.
*/ */
public void cleanup() { public void cleanup() {
asyncDeviceFetcher.shutdown();
interfaceService.removeListener(listener); interfaceService.removeListener(listener);
unprovision(); unprovision();
@ -112,8 +113,15 @@ public class Router {
* Changes the router configuration. * Changes the router configuration.
* *
* @param newConfig new configuration * @param newConfig new configuration
* @param forceUnprovision true if we want to force unprovision the device when it goes offline
*/ */
public void changeConfiguration(RouterInfo newConfig) { public void changeConfiguration(RouterInfo newConfig, boolean forceUnprovision) {
if (forceUnprovision) {
asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, this::forceUnprovision);
} else {
asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, null);
}
Set<String> oldConfiguredInterfaces = info.interfaces(); Set<String> oldConfiguredInterfaces = info.interfaces();
info = newConfig; info = newConfig;
Set<String> newConfiguredInterfaces = info.interfaces(); Set<String> newConfiguredInterfaces = info.interfaces();
@ -153,18 +161,21 @@ public class Router {
private void provision() { private void provision() {
getInterfacesForDevice(info.deviceId()) getInterfacesForDevice(info.deviceId())
.filter(this::shouldProvision)
.forEach(this::provision); .forEach(this::provision);
} }
private void unprovision() { private void unprovision() {
getInterfacesForDevice(info.deviceId()) getInterfacesForDevice(info.deviceId())
.filter(this::shouldProvision)
.forEach(this::unprovision); .forEach(this::unprovision);
} }
private void forceUnprovision() {
getInterfacesForDevice(info.deviceId())
.forEach(this::forceUnprovision);
}
private void provision(Interface intf) { private void provision(Interface intf) {
if (!provisioned.contains(intf) && shouldProvision(intf)) { if (!provisioned.contains(intf) && deviceAvailable(intf) && shouldProvision(intf)) {
log.info("Provisioning interface {}", intf); log.info("Provisioning interface {}", intf);
provisioner.accept(InterfaceProvisionRequest.of(info, intf)); provisioner.accept(InterfaceProvisionRequest.of(info, intf));
provisioned.add(intf); provisioned.add(intf);
@ -172,16 +183,28 @@ public class Router {
} }
private void unprovision(Interface intf) { private void unprovision(Interface intf) {
if (provisioned.contains(intf)) { if (provisioned.contains(intf) && deviceAvailable(intf) && shouldProvision(intf)) {
log.info("Unprovisioning interface {}", intf); log.info("Unprovisioning interface {}", intf);
unprovisioner.accept(InterfaceProvisionRequest.of(info, intf)); unprovisioner.accept(InterfaceProvisionRequest.of(info, intf));
provisioned.remove(intf); provisioned.remove(intf);
} }
} }
private void forceUnprovision(Interface intf) {
// Skip availability check when force unprovisioning an interface
if (provisioned.contains(intf) && shouldProvision(intf)) {
log.info("Unprovisioning interface {}", intf);
unprovisioner.accept(InterfaceProvisionRequest.of(info, intf));
provisioned.remove(intf);
}
}
private boolean deviceAvailable(Interface intf) {
return deviceService.isAvailable(intf.connectPoint().deviceId());
}
private boolean shouldProvision(Interface intf) { private boolean shouldProvision(Interface intf) {
return deviceAvailable && return info.interfaces().isEmpty() || info.interfaces().contains(intf.name());
(info.interfaces().isEmpty() || info.interfaces().contains(intf.name()));
} }
private Stream<Interface> getInterfacesForDevice(DeviceId deviceId) { private Stream<Interface> getInterfacesForDevice(DeviceId deviceId) {

View File

@ -8,6 +8,7 @@ TEST_DEPS = [
'//lib:TEST_ADAPTERS', '//lib:TEST_ADAPTERS',
'//incubator/api:onos-incubator-api-tests', '//incubator/api:onos-incubator-api-tests',
'//apps/routing-api:onos-apps-routing-api-tests', '//apps/routing-api:onos-apps-routing-api-tests',
'//utils/osgi:onlab-osgi-tests',
] ]
osgi_jar_with_tests ( osgi_jar_with_tests (

View File

@ -27,5 +27,13 @@
<artifactId>onos-apps-routing-cpr</artifactId> <artifactId>onos-apps-routing-cpr</artifactId>
<packaging>bundle</packaging> <packaging>bundle</packaging>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</project> </project>

View File

@ -22,6 +22,8 @@ import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.EthType; import org.onlab.packet.EthType;
@ -30,7 +32,9 @@ import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpPrefix; import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress; import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId; import org.onlab.packet.VlanId;
import org.onlab.util.Tools;
import org.onosproject.app.ApplicationService; import org.onosproject.app.ApplicationService;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId; import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService; import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.intf.Interface; import org.onosproject.incubator.net.intf.Interface;
@ -63,8 +67,10 @@ import org.onosproject.routing.RoutingService;
import org.onosproject.routing.config.RouterConfigHelper; import org.onosproject.routing.config.RouterConfigHelper;
import org.onosproject.routing.config.RoutersConfig; import org.onosproject.routing.config.RoutersConfig;
import org.onosproject.routing.config.RoutingConfigurationService; import org.onosproject.routing.config.RoutingConfigurationService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Dictionary;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -127,6 +133,13 @@ public class ControlPlaneRedirectManager {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected RoutingConfigurationService rs; protected RoutingConfigurationService rs;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
@Property(name = "forceUnprovision", boolValue = false,
label = "Force unprovision when the device goes offline")
private boolean forceUnprovision = false;
private static final String APP_NAME = "org.onosproject.cpr"; private static final String APP_NAME = "org.onosproject.cpr";
private ApplicationId appId; private ApplicationId appId;
@ -139,9 +152,12 @@ public class ControlPlaneRedirectManager {
private final InternalHostListener hostListener = new InternalHostListener(); private final InternalHostListener hostListener = new InternalHostListener();
@Activate @Activate
protected void activate() { protected void activate(ComponentContext context) {
this.appId = coreService.registerApplication(APP_NAME); this.appId = coreService.registerApplication(APP_NAME);
cfgService.registerProperties(getClass());
modified(context);
networkConfigService.addListener(networkConfigListener); networkConfigService.addListener(networkConfigListener);
hostService.addListener(hostListener); hostService.addListener(hostListener);
@ -153,10 +169,34 @@ public class ControlPlaneRedirectManager {
@Deactivate @Deactivate
protected void deactivate() { protected void deactivate() {
cfgService.unregisterProperties(getClass(), false);
networkConfigService.removeListener(networkConfigListener); networkConfigService.removeListener(networkConfigListener);
hostService.removeListener(hostListener); hostService.removeListener(hostListener);
} }
@Modified
protected void modified(ComponentContext context) {
if (context != null) {
readComponentConfiguration(context);
processRouterConfig();
}
}
private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
Boolean flag;
flag = Tools.isPropertyEnabled(properties, "forceUnprovision");
if (flag == null) {
log.info("ForceUnprovision is not configured, " +
"using current value of {}", forceUnprovision);
} else {
forceUnprovision = flag;
log.info("Configured. ForceUnprovision is {}",
forceUnprovision ? "enabled" : "disabled");
}
}
/** /**
* Sets up the router interfaces if router config is available. * Sets up the router interfaces if router config is available.
*/ */
@ -174,7 +214,7 @@ public class ControlPlaneRedirectManager {
if (r == null) { if (r == null) {
return createRouter(RouterInfo.from(router)); return createRouter(RouterInfo.from(router));
} else { } else {
r.changeConfiguration(RouterInfo.from(router)); r.changeConfiguration(RouterInfo.from(router), forceUnprovision);
return r; return r;
} }
}); });
@ -198,7 +238,8 @@ public class ControlPlaneRedirectManager {
interfaceService, interfaceService,
deviceService, deviceService,
this::provisionInterface, this::provisionInterface,
this::unprovisionInterface); this::unprovisionInterface,
forceUnprovision);
} }
private void provisionInterface(InterfaceProvisionRequest intf) { private void provisionInterface(InterfaceProvisionRequest intf) {

View File

@ -21,6 +21,7 @@ import org.easymock.EasyMock;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.onlab.osgi.ComponentContextAdapter;
import org.onlab.packet.EthType; import org.onlab.packet.EthType;
import org.onlab.packet.Ip6Address; import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpAddress; import org.onlab.packet.IpAddress;
@ -152,7 +153,7 @@ public class ControlPlaneRedirectManagerTest {
controlPlaneRedirectManager.hostService = createNiceMock(HostService.class); controlPlaneRedirectManager.hostService = createNiceMock(HostService.class);
controlPlaneRedirectManager.mastershipService = mastershipService; controlPlaneRedirectManager.mastershipService = mastershipService;
controlPlaneRedirectManager.applicationService = applicationService; controlPlaneRedirectManager.applicationService = applicationService;
controlPlaneRedirectManager.activate(); controlPlaneRedirectManager.activate(new ComponentContextAdapter());
verify(flowObjectiveService); verify(flowObjectiveService);
} }

View File

@ -65,7 +65,6 @@ import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.ForwardingObjective; import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective; import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.ObjectiveContext; import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.routing.AsyncDeviceFetcher;
import org.onosproject.routing.NextHop; import org.onosproject.routing.NextHop;
import org.onosproject.routing.NextHopGroupKey; import org.onosproject.routing.NextHopGroupKey;
import org.onosproject.routing.RouterInfo; import org.onosproject.routing.RouterInfo;
@ -137,7 +136,6 @@ public class FibInstaller {
private DeviceId deviceId; private DeviceId deviceId;
private Router interfaceManager; private Router interfaceManager;
private AsyncDeviceFetcher asyncDeviceFetcher;
private ApplicationId coreAppId; private ApplicationId coreAppId;
private ApplicationId routerAppId; private ApplicationId routerAppId;
@ -177,8 +175,6 @@ public class FibInstaller {
networkConfigService.addListener(configListener); networkConfigService.addListener(configListener);
asyncDeviceFetcher = AsyncDeviceFetcher.create(deviceService);
processRouterConfig(); processRouterConfig();
applicationService.registerDeactivateHook(fibAppId, () -> cleanUp()); applicationService.registerDeactivateHook(fibAppId, () -> cleanUp());
@ -188,7 +184,6 @@ public class FibInstaller {
@Deactivate @Deactivate
protected void deactivate() { protected void deactivate() {
asyncDeviceFetcher.shutdown();
networkConfigService.removeListener(configListener); networkConfigService.removeListener(configListener);
componentConfigService.unregisterProperties(getClass(), false); componentConfigService.unregisterProperties(getClass(), false);
@ -226,7 +221,7 @@ public class FibInstaller {
interfaceManager = createRouter(RouterInfo.from(routerConfig)); interfaceManager = createRouter(RouterInfo.from(routerConfig));
} else { } else {
interfaceManager.changeConfiguration(RouterInfo.from(routerConfig)); interfaceManager.changeConfiguration(RouterInfo.from(routerConfig), false);
} }
} }
@ -253,7 +248,8 @@ public class FibInstaller {
interfaceService, interfaceService,
deviceService, deviceService,
this::provisionInterface, this::provisionInterface,
this::unprovisionInterface); this::unprovisionInterface,
false);
} }
private void updateRoute(ResolvedRoute route) { private void updateRoute(ResolvedRoute route) {

View File

@ -360,7 +360,8 @@ public class SegmentRoutingManager implements SegmentRoutingService {
"arpEnabled", "false"); "arpEnabled", "false");
compCfgService.preSetProperty("org.onosproject.net.host.impl.HostManager", compCfgService.preSetProperty("org.onosproject.net.host.impl.HostManager",
"greedyLearningIpv6", "true"); "greedyLearningIpv6", "true");
compCfgService.preSetProperty("org.onosproject.routing.cpr.ControlPlaneRedirectManager",
"forceUnprovision", "true");
processor = new InternalPacketProcessor(); processor = new InternalPacketProcessor();
linkListener = new InternalLinkListener(); linkListener = new InternalLinkListener();