Fix ONOS-4683 - Don't process device events on the listener thread

Change-Id: Icc465311c2c047dba11bacc69c745bbda55ea714
This commit is contained in:
Ray Milkey 2016-06-09 11:35:00 -07:00 committed by Gerrit Code Review
parent 1499864095
commit d9bbde88b9
3 changed files with 76 additions and 45 deletions

View File

@ -15,9 +15,16 @@
*/ */
package org.onosproject.provider.lldp.impl; package org.onosproject.provider.lldp.impl;
import com.google.common.collect.ImmutableMap; import java.util.Dictionary;
import com.google.common.collect.ImmutableSet; import java.util.EnumSet;
import com.google.common.collect.Maps; import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Deactivate;
@ -26,7 +33,6 @@ import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet; import org.onlab.packet.Ethernet;
import org.onlab.util.SharedExecutors;
import org.onosproject.cfg.ComponentConfigService; import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterMetadataService; import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService; import org.onosproject.cluster.ClusterService;
@ -51,29 +57,24 @@ import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector; import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.link.DefaultLinkDescription; import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.ProbedLinkProvider;
import org.onosproject.net.link.LinkProviderRegistry; import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService; import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.link.LinkService; import org.onosproject.net.link.LinkService;
import org.onosproject.net.link.ProbedLinkProvider;
import org.onosproject.net.packet.PacketContext; import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketPriority; import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor; import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService; import org.onosproject.net.packet.PacketService;
import org.onosproject.net.provider.AbstractProvider; import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId; import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
import org.onosproject.provider.lldpcommon.LinkDiscovery; import org.onosproject.provider.lldpcommon.LinkDiscovery;
import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
import org.osgi.service.component.ComponentContext; import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Dictionary; import com.google.common.collect.ImmutableMap;
import java.util.EnumSet; import com.google.common.collect.ImmutableSet;
import java.util.Map; import com.google.common.collect.Maps;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@ -141,6 +142,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
private LinkProviderService providerService; private LinkProviderService providerService;
private ScheduledExecutorService executor; private ScheduledExecutorService executor;
protected ExecutorService eventExecutor;
private boolean shuttingDown = false; private boolean shuttingDown = false;
@ -240,6 +242,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
@Activate @Activate
public void activate(ComponentContext context) { public void activate(ComponentContext context) {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/linkevents", "events-%d", log));
shuttingDown = false; shuttingDown = false;
cfgService.registerProperties(getClass()); cfgService.registerProperties(getClass());
appId = coreService.registerApplication(PROVIDER_NAME); appId = coreService.registerApplication(PROVIDER_NAME);
@ -274,6 +277,8 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
cfgService.unregisterProperties(getClass(), false); cfgService.unregisterProperties(getClass(), false);
disable(); disable();
eventExecutor.shutdownNow();
eventExecutor = null;
log.info("Stopped"); log.info("Stopped");
} }
@ -542,27 +547,30 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
return; return;
} }
DeviceId deviceId = event.subject(); eventExecutor.execute(() -> {
Device device = deviceService.getDevice(deviceId); DeviceId deviceId = event.subject();
if (device == null) { Device device = deviceService.getDevice(deviceId);
log.debug("Device {} doesn't exist, or isn't there yet", deviceId); if (device == null) {
return; log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
} return;
if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) { }
updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id())); if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
} updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
}
});
} }
} }
/** private class DeviceEventProcessor implements Runnable {
* Processes device events.
*/ DeviceEvent event;
private class InternalDeviceListener implements DeviceListener {
DeviceEventProcessor(DeviceEvent event) {
this.event = event;
}
@Override @Override
public void event(DeviceEvent event) { public void run() {
if (event.type() == Type.PORT_STATS_UPDATED) {
return;
}
Device device = event.subject(); Device device = event.subject();
Port port = event.port(); Port port = event.port();
if (device == null) { if (device == null) {
@ -617,6 +625,22 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
} }
} }
/**
* Processes device events.
*/
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
if (event.type() == Type.PORT_STATS_UPDATED) {
return;
}
Runnable deviceEventProcessor = new DeviceEventProcessor(event);
eventExecutor.execute(deviceEventProcessor);
}
}
/** /**
* Processes incoming packets. * Processes incoming packets.
*/ */
@ -774,7 +798,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
@Override @Override
public void event(NetworkConfigEvent event) { public void event(NetworkConfigEvent event) {
SharedExecutors.getPoolThreadExecutor().execute(() -> { eventExecutor.execute(() -> {
if (event.configClass() == LinkDiscoveryFromDevice.class && if (event.configClass() == LinkDiscoveryFromDevice.class &&
CONFIG_CHANGED.contains(event.type())) { CONFIG_CHANGED.contains(event.type())) {

View File

@ -15,11 +15,15 @@
*/ */
package org.onosproject.provider.lldp.impl; package org.onosproject.provider.lldp.impl;
import com.google.common.collect.ArrayListMultimap; import java.nio.ByteBuffer;
import com.google.common.collect.ImmutableList; import java.util.Collections;
import com.google.common.collect.ImmutableMap; import java.util.HashMap;
import com.google.common.collect.ImmutableSet; import java.util.HashSet;
import com.google.common.collect.Lists; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -66,14 +70,12 @@ import org.onosproject.net.packet.PacketServiceAdapter;
import org.onosproject.net.provider.ProviderId; import org.onosproject.net.provider.ProviderId;
import org.onosproject.provider.lldpcommon.LinkDiscovery; import org.onosproject.provider.lldpcommon.LinkDiscovery;
import java.nio.ByteBuffer; import com.google.common.collect.ArrayListMultimap;
import java.util.Collections; import com.google.common.collect.ImmutableList;
import java.util.HashMap; import com.google.common.collect.ImmutableMap;
import java.util.HashSet; import com.google.common.collect.ImmutableSet;
import java.util.List; import com.google.common.collect.Lists;
import java.util.Map; import com.google.common.util.concurrent.MoreExecutors;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expect;
@ -146,6 +148,8 @@ public class LldpLinkProviderTest {
provider.activate(null); provider.activate(null);
provider.eventExecutor = MoreExecutors.newDirectExecutorService();
providerService = linkRegistry.registeredProvider(); providerService = linkRegistry.registeredProvider();
} }

View File

@ -255,6 +255,9 @@ public class LinkDiscovery implements TimerTask {
} }
private void sendProbes(Long portNumber) { private void sendProbes(Long portNumber) {
if (context.packetService() == null) {
return;
}
log.trace("Sending probes out to {}@{}", portNumber, device.id()); log.trace("Sending probes out to {}@{}", portNumber, device.id());
OutboundPacket pkt = createOutBoundLldp(portNumber); OutboundPacket pkt = createOutBoundLldp(portNumber);
context.packetService().emit(pkt); context.packetService().emit(pkt);