[SDFAB-20] Prevent listeners ejection and the stop of the group polling

Offload listener processing to external executors to prevent
listener ejection caused by time-consuming processing

In the future, we may want to extend the same fix to the
HostManager and NetworkConfigHostProvider

Additionally, avoid propagating exceptions in GroupDriverProvider,
which leads to the cancellation of the periodic poll task

Change-Id: I8ea4ec9fda1ccc48bbd3855fd443ee8760cbbb60
This commit is contained in:
pierventre 2021-07-09 22:42:17 +02:00 committed by Pier Luigi Ventre
parent df29dc7117
commit 52ef933d5e
6 changed files with 188 additions and 141 deletions

View File

@ -72,6 +72,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -79,6 +80,8 @@ import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@ -152,6 +155,8 @@ public class SimpleIntManager implements IntService {
}
};
protected ExecutorService eventExecutor;
@Activate
public void activate() {
@ -206,6 +211,10 @@ public class SimpleIntManager implements IntService {
// Bootstrap config for already existing devices.
triggerAllDeviceConfigure();
// Bootstrap core event executor before adding listener
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
"onos/int", "events-%d", log));
hostService.addListener(hostListener);
deviceService.addListener(deviceListener);
@ -256,6 +265,8 @@ public class SimpleIntManager implements IntService {
deviceService.getDevices().forEach(d -> cleanupDevice(d.id()));
netcfgService.removeListener(appConfigListener);
netcfgRegistry.unregisterConfigFactory(intAppConfigFactory);
eventExecutor.shutdownNow();
eventExecutor = null;
log.info("Deactivated");
}
@ -503,30 +514,34 @@ public class SimpleIntManager implements IntService {
/**
 * Host listener that calls triggerDeviceConfigure for the device at the
 * location of the host carried by the event.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers,
 * so it appears to contain both the pre-change inline handling and the
 * post-change version that submits the same work to eventExecutor. Confirm
 * against the actual file which lines are live.
 */
private class InternalHostListener implements HostListener {
@Override
public void event(HostEvent event) {
// Inline variant: resolves the device and reconfigures it on the calling thread.
final DeviceId deviceId = event.subject().location().deviceId();
triggerDeviceConfigure(deviceId);
// Executor variant: the same two statements wrapped in a task for eventExecutor.
eventExecutor.execute(() -> {
final DeviceId deviceId = event.subject().location().deviceId();
triggerDeviceConfigure(deviceId);
});
}
}
/**
 * Device listener that calls triggerDeviceConfigure for the event's device on
 * device and port add/update/remove, suspension and availability changes.
 * PORT_STATS_UPDATED is ignored and any other type is logged at warn level.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers,
 * so the switch appears twice: once inline (presumably the removed lines) and
 * once inside a task submitted to eventExecutor (presumably the added lines).
 * Confirm against the actual file which copy is live.
 */
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
// Inline variant of the dispatch.
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_UPDATED:
case DEVICE_REMOVED:
case DEVICE_SUSPENDED:
case DEVICE_AVAILABILITY_CHANGED:
case PORT_ADDED:
case PORT_UPDATED:
case PORT_REMOVED:
triggerDeviceConfigure(event.subject().id());
return;
case PORT_STATS_UPDATED:
// Port statistics updates do not trigger a reconfiguration.
return;
default:
log.warn("Unknown device event type {}", event.type());
}
// Executor variant: identical dispatch, run as a task on eventExecutor.
eventExecutor.execute(() -> {
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_UPDATED:
case DEVICE_REMOVED:
case DEVICE_SUSPENDED:
case DEVICE_AVAILABILITY_CHANGED:
case PORT_ADDED:
case PORT_UPDATED:
case PORT_REMOVED:
triggerDeviceConfigure(event.subject().id());
return;
case PORT_STATS_UPDATED:
return;
default:
log.warn("Unknown device event type {}", event.type());
}
});
}
}
@ -580,56 +595,56 @@ public class SimpleIntManager implements IntService {
/**
 * Handles network configuration events. On CONFIG_ADDED and CONFIG_UPDATED the
 * IntReportConfig carried by the event is turned into an enabled
 * IntDeviceConfig (min flow hop latency change, collector port, collector IP)
 * and applied with setConfig. intentMap is then cleared and INT intents are
 * installed for every watched subnet: a single intent with an empty selector
 * when the prefix length is 0, otherwise one intent matching the subnet as IP
 * source and one matching it as IP destination. Other event types are ignored.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers.
 * Lines of the pre-change inline handling and of the post-change handling
 * (wrapped in eventExecutor.execute and guarded by a configClass check) are
 * interleaved, so the text below is not brace-coherent as written. Confirm
 * against the actual file before relying on it.
 */
@Override
public void event(NetworkConfigEvent event) {
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED:
event.config()
.map(config -> (IntReportConfig) config)
.ifPresent(config -> {
IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder()
.withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs())
.withCollectorPort(config.collectorPort())
.withCollectorIp(config.collectorIp())
.enabled(true)
.build();
setConfig(intDeviceConfig);
eventExecutor.execute(() -> {
if (event.configClass() == IntReportConfig.class) {
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED:
event.config()
.map(config -> (IntReportConfig) config)
.ifPresent(config -> {
IntDeviceConfig intDeviceConfig = IntDeviceConfig.builder()
.withMinFlowHopLatencyChangeNs(config.minFlowHopLatencyChangeNs())
.withCollectorPort(config.collectorPort())
.withCollectorIp(config.collectorIp())
.enabled(true)
.build();
setConfig(intDeviceConfig);
// For each watched subnet, we install two INT rules.
// One match on the source, another match on the destination.
intentMap.clear();
config.watchSubnets().forEach(subnet -> {
IntIntent.Builder intIntentBuilder = IntIntent.builder()
.withReportType(IntIntent.IntReportType.TRACKED_FLOW)
.withReportType(IntIntent.IntReportType.DROPPED_PACKET)
.withReportType(IntIntent.IntReportType.CONGESTED_QUEUE)
.withTelemetryMode(IntIntent.TelemetryMode.POSTCARD);
if (subnet.prefixLength() == 0) {
// Special case, match any packet
installIntIntent(intIntentBuilder
.withSelector(DefaultTrafficSelector.emptySelector())
.build());
} else {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchIPSrc(subnet)
.build();
installIntIntent(intIntentBuilder.withSelector(selector).build());
selector = DefaultTrafficSelector.builder()
.matchIPDst(subnet)
.build();
installIntIntent(intIntentBuilder.withSelector(selector).build());
}
});
});
break;
// TODO: Support removing INT config.
default:
break;
}
// For each watched subnet, we install two INT rules.
// One match on the source, another match on the destination.
intentMap.clear();
config.watchSubnets().forEach(subnet -> {
IntIntent.Builder intIntentBuilder = IntIntent.builder()
.withReportType(IntIntent.IntReportType.TRACKED_FLOW)
.withReportType(IntIntent.IntReportType.DROPPED_PACKET)
.withReportType(IntIntent.IntReportType.CONGESTED_QUEUE)
.withTelemetryMode(IntIntent.TelemetryMode.POSTCARD);
if (subnet.prefixLength() == 0) {
// Special case, match any packet
installIntIntent(intIntentBuilder
.withSelector(DefaultTrafficSelector.emptySelector())
.build());
} else {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchIPSrc(subnet)
.build();
installIntIntent(intIntentBuilder.withSelector(selector).build());
selector = DefaultTrafficSelector.builder()
.matchIPDst(subnet)
.build();
installIntIntent(intIntentBuilder.withSelector(selector).build());
}
});
});
break;
// TODO: Support removing INT config.
default:
break;
}
}
});
}
/**
 * Tells whether the given network configuration event should be handled.
 *
 * @param event the network configuration event to inspect
 * @return true only when the event's config class is IntReportConfig
 */
