Migrate HashedWheelTimer to netty 4

- moved potentially time consuming task to
  shared ScheduledThreadPoolExecutor

Change-Id: I8e77041e0f84bd2bdfd6ae6704f4e39b81c721dd
This commit is contained in:
Yuta HIGUCHI 2017-05-20 23:44:17 -07:00 committed by Yuta HIGUCHI
parent d66fe0c1e2
commit 19afc03a4a
12 changed files with 108 additions and 63 deletions

View File

@ -17,6 +17,7 @@ package org.onosproject.dhcp.impl;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@ -25,8 +26,6 @@ import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.packet.ARP;
import org.onlab.packet.DHCP;
import org.onlab.packet.DHCPOption;
@ -39,7 +38,7 @@ import org.onlab.packet.MacAddress;
import org.onlab.packet.TpPort;
import org.onlab.packet.UDP;
import org.onlab.packet.VlanId;
import org.onlab.util.Timer;
import org.onlab.util.SharedScheduledExecutors;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
@ -84,6 +83,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_DHCPServerIp;
@ -168,7 +168,7 @@ public class DhcpManager implements DhcpService {
private static MacAddress myMAC = valueOf("4e:4f:4f:4f:4f:4f");
private static final Ip4Address IP_BROADCAST = Ip4Address.valueOf("255.255.255.255");
protected Timeout timeout;
protected ScheduledFuture<?> timeout;
protected static int timerDelay = 2;
@Activate
@ -183,7 +183,7 @@ public class DhcpManager implements DhcpService {
hostProviderService = hostProviderRegistry.register(hostProvider);
packetService.addProcessor(processor, PacketProcessor.director(1));
requestPackets();
timeout = Timer.getTimer().newTimeout(new PurgeListTask(), timerDelay, TimeUnit.MINUTES);
timeout = SharedScheduledExecutors.newTimeout(new PurgeListTask(), timerDelay, TimeUnit.MINUTES);
log.info("Started");
}
@ -195,7 +195,7 @@ public class DhcpManager implements DhcpService {
hostProviderRegistry.unregister(hostProvider);
hostProviderService = null;
cancelPackets();
timeout.cancel();
timeout.cancel(true);
log.info("Stopped");
}
@ -729,10 +729,10 @@ public class DhcpManager implements DhcpService {
}
}
private class PurgeListTask implements TimerTask {
private class PurgeListTask implements Runnable {
@Override
public void run(Timeout to) {
public void run() {
IpAssignment ipAssignment;
Date dateNow = new Date();
@ -750,7 +750,7 @@ public class DhcpManager implements DhcpService {
}
}
}
timeout = Timer.getTimer().newTimeout(new PurgeListTask(), timerDelay, TimeUnit.MINUTES);
timeout = SharedScheduledExecutors.newTimeout(new PurgeListTask(), timerDelay, TimeUnit.MINUTES);
}
}
}

View File

