From 64a1c8e76ea29bc56502cd443f9da3e29dc6e784 Mon Sep 17 00:00:00 2001 From: Charles Chan Date: Wed, 23 Jan 2019 15:03:17 -0800 Subject: [PATCH] Offload packet processing to another thread Also update unit tests Change-Id: Ib94c796083e2d75912f77667d3cfe4ed794694e9 --- .../dhcprelay/DhcpRelayManager.java | 26 ++-- .../dhcprelay/DhcpRelayManagerTest.java | 129 +++++++++--------- .../segmentrouting/SegmentRoutingManager.java | 21 ++- .../SegmentRoutingNeighbourDispatcher.java | 4 + 4 files changed, 99 insertions(+), 81 deletions(-) diff --git a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java index bb14e16640..7fe6983d32 100644 --- a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java +++ b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java @@ -120,8 +120,9 @@ import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FAC ) public class DhcpRelayManager implements DhcpRelayService { public static final String DHCP_RELAY_APP = "org.onosproject.dhcprelay"; - public static final String ROUTE_STORE_IMPL = - "org.onosproject.routeservice.store.RouteStoreImpl"; + public static final String ROUTE_STORE_IMPL = "org.onosproject.routeservice.store.RouteStoreImpl"; + + private static final int DEFAULT_POOL_SIZE = 32; private static final TrafficSelector ARP_SELECTOR = DefaultTrafficSelector.builder() .matchEthType(Ethernet.TYPE_ARP) @@ -215,6 +216,7 @@ public class DhcpRelayManager implements DhcpRelayService { private ScheduledExecutorService timerExecutor; protected ExecutorService devEventExecutor; + private ExecutorService packetExecutor; protected DeviceListener deviceListener = new InternalDeviceListener(); private DhcpRelayPacketProcessor dhcpRelayPacketProcessor = new DhcpRelayPacketProcessor(); @@ -244,12 +246,10 @@ public class DhcpRelayManager implements DhcpRelayService { packetService.addProcessor(dhcpRelayPacketProcessor, PacketProcessor.director(0)); timerExecutor = Executors.newScheduledThreadPool(1, - groupedThreads("dhcpRelay", - "config-reloader-%d", log)); - timerExecutor.scheduleAtFixedRate(new Dhcp6Timer(), - 0, - dhcpPollInterval, - TimeUnit.SECONDS); + groupedThreads("onos/dhcprelay", "config-reloader-%d", log)); + timerExecutor.scheduleAtFixedRate(new Dhcp6Timer(), 0, dhcpPollInterval, TimeUnit.SECONDS); + packetExecutor = Executors.newFixedThreadPool(DEFAULT_POOL_SIZE, + groupedThreads("onos/dhcprelay", "packet-%d", log)); devEventExecutor = newSingleThreadScheduledExecutor( groupedThreads("onos/dhcprelay-dev-events", "events-%d", log)); @@ -263,8 +263,6 @@ public class DhcpRelayManager implements DhcpRelayService { deviceService.addListener(deviceListener); - - log.info("DHCP-RELAY Started"); } @@ -279,6 +277,9 @@ public class DhcpRelayManager implements DhcpRelayService { timerExecutor.shutdown(); devEventExecutor.shutdownNow(); devEventExecutor = null; + packetExecutor.shutdown(); + timerExecutor = null; + packetExecutor = null; log.info("DHCP-RELAY Stopped"); } @@ -505,9 +506,12 @@ public class DhcpRelayManager implements DhcpRelayService { private class DhcpRelayPacketProcessor implements PacketProcessor { - @Override public void process(PacketContext context) { + packetExecutor.execute(() -> processInternal(context)); + } + + private void processInternal(PacketContext context) { // process the packet and get the payload Ethernet packet = context.inPacket().parsed(); if (packet == null) { diff --git a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java index ba943f937e..afda3bdfcc 100644 --- a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java +++ b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java @@ -104,9 +104,6 @@ import org.onosproject.net.packet.PacketProcessor; import org.onosproject.net.packet.PacketServiceAdapter; import org.onosproject.store.StoreDelegate; - - - import org.osgi.service.component.ComponentContext; import org.onlab.packet.DHCP6; import org.onlab.packet.IPv6; @@ -129,6 +126,7 @@ import static org.onosproject.dhcprelay.DhcpRelayManager.DHCP_RELAY_APP; public class DhcpRelayManagerTest { private static final int EVENT_PROCESSING_MS = 1000; + private static final int PKT_PROCESSING_MS = 500; private static final short VLAN_LEN = 2; private static final short SEPARATOR_LEN = 1; private static final String CONFIG_FILE_PATH = "dhcp-relay.json"; @@ -399,7 +397,7 @@ public class DhcpRelayManagerTest { verify(mockHostProviderService); reset(mockHostProviderService); - assertEquals(0, mockRouteStore.routes.size()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); HostId expectHostId = HostId.hostId(CLIENT_MAC, CLIENT_VLAN); Capture capturedHostDesc = newCapture(); @@ -409,15 +407,15 @@ public class DhcpRelayManagerTest { packetService.processPacket(new TestDhcpAckPacketContext(CLIENT_CP, CLIENT_MAC, CLIENT_VLAN, INTERFACE_IP.ipAddress().getIp4Address(), false)); - verify(mockHostProviderService); - assertEquals(0, mockRouteStore.routes.size()); + assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService)); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); HostDescription host = capturedHostDesc.getValue(); - assertEquals(false, host.configured()); - assertEquals(CLIENT_CP.deviceId(), host.location().elementId()); - assertEquals(CLIENT_CP.port(), host.location().port()); - assertEquals(1, host.ipAddress().size()); - assertEquals(IP_FOR_CLIENT, host.ipAddress().iterator().next()); + assertAfter(PKT_PROCESSING_MS, () -> assertFalse(host.configured())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.deviceId(), host.location().elementId())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.port(), host.location().port())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(1, host.ipAddress().size())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(IP_FOR_CLIENT, host.ipAddress().iterator().next())); } /** @@ -429,12 +427,16 @@ public class DhcpRelayManagerTest { // Assume outer dhcp relay agent exists in store already // send request packetService.processPacket(new TestDhcpRequestPacketContext(CLIENT2_MAC, - CLIENT2_VLAN, - CLIENT2_CP, - INTERFACE_IP.ipAddress().getIp4Address(), - true)); + CLIENT2_VLAN, + CLIENT2_CP, + INTERFACE_IP.ipAddress().getIp4Address(), + true)); // No routes assertEquals(0, mockRouteStore.routes.size()); + + // Make sure the REQUEST packet has been processed before start sending ACK + assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket)); + // send ack packetService.processPacket(new TestDhcpAckPacketContext(CLIENT2_CP, CLIENT2_MAC, @@ -445,13 +447,12 @@ public class DhcpRelayManagerTest { // won't trigger the host provider service verify(mockHostProviderService); reset(mockHostProviderService); - - assertEquals(1, mockRouteStore.routes.size()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(1, mockRouteStore.routes.size())); Route route = mockRouteStore.routes.get(0); - assertEquals(OUTER_RELAY_IP, route.nextHop()); - assertEquals(IP_FOR_CLIENT.toIpPrefix(), route.prefix()); - assertEquals(Route.Source.DHCP, route.source()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(OUTER_RELAY_IP, route.nextHop())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(IP_FOR_CLIENT.toIpPrefix(), route.prefix())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(Route.Source.DHCP, route.source())); } @Test @@ -465,25 +466,28 @@ public class DhcpRelayManagerTest { CLIENT2_CP, INTERFACE_IP.ipAddress().getIp4Address(), true)); + assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket)); OutboundPacket outPacket = packetService.emittedPacket; byte[] outData = outPacket.data().array(); Ethernet eth = Ethernet.deserializer().deserialize(outData, 0, outData.length); IPv4 ip = (IPv4) eth.getPayload(); UDP udp = (UDP) ip.getPayload(); DHCP dhcp = (DHCP) udp.getPayload(); - assertEquals(RELAY_AGENT_IP.toInt(), dhcp.getGatewayIPAddress()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(RELAY_AGENT_IP.toInt(), dhcp.getGatewayIPAddress())); } @Test public void testArpRequest() throws Exception { packetService.processPacket(new TestArpRequestPacketContext(CLIENT_INTERFACE)); + assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket)); OutboundPacket outboundPacket = packetService.emittedPacket; byte[] outPacketData = outboundPacket.data().array(); Ethernet eth = Ethernet.deserializer().deserialize(outPacketData, 0, outPacketData.length); - assertEquals(eth.getEtherType(), Ethernet.TYPE_ARP); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(eth.getEtherType(), Ethernet.TYPE_ARP)); ARP arp = (ARP) eth.getPayload(); - assertArrayEquals(arp.getSenderHardwareAddress(), CLIENT_INTERFACE.mac().toBytes()); + assertAfter(PKT_PROCESSING_MS, () -> + assertArrayEquals(arp.getSenderHardwareAddress(), CLIENT_INTERFACE.mac().toBytes())); } /** @@ -665,7 +669,7 @@ public class DhcpRelayManagerTest { verify(mockHostProviderService); reset(mockHostProviderService); - assertEquals(0, mockRouteStore.routes.size()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); Capture capturedHostDesc = newCapture(); mockHostProviderService.hostDetected(eq(HostId.hostId(CLIENT_MAC, CLIENT_VLAN)), @@ -677,15 +681,15 @@ public class DhcpRelayManagerTest { CLIENT_VLAN, INTERFACE_IP_V6.ipAddress().getIp6Address(), 0, false, CLIENT_VLAN)); - verify(mockHostProviderService); - assertEquals(0, mockRouteStore.routes.size()); + assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService)); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); HostDescription host = capturedHostDesc.getValue(); - assertEquals(CLIENT_VLAN, host.vlan()); - assertEquals(CLIENT_CP.deviceId(), host.location().elementId()); - assertEquals(CLIENT_CP.port(), host.location().port()); - assertEquals(1, host.ipAddress().size()); - assertEquals(IP_FOR_CLIENT_V6, host.ipAddress().iterator().next()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_VLAN, host.vlan())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.deviceId(), host.location().elementId())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.port(), host.location().port())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(1, host.ipAddress().size())); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(IP_FOR_CLIENT_V6, host.ipAddress().iterator().next())); // send release packetService.processPacket(new TestDhcp6RequestPacketContext(DHCP6.MsgType.RELEASE.value(), @@ -695,7 +699,8 @@ public class DhcpRelayManagerTest { INTERFACE_IP_V6.ipAddress().getIp6Address(), 0)); - assertEquals(null, manager.hostService.getHost(HostId.hostId(CLIENT_MAC, CLIENT_VLAN))); + assertAfter(PKT_PROCESSING_MS, + () -> assertNull(manager.hostService.getHost(HostId.hostId(CLIENT_MAC, CLIENT_VLAN)))); } /** @@ -715,6 +720,9 @@ public class DhcpRelayManagerTest { assertEquals(0, mockRouteStore.routes.size()); + // Make sure the REQUEST packet has been processed before start sending ACK + assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket)); + // send reply packetService.processPacket(new TestDhcp6ReplyPacketContext(DHCP6.MsgType.REPLY.value(), CLIENT2_CP, CLIENT2_MAC, @@ -725,19 +733,13 @@ public class DhcpRelayManagerTest { // won't trigger the host provider service verify(mockHostProviderService); reset(mockHostProviderService); - assertEquals(2, mockRouteStore.routes.size()); // ipAddress and prefix + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(2, mockRouteStore.routes.size())); // ipAddress and prefix - Route aRoute = mockRouteStore.routes.stream() - .filter(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6)) - .findFirst() - .orElse(null); - assertNotEquals(null, aRoute); + assertAfter(PKT_PROCESSING_MS, () -> + assertTrue(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6)))); - aRoute = mockRouteStore.routes.stream() - .filter(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6)) - .findFirst() - .orElse(null); - assertNotEquals(null, aRoute); + assertAfter(PKT_PROCESSING_MS, () -> + assertTrue(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6)))); // send release msg packetService.processPacket(new TestDhcp6RequestPacketContext(DHCP6.MsgType.RELEASE.value(), @@ -746,20 +748,13 @@ public class DhcpRelayManagerTest { CLIENT2_CP, OUTER_RELAY_IP_V6, 1)); + assertAfter(PKT_PROCESSING_MS, () -> + assertFalse(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6)))); - aRoute = mockRouteStore.routes.stream() - .filter(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6)) - .findFirst() - .orElse(null); - assertEquals(null, aRoute); + assertAfter(PKT_PROCESSING_MS, () -> + assertFalse(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6)))); - aRoute = mockRouteStore.routes.stream() - .filter(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6)) - .findFirst() - .orElse(null); - assertEquals(null, aRoute); - - assertEquals(0, mockRouteStore.routes.size()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); } @@ -779,7 +774,7 @@ public class DhcpRelayManagerTest { INTERFACE_IP_V6.ipAddress().getIp6Address(), 1)); - assertEquals(0, mockRouteStore.routes.size()); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); // send reply packetService.processPacket(new TestDhcp6ReplyPacketContext(DHCP6.MsgType.REPLY.value(), @@ -794,7 +789,7 @@ public class DhcpRelayManagerTest { // won't trigger the host provider service verify(mockHostProviderService); reset(mockHostProviderService); - assertEquals(0, mockRouteStore.routes.size()); // ipAddress and prefix + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); // ipAddress and prefix } @@ -811,13 +806,15 @@ public class DhcpRelayManagerTest { mockHostProviderService.hostDetected(eq(CLIENT_HOST_ID), capture(capturedHostDesc), eq(false)); replay(mockHostProviderService, manager.hostService); packetService.processPacket(packetContext); - verify(mockHostProviderService); + assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService)); + assertAfter(PKT_PROCESSING_MS, () -> assertTrue(capturedHostDesc.hasCaptured())); HostDescription hostDesc = capturedHostDesc.getValue(); Set hostLocations = hostDesc.locations(); - assertEquals(2, hostLocations.size()); - assertTrue(hostLocations.contains(CLIENT_LOCATION)); - assertTrue(hostLocations.contains(CLIENT_DH_LOCATION)); + + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(2, hostLocations.size())); + assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_LOCATION))); + assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_DH_LOCATION))); } @Test @@ -848,13 +845,14 @@ public class DhcpRelayManagerTest { expectLastCall().anyTimes(); replay(mockHostProviderService, manager.hostService); packetService.processPacket(packetContext); - verify(mockHostProviderService); + assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService)); + assertAfter(PKT_PROCESSING_MS, () -> assertTrue(capturedHostDesc.hasCaptured())); HostDescription hostDesc = capturedHostDesc.getValue(); Set hostLocations = hostDesc.locations(); - assertEquals(2, hostLocations.size()); - assertTrue(hostLocations.contains(CLIENT_LOCATION)); - assertTrue(hostLocations.contains(CLIENT_DH_LOCATION)); + assertAfter(PKT_PROCESSING_MS, () -> assertEquals(2, hostLocations.size())); + assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_LOCATION))); + assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_DH_LOCATION))); } private static class MockDefaultDhcpRelayConfig extends DefaultDhcpRelayConfig { @@ -939,6 +937,7 @@ public class DhcpRelayManagerTest { routes.remove(route); } + @Override public void replaceRoute(Route route) { routes.remove(route); routes.add(route); diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java index 6f541c17ad..a5f8a51bd7 100644 --- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java +++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java @@ -323,6 +323,7 @@ public class SegmentRoutingManager implements SegmentRoutingService { private ScheduledExecutorService routeEventExecutor; private ScheduledExecutorService mcastEventExecutor; private ExecutorService packetExecutor; + ExecutorService neighborExecutor; Map groupHandlerMap = new ConcurrentHashMap<>(); /** @@ -407,6 +408,8 @@ public class SegmentRoutingManager implements SegmentRoutingService { public static final int MIN_DUMMY_VLAN_ID = 2; public static final int MAX_DUMMY_VLAN_ID = 4093; + private static final int DEFAULT_POOL_SIZE = 32; + Instant lastEdgePortEvent = Instant.EPOCH; protected void bindXconnectService(XconnectService xconnectService) { @@ -431,11 +434,17 @@ public class SegmentRoutingManager implements SegmentRoutingService { protected void activate(ComponentContext context) { appId = coreService.registerApplication(APP_NAME); - mainEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-main", "%d", log)); - hostEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-host", "%d", log)); - routeEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-route", "%d", log)); - mcastEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-mcast", "%d", log)); - packetExecutor = Executors.newSingleThreadExecutor(groupedThreads("sr-packet", "%d", log)); + mainEventExecutor = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/sr", "event-main-%d", log)); + hostEventExecutor = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/sr", "event-host-%d", log)); + routeEventExecutor = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/sr", "event-route-%d", log)); + mcastEventExecutor = Executors.newSingleThreadScheduledExecutor( + groupedThreads("onos/sr", "event-mcast-%d", log)); + packetExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/sr", "packet-%d", log)); + neighborExecutor = Executors.newFixedThreadPool(DEFAULT_POOL_SIZE, + groupedThreads("onos/sr", "neighbor-%d", log)); log.debug("Creating EC map nsnextobjectivestore"); EventuallyConsistentMapBuilder @@ -598,12 +607,14 @@ public class SegmentRoutingManager implements SegmentRoutingService { routeEventExecutor.shutdown(); mcastEventExecutor.shutdown(); packetExecutor.shutdown(); + neighborExecutor.shutdown(); mainEventExecutor = null; hostEventExecutor = null; routeEventExecutor = null; mcastEventExecutor = null; packetExecutor = null; + neighborExecutor = null; cfgService.removeListener(cfgListener); cfgService.unregisterConfigFactory(deviceConfigFactory); diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java index f170fae2b8..3cf0e324bb 100644 --- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java +++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java @@ -42,6 +42,10 @@ public class SegmentRoutingNeighbourDispatcher implements NeighbourMessageHandle @Override public void handleMessage(NeighbourMessageContext context, HostService hostService) { + manager.neighborExecutor.execute(() -> handleMessageInternal(context, hostService)); + } + + private void handleMessageInternal(NeighbourMessageContext context, HostService hostService) { log.trace("Received {} packet on {}: {}", context.protocol(), context.inPort(), context.packet()); switch (context.protocol()) {