@Override
public boolean isRelevant(NetworkConfigEvent event) {
    final Class<?> eventConfigClass = event.configClass();
    return IntReportConfig.class == eventConfigClass;
}
}
}

View File

@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
@ -81,6 +82,7 @@ import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.onlab.junit.TestTools.assertAfter;
import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.POSTCARD;
import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SINK;
import static org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable.IntFunctionality.SOURCE;
@ -151,6 +153,7 @@ public class SimpleIntManagerTest {
manager.hostService = hostService;
manager.netcfgService = networkConfigService;
manager.netcfgRegistry = networkConfigRegistry;
manager.eventExecutor = MoreExecutors.newDirectExecutorService();
expect(coreService.registerApplication(APP_NAME))
.andReturn(APP_ID).anyTimes();
@ -226,7 +229,7 @@ public class SimpleIntManagerTest {
// The INT intent installation order can be random, so we need to collect
// all expected INT intents and check if actual intent exists.
assertEquals(5, intentMap.size());
assertAfter(50, 100, () -> assertEquals(5, intentMap.size()));
intentMap.entrySet().forEach(entry -> {
IntIntent actualIntIntent = entry.getValue().value();
assertTrue(expectedIntIntents.contains(actualIntIntent));

View File

@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
@ -65,6 +66,9 @@ public class RouteMonitor {
private final ScheduledExecutorService reaperExecutor =
newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
"onos/routemonitor", "events-%d", log));
/**
* Creates a new route monitor.
*
@ -94,6 +98,8 @@ public class RouteMonitor {
/**
 * Shuts the monitor down: stops processing, removes the cluster listener,
 * calls shutdownNow() on the event executor and on the reaper executor, and
 * finally unlocks asyncLock.
 */
public void shutdown() {
stopProcessing();
// Remove the listener before stopping eventExecutor, which its handler submits to.
clusterService.removeListener(clusterListener);
eventExecutor.shutdownNow();
reaperExecutor.shutdownNow();
asyncLock.unlock();
}
@ -145,31 +151,33 @@ public class RouteMonitor {
/**
 * Handles cluster events. On INSTANCE_DEACTIVATED it logs the node id and
 * tries to take asyncLock with a 15 second timeout; when the lock is obtained
 * the node id is added to the removal queue and the lock is released,
 * otherwise only a debug message is logged. All other event types are ignored.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers.
 * Lines of the pre-change inline handling and of the post-change handling
 * (wrapped in eventExecutor.execute) are interleaved, so the text below is not
 * brace-coherent as written. Confirm against the actual file.
 */
@Override
public void event(ClusterEvent event) {
switch (event.type()) {
case INSTANCE_DEACTIVATED:
NodeId id = event.subject().id();
log.info("Node {} deactivated", id);
eventExecutor.execute(() -> {
switch (event.type()) {
case INSTANCE_DEACTIVATED:
NodeId id = event.subject().id();
log.info("Node {} deactivated", id);
// DistributedLock is introduced to guarantee that minority nodes won't try to remove
// routes that originated from majority nodes.
// Adding 15 seconds retry for the leadership election to be completed.
asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
if (result != null && result.isPresent()) {
log.debug("Lock obtained. Put {} into removal queue", id);
queue.addOne(id);
asyncLock.unlock();
} else {
log.debug("Fail to obtain lock. Do not remove routes from {}", id);
}
});
break;
case INSTANCE_ADDED:
case INSTANCE_REMOVED:
case INSTANCE_ACTIVATED:
case INSTANCE_READY:
default:
break;
}
// DistributedLock is introduced to guarantee that minority nodes won't try to remove
// routes that originated from majority nodes.
// Adding 15 seconds retry for the leadership election to be completed.
asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
if (result != null && result.isPresent()) {
log.debug("Lock obtained. Put {} into removal queue", id);
queue.addOne(id);
asyncLock.unlock();
} else {
log.debug("Fail to obtain lock. Do not remove routes from {}", id);
}
});
break;
case INSTANCE_ADDED:
case INSTANCE_REMOVED:
case INSTANCE_ACTIVATED:
case INSTANCE_READY:
default:
break;
}
});
}
}
}

