From d9bbde88b9e280261a9b9269d1944b0252dbc81c Mon Sep 17 00:00:00 2001 From: Ray Milkey Date: Thu, 9 Jun 2016 11:35:00 -0700 Subject: [PATCH] Fix ONOS-4683 - Don't process device events on the listener thread Change-Id: Icc465311c2c047dba11bacc69c745bbda55ea714 --- .../provider/lldp/impl/LldpLinkProvider.java | 88 ++++++++++++------- .../lldp/impl/LldpLinkProviderTest.java | 30 ++++--- .../provider/lldpcommon/LinkDiscovery.java | 3 + 3 files changed, 76 insertions(+), 45 deletions(-) diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java index 69449b3b93..d905e13b93 100644 --- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java +++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java @@ -15,9 +15,16 @@ */ package org.onosproject.provider.lldp.impl; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import java.util.Dictionary; +import java.util.EnumSet; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -26,7 +33,6 @@ import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.onlab.packet.Ethernet; -import org.onlab.util.SharedExecutors; import org.onosproject.cfg.ComponentConfigService; import org.onosproject.cluster.ClusterMetadataService; import org.onosproject.cluster.ClusterService; @@ -51,29 +57,24 @@ import org.onosproject.net.device.DeviceService; import org.onosproject.net.flow.DefaultTrafficSelector; import org.onosproject.net.flow.TrafficSelector; import org.onosproject.net.link.DefaultLinkDescription; -import org.onosproject.net.link.ProbedLinkProvider; import org.onosproject.net.link.LinkProviderRegistry; import org.onosproject.net.link.LinkProviderService; import org.onosproject.net.link.LinkService; +import org.onosproject.net.link.ProbedLinkProvider; import org.onosproject.net.packet.PacketContext; import org.onosproject.net.packet.PacketPriority; import org.onosproject.net.packet.PacketProcessor; import org.onosproject.net.packet.PacketService; import org.onosproject.net.provider.AbstractProvider; import org.onosproject.net.provider.ProviderId; -import org.onosproject.provider.lldpcommon.LinkDiscoveryContext; import org.onosproject.provider.lldpcommon.LinkDiscovery; +import org.onosproject.provider.lldpcommon.LinkDiscoveryContext; import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; -import java.util.Dictionary; -import java.util.EnumSet; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import static com.google.common.base.Strings.isNullOrEmpty; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -141,6 +142,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv private LinkProviderService providerService; private ScheduledExecutorService executor; + protected ExecutorService eventExecutor; private boolean shuttingDown = false; @@ -240,6 +242,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv @Activate public void activate(ComponentContext context) { + eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/linkevents", "events-%d", log)); shuttingDown = false; cfgService.registerProperties(getClass()); appId = coreService.registerApplication(PROVIDER_NAME); @@ -274,6 +277,8 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv cfgService.unregisterProperties(getClass(), false); disable(); + eventExecutor.shutdownNow(); + eventExecutor = null; log.info("Stopped"); } @@ -542,27 +547,30 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv return; } - DeviceId deviceId = event.subject(); - Device device = deviceService.getDevice(deviceId); - if (device == null) { - log.debug("Device {} doesn't exist, or isn't there yet", deviceId); - return; - } - if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) { - updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id())); - } + eventExecutor.execute(() -> { + DeviceId deviceId = event.subject(); + Device device = deviceService.getDevice(deviceId); + if (device == null) { + log.debug("Device {} doesn't exist, or isn't there yet", deviceId); + return; + } + if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) { + updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id())); + } + }); } } - /** - * Processes device events. - */ - private class InternalDeviceListener implements DeviceListener { + private class DeviceEventProcessor implements Runnable { + + DeviceEvent event; + + DeviceEventProcessor(DeviceEvent event) { + this.event = event; + } + @Override - public void event(DeviceEvent event) { - if (event.type() == Type.PORT_STATS_UPDATED) { - return; - } + public void run() { Device device = event.subject(); Port port = event.port(); if (device == null) { @@ -617,6 +625,22 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv } } + /** + * Processes device events. + */ + private class InternalDeviceListener implements DeviceListener { + @Override + public void event(DeviceEvent event) { + if (event.type() == Type.PORT_STATS_UPDATED) { + return; + } + + Runnable deviceEventProcessor = new DeviceEventProcessor(event); + + eventExecutor.execute(deviceEventProcessor); + } + } + /** * Processes incoming packets. */ @@ -774,7 +798,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv @Override public void event(NetworkConfigEvent event) { - SharedExecutors.getPoolThreadExecutor().execute(() -> { + eventExecutor.execute(() -> { if (event.configClass() == LinkDiscoveryFromDevice.class && CONFIG_CHANGED.contains(event.type())) { diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java index fb1bf67a86..811baac2d3 100644 --- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java +++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java @@ -15,11 +15,15 @@ */ package org.onosproject.provider.lldp.impl; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -66,14 +70,12 @@ import org.onosproject.net.packet.PacketServiceAdapter; import org.onosproject.net.provider.ProviderId; import org.onosproject.provider.lldpcommon.LinkDiscovery; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; @@ -146,6 +148,8 @@ public class LldpLinkProviderTest { provider.activate(null); + provider.eventExecutor = MoreExecutors.newDirectExecutorService(); + providerService = linkRegistry.registeredProvider(); } diff --git a/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java b/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java index f108cd4b70..1937e076db 100644 --- a/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java +++ b/providers/lldpcommon/src/main/java/org/onosproject/provider/lldpcommon/LinkDiscovery.java @@ -255,6 +255,9 @@ public class LinkDiscovery implements TimerTask { } private void sendProbes(Long portNumber) { + if (context.packetService() == null) { + return; + } log.trace("Sending probes out to {}@{}", portNumber, device.id()); OutboundPacket pkt = createOutBoundLldp(portNumber); context.packetService().emit(pkt);