@ -15,8 +15,6 @@
*/
package org.onosproject.net.host.impl;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv6;
@ -26,7 +24,7 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.packet.ndp.NeighborSolicitation;
import org.onlab.util.Timer;
import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.incubator.net.intf.InterfaceService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Host;
@ -46,6 +44,7 @@ import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@ -56,7 +55,7 @@ import java.util.concurrent.TimeUnit;
* probe for hosts that have not yet been detected (specified by IP address).
* </p>
*/
public class HostMonitor implements TimerTask {
public class HostMonitor implements Runnable {
private Logger log = LoggerFactory.getLogger(getClass());
@ -73,7 +72,7 @@ public class HostMonitor implements TimerTask {
private static final byte[] ZERO_MAC_ADDRESS = MacAddress.ZERO.toBytes();
private long probeRate = DEFAULT_PROBE_RATE;
private Timeout timeout;
private ScheduledFuture<?> timeout;
/**
* Creates a new host monitor.
@ -124,7 +123,7 @@ public class HostMonitor implements TimerTask {
void start() {
synchronized (this) {
if (timeout == null) {
timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS);
timeout = SharedScheduledExecutors.newTimeout(this, 0, TimeUnit.MILLISECONDS);
}
}
}
@ -134,7 +133,7 @@ public class HostMonitor implements TimerTask {
*/
void shutdown() {
synchronized (this) {
timeout.cancel();
timeout.cancel(true);
timeout = null;
}
}
@ -157,11 +156,11 @@ public class HostMonitor implements TimerTask {
}
@Override
public void run(Timeout timeout) throws Exception {
public void run() {
monitoredAddresses.forEach(this::probe);
synchronized (this) {
this.timeout = Timer.getTimer().newTimeout(this, probeRate, TimeUnit.MILLISECONDS);
this.timeout = SharedScheduledExecutors.newTimeout(this, probeRate, TimeUnit.MILLISECONDS);
}
}

View File

@ -131,7 +131,7 @@ public class HostMonitorTest {
hostMonitor.registerHostProvider(hostProvider);
hostMonitor.addMonitoringFor(hostIp);
hostMonitor.run(null);
hostMonitor.run();
verify(hostProvider);
}
@ -176,7 +176,7 @@ public class HostMonitorTest {
hostMonitor = new HostMonitor(packetService, hostManager, interfaceService, edgePortService);
hostMonitor.addMonitoringFor(TARGET_IPV4_ADDR);
hostMonitor.run(null);
hostMonitor.run();
// Check that a packet was sent to our PacketService and that it has
@ -245,7 +245,7 @@ public class HostMonitorTest {
hostMonitor = new HostMonitor(packetService, hostManager, interfaceService, edgePortService);
hostMonitor.addMonitoringFor(TARGET_IPV6_ADDR);
hostMonitor.run(null);
hostMonitor.run();
// Check that a packet was sent to our PacketService and that it has
@ -316,7 +316,7 @@ public class HostMonitorTest {
hostMonitor = new HostMonitor(packetService, hostManager, interfaceService, edgePortService);
hostMonitor.addMonitoringFor(TARGET_IPV4_ADDR);
hostMonitor.run(null);
hostMonitor.run();
// Check that a packet was sent to our PacketService and that it has
@ -386,7 +386,7 @@ public class HostMonitorTest {
hostMonitor = new HostMonitor(packetService, hostManager, interfaceService, edgePortService);
hostMonitor.addMonitoringFor(TARGET_IPV6_ADDR);
hostMonitor.run(null);
hostMonitor.run();
// Check that a packet was sent to our PacketService and that it has

View File

@ -17,8 +17,9 @@ package org.onosproject.provider.lldpcommon;
import com.google.common.collect.Sets;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.onlab.packet.Ethernet;
import org.onlab.packet.MacAddress;
import org.onlab.packet.ONOSLLDP;
@ -109,7 +110,7 @@ public class LinkDiscovery implements TimerTask {
public synchronized void start() {
if (isStopped) {
isStopped = false;
timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
timeout = Timer.newTimeout(this, 0, MILLISECONDS);
} else {
log.warn("LinkDiscovery started multiple times?");
}
@ -218,7 +219,7 @@ public class LinkDiscovery implements TimerTask {
}
if (!isStopped()) {
timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
timeout = t.timer().newTimeout(this, context.probeRate(), MILLISECONDS);
}
}

View File

@ -16,9 +16,10 @@
package org.onosproject.provider.nil;
import com.google.common.collect.Sets;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.onlab.util.Timer;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
@ -52,7 +53,6 @@ class NullFlowRuleProvider extends NullProviders.AbstractNullProvider
private FlowRuleProviderService providerService;
private HashedWheelTimer timer = Timer.getTimer();
private Timeout timeout;
/**
@ -62,7 +62,7 @@ class NullFlowRuleProvider extends NullProviders.AbstractNullProvider
*/
void start(FlowRuleProviderService providerService) {
this.providerService = providerService;
timeout = timer.newTimeout(new StatisticTask(), 5, TimeUnit.SECONDS);
timeout = Timer.newTimeout(new StatisticTask(), 5, TimeUnit.SECONDS);
}
/**
@ -126,7 +126,7 @@ class NullFlowRuleProvider extends NullProviders.AbstractNullProvider
flowTable.getOrDefault(devId, Collections.emptySet());
providerService.pushFlowMetrics(devId, entries);
}
timeout = timer.newTimeout(to.getTask(), 5, TimeUnit.SECONDS);
timeout = to.timer().newTimeout(to.task(), 5, TimeUnit.SECONDS);
}
}
}

View File

@ -15,9 +15,6 @@
*/
package org.onosproject.provider.nil;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.packet.Ethernet;
import org.onlab.packet.ICMP;
import org.onlab.util.Timer;
@ -34,6 +31,9 @@ import org.onosproject.net.packet.PacketProvider;
import org.onosproject.net.packet.PacketProviderService;
import org.slf4j.Logger;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -69,7 +69,6 @@ class NullPacketProvider extends NullProviders.AbstractNullProvider
private List<Device> devices;
private int currentDevice = 0;
private HashedWheelTimer timer = Timer.getTimer();
private Timeout timeout;
/**
@ -91,7 +90,7 @@ class NullPacketProvider extends NullProviders.AbstractNullProvider
.collect(Collectors.toList());
adjustRate(packetRate);
timeout = timer.newTimeout(new PacketDriverTask(), INITIAL_DELAY, SECONDS);
timeout = Timer.newTimeout(new PacketDriverTask(), INITIAL_DELAY, SECONDS);
}
/**
@ -103,7 +102,7 @@ class NullPacketProvider extends NullProviders.AbstractNullProvider
boolean needsRestart = delay == 0 && packetRate > 0;
delay = packetRate > 0 ? 1000 / packetRate : 0;
if (needsRestart) {
timeout = timer.newTimeout(new PacketDriverTask(), 1, MILLISECONDS);
timeout = Timer.newTimeout(new PacketDriverTask(), 1, MILLISECONDS);
}
log.info("Settings: packetRate={}, delay={}", packetRate, delay);
}
@ -144,7 +143,7 @@ class NullPacketProvider extends NullProviders.AbstractNullProvider
if (!devices.isEmpty() && !to.isCancelled() && delay > 0) {
sendEvent(devices.get(Math.min(currentDevice, devices.size() - 1)));
currentDevice = (currentDevice + 1) % devices.size();
timeout = timer.newTimeout(to.getTask(), delay, TimeUnit.MILLISECONDS);
timeout = to.timer().newTimeout(to.task(), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -16,9 +16,6 @@
package org.onosproject.provider.of.group.impl;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.util.Timer;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
@ -27,6 +24,9 @@ import org.projectfloodlight.openflow.protocol.OFGroupStatsRequest;
import org.projectfloodlight.openflow.types.OFGroup;
import org.slf4j.Logger;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
import static org.slf4j.LoggerFactory.getLogger;
@ -36,7 +36,6 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
public class GroupStatsCollector implements TimerTask {
private final HashedWheelTimer timer = Timer.getTimer();
private final OpenFlowSwitch sw;
private final Logger log = getLogger(getClass());
private final int refreshInterval;
@ -65,7 +64,7 @@ public class GroupStatsCollector implements TimerTask {
if (!this.stopTimer) {
log.trace("Scheduling stats collection in {} seconds for {}",
this.refreshInterval, this.sw.getStringId());
timeout.getTimer().newTimeout(this, refreshInterval,
timeout.timer().newTimeout(this, refreshInterval,
TimeUnit.SECONDS);
}
}
@ -100,7 +99,7 @@ public class GroupStatsCollector implements TimerTask {
*/
public void start() {
log.info("Starting Group Stats collection thread for {}", sw.getStringId());
timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
timeout = Timer.newTimeout(this, 1, TimeUnit.SECONDS);
}
/**

View File

@ -16,15 +16,15 @@
package org.onosproject.provider.of.meter.impl;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.util.Timer;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFMeterStatsRequest;
import org.slf4j.Logger;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
import static org.slf4j.LoggerFactory.getLogger;
@ -34,7 +34,6 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
public class MeterStatsCollector implements TimerTask {
private final HashedWheelTimer timer = Timer.getTimer();
private final OpenFlowSwitch sw;
private final Logger log = getLogger(getClass());
private final int refreshInterval;
@ -68,7 +67,7 @@ public class MeterStatsCollector implements TimerTask {
if (!this.stopTimer) {
log.trace("Scheduling stats collection in {} seconds for {}",
this.refreshInterval, this.sw.getStringId());
timeout.getTimer().newTimeout(this, refreshInterval,
timeout.timer().newTimeout(this, refreshInterval,
TimeUnit.SECONDS);
}
}
@ -94,7 +93,7 @@ public class MeterStatsCollector implements TimerTask {
*/
public void start() {
log.info("Starting Meter Stats collection thread for {}", sw.getStringId());
timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
timeout = Timer.newTimeout(this, 1, TimeUnit.SECONDS);
}
/**

View File

@ -19,14 +19,14 @@ package org.onosproject.provider.pcep.tunnel.impl;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.util.Timer;
import org.onosproject.pcep.api.PcepController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
/*
@ -39,7 +39,6 @@ public class TunnelStatsCollector implements TimerTask {
protected PcepController controller;
private int refreshInterval;
private final HashedWheelTimer timer = Timer.getTimer();
private String pcepTunnelId;
private Timeout timeout;
@ -47,7 +46,7 @@ public class TunnelStatsCollector implements TimerTask {
/**
* Greate a tunnel status collector object.
* Create a tunnel status collector object.
*
* @param id tunnel whose status data will be collected
* @param refreshInterval time interval for collecting statistic
@ -68,7 +67,7 @@ public class TunnelStatsCollector implements TimerTask {
if (!stopped && !timeout.isCancelled()) {
log.trace("Scheduling stats collection in {} seconds for {}",
this.refreshInterval, pcepTunnelId);
timeout.getTimer().newTimeout(this, refreshInterval, TimeUnit.SECONDS);
timeout.timer().newTimeout(this, refreshInterval, TimeUnit.SECONDS);
}
}
@ -88,7 +87,7 @@ public class TunnelStatsCollector implements TimerTask {
public synchronized void start() {
log.info("Starting Tunnel Stats collection thread for {}", pcepTunnelId);
stopped = false;
timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
timeout = Timer.newTimeout(this, 1, TimeUnit.SECONDS);
}
/**

View File

@ -56,6 +56,11 @@
<artifactId>netty</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>

View File

@ -20,6 +20,9 @@ import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Utility for managing a set of shared execution resources, such as a single
* thread scheduled executor and thread pool scheduled executor for use by
@ -58,6 +61,20 @@ public final class SharedScheduledExecutors {
return singleThreadExecutor;
}
/**
* Executes one-shot timer task on shared thread pool.
*
* @param task timer task to execute
* @param delay before executing the task
* @param unit of delay
* @return a ScheduledFuture representing pending completion of the task
* and whose get() method will return null upon completion
*/
public static ScheduledFuture<?> newTimeout(Runnable task, long delay, TimeUnit unit) {
return SharedScheduledExecutors.getPoolThreadExecutor()
.schedule(task, delay, unit);
}
/**
* Returns the shared scheduled thread pool executor.
*

View File

@ -15,7 +15,14 @@
*/
package org.onlab.util;
import org.jboss.netty.util.HashedWheelTimer;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.google.common.base.Suppliers;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
/**
* Hashed-wheel timer singleton. Care must be taken to shutdown the timer
@ -23,18 +30,37 @@ import org.jboss.netty.util.HashedWheelTimer;
*/
public final class Timer {
private static volatile HashedWheelTimer timer;
private static volatile org.jboss.netty.util.HashedWheelTimer timer;
private static final Supplier<HashedWheelTimer> TIMER =
Suppliers.memoize(HashedWheelTimer::new);
// Ban public construction
private Timer() {
}
/**
* Executes one-shot timer task on shared thread pool.
*
* @param task timer task to execute
* @param delay before executing the task
* @param unit of delay
* @return a handle which is associated with the specified task
*/
public static Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
return TIMER.get().newTimeout(task, delay, unit);
}
/**
* Returns the singleton hashed-wheel timer.
*
* @return hashed-wheel timer
*
* @deprecated in 1.11.0
*/
public static HashedWheelTimer getTimer() {
@Deprecated
public static org.jboss.netty.util.HashedWheelTimer getTimer() {
if (Timer.timer == null) {
initTimer();
}
@ -43,7 +69,8 @@ public final class Timer {
private static synchronized void initTimer() {
if (Timer.timer == null) {
HashedWheelTimer hwTimer = new HashedWheelTimer();
org.jboss.netty.util.HashedWheelTimer hwTimer =
new org.jboss.netty.util.HashedWheelTimer();
hwTimer.start();
Timer.timer = hwTimer;
}