View File

@ -117,12 +117,16 @@ public class GroupDriverProvider extends AbstractProvider implements GroupProvid
}
/**
 * Polls groups on every available device for which this node is the local
 * master and which supports GroupProgrammable, by calling pollDeviceGroups.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers,
 * so the device sweep appears twice: once unguarded (presumably the removed
 * lines) and once inside a try/catch (presumably the added lines). Confirm
 * against the actual file which copy is live.
 */
private void pollGroups() {
// Unguarded variant of the sweep.
deviceService.getAvailableDevices().forEach(device -> {
if (mastershipService.isLocalMaster(device.id()) &&
device.is(GroupProgrammable.class)) {
pollDeviceGroups(device.id());
}
});
// Guarded variant: any Exception from the sweep is caught and logged at warn
// level instead of propagating. Per the commit message, propagation led to the
// cancellation of the periodic poll task.
try {
deviceService.getAvailableDevices().forEach(device -> {
if (mastershipService.isLocalMaster(device.id()) &&
device.is(GroupProgrammable.class)) {
pollDeviceGroups(device.id());
}
});
} catch (Exception e) {
log.warn("Exception thrown while polling groups", e);
}
}
private void pollDeviceGroups(DeviceId deviceId) {

View File

@ -508,26 +508,29 @@ public class MeterManager
/**
 * Handles device events. On DEVICE_REMOVED and DEVICE_AVAILABILITY_CHANGED,
 * when the device is no longer available, it decides whether to purge the
 * device's meters: the per-device BasicDeviceConfig setting is used when it is
 * configured, otherwise the purgeOnDisconnection field. When a purge is
 * requested it logs the fact and calls store.purgeMeter. Other event types
 * are ignored.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers.
 * Lines of the pre-change inline handling and of the post-change handling
 * (submitted through meterInstallers.execute) are interleaved, so the text
 * below is not brace-coherent as written. Confirm against the actual file.
 */
@Override
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_REMOVED:
case DEVICE_AVAILABILITY_CHANGED:
DeviceId deviceId = event.subject().id();
if (!deviceService.isAvailable(deviceId)) {
BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class);
//if purgeOnDisconnection is set for the device or it's a global configuration
// lets remove the meters.
boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ?
cfg.purgeOnDisconnection() : purgeOnDisconnection;
if (purge) {
log.info("PurgeOnDisconnection is requested for device {}, " +
"removing meters", deviceId);
store.purgeMeter(deviceId);
DeviceId deviceId = event.subject().id();
meterInstallers.execute(() -> {
switch (event.type()) {
case DEVICE_REMOVED:
case DEVICE_AVAILABILITY_CHANGED:
if (!deviceService.isAvailable(deviceId)) {
BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class);
//if purgeOnDisconnection is set for the device or it's a global configuration
// lets remove the meters.
boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ?
cfg.purgeOnDisconnection() : purgeOnDisconnection;
if (purge) {
log.info("PurgeOnDisconnection is requested for device {}, " +
"removing meters", deviceId);
store.purgeMeter(deviceId);
}
}
}
break;
default:
break;
}
break;
default:
break;
}
// NOTE(review): the second argument is presumably a hint that keys the task to
// the device; its exact meaning is not visible here, so verify against
// the meterInstallers type.
}, deviceId.hashCode());
}
}

View File

@ -52,9 +52,13 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
/**
* Entity that manages gNMI subscription for devices using OpenConfig models and
* that reports relevant events to the core.
@ -78,6 +82,8 @@ class GnmiDeviceStateSubscriber {
private final Striped<Lock> deviceLocks = Striped.lock(30);
private ExecutorService eventExecutor;
GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
MastershipService mastershipService,
DeviceProviderService providerService) {
@ -88,6 +94,8 @@ class GnmiDeviceStateSubscriber {
}
public void activate() {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
"onos/gnmi", "events-%d", log));
deviceService.addListener(deviceEventListener);
mastershipService.addListener(mastershipListener);
gnmiController.addListener(gnmiEventListener);
@ -100,6 +108,8 @@ class GnmiDeviceStateSubscriber {
deviceService.removeListener(deviceEventListener);
mastershipService.removeListener(mastershipListener);
gnmiController.removeListener(gnmiEventListener);
eventExecutor.shutdownNow();
eventExecutor = null;
}
private void checkSubscription(DeviceId deviceId) {
@ -248,19 +258,21 @@ class GnmiDeviceStateSubscriber {
/**
 * Handles gNMI events. Events from a device that is not a key of
 * deviceSubscribed are logged at warn level and discarded. Otherwise the event
 * is logged at debug level; UPDATE events are passed to handleGnmiUpdate and
 * any other type is only logged as unsupported.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers,
 * so the handling appears both inline (presumably the removed lines) and
 * inside a task submitted to eventExecutor (presumably the added lines), with
 * the two copies partly interleaved. Confirm against the actual file.
 */
@Override
public void event(GnmiEvent event) {
if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
log.warn("Received gNMI event from {}, but we did'nt expect to " +
"be subscribed to it! Discarding event...",
event.subject().deviceId());
return;
}
eventExecutor.execute(() -> {
if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
log.warn("Received gNMI event from {}, but we did'nt expect to " +
"be subscribed to it! Discarding event...",
event.subject().deviceId());
return;
}
log.debug("Received gNMI event {}", event.toString());
if (event.type() == GnmiEvent.Type.UPDATE) {
handleGnmiUpdate((GnmiUpdate) event.subject());
} else {
log.debug("Unsupported gNMI event type: {}", event.type());
}
log.debug("Received gNMI event {}", event.toString());
if (event.type() == GnmiEvent.Type.UPDATE) {
handleGnmiUpdate((GnmiUpdate) event.subject());
} else {
log.debug("Unsupported gNMI event type: {}", event.type());
}
});
}
}
@ -268,7 +280,7 @@ class GnmiDeviceStateSubscriber {
/**
 * Handles mastership events by calling checkSubscription for the event's
 * subject.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers,
 * so it shows both the inline call (presumably removed) and the call submitted
 * to eventExecutor (presumably added). Confirm against the actual file.
 */
@Override
public void event(MastershipEvent event) {
// Inline variant.
checkSubscription(event.subject());
// Executor variant.
eventExecutor.execute(() -> checkSubscription(event.subject()));
}
}
@ -276,18 +288,20 @@ class GnmiDeviceStateSubscriber {
/**
 * Handles device events by calling checkSubscription for the event's device on
 * DEVICE_ADDED, DEVICE_AVAILABILITY_CHANGED, DEVICE_UPDATED, DEVICE_REMOVED,
 * PORT_ADDED and PORT_REMOVED. All other event types are ignored.
 *
 * NOTE(review): this span is taken from a diff hunk shown without +/- markers,
 * so the switch appears twice: once inline (presumably the removed lines) and
 * once inside a task submitted to eventExecutor (presumably the added lines).
 * Confirm against the actual file which copy is live.
 */
@Override
public void event(DeviceEvent event) {
// Inline variant of the dispatch.
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_UPDATED:
case DEVICE_REMOVED:
case PORT_ADDED:
case PORT_REMOVED:
checkSubscription(event.subject().id());
break;
default:
break;
}
// Executor variant: identical dispatch, run as a task on eventExecutor.
eventExecutor.execute(() -> {
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_UPDATED:
case DEVICE_REMOVED:
case PORT_ADDED:
case PORT_REMOVED:
checkSubscription(event.subject().id());
break;
default:
break;
}
});
}
}
}