From 31d4d38588d6203f5fb131b448dded7afb8e90b4 Mon Sep 17 00:00:00 2001 From: Jordi Ortiz Date: Wed, 19 Jul 2017 10:52:26 +0200 Subject: [PATCH] MeterManager: Use Executor to call provider to mitigate DistributedMeterStore timeouts Change-Id: Ifc25d50d97829c347b3be65cb95848406efdc46d --- .../net/meter/impl/MeterManager.java | 54 +++++++++++++++++-- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java index 2db8ec24ac..f7418ab117 100644 --- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java +++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java @@ -19,6 +19,7 @@ import com.google.common.collect.Maps; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.Service; @@ -54,8 +55,12 @@ import org.slf4j.Logger; import java.util.Collection; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static org.onlab.util.Tools.groupedThreads; import static org.slf4j.LoggerFactory.getLogger; /** @@ -68,6 +73,16 @@ public class MeterManager implements MeterService, MeterProviderRegistry { private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s"; + private static final String NUM_THREAD = "numThreads"; + private static final String WORKER_PATTERN = "installer-%d"; + private static final String GROUP_THREAD_NAME = "onos/meter"; + + private static final int DEFAULT_NUM_THREADS = 4; + @Property(name = NUM_THREAD, + intValue = DEFAULT_NUM_THREADS, + label = "Number of worker threads") + private int numThreads = DEFAULT_NUM_THREADS; + private final Logger log = getLogger(getClass()); private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate(); @@ -85,6 +100,8 @@ public class MeterManager private TriConsumer onComplete; + private ExecutorService executorService; + @Activate public void activate() { store.setDelegate(delegate); @@ -104,6 +121,9 @@ public class MeterManager }); }; + + executorService = newFixedThreadPool(numThreads, + groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log)); log.info("Started"); } @@ -111,6 +131,7 @@ public class MeterManager public void deactivate() { store.unsetDelegate(delegate); eventDispatcher.removeSink(MeterEvent.class); + executorService.shutdown(); log.info("Stopped"); } @@ -308,15 +329,14 @@ public class MeterManager @Override public void notify(MeterEvent event) { DeviceId deviceId = event.subject().deviceId(); - MeterProvider p = getProvider(event.subject().deviceId()); switch (event.type()) { case METER_ADD_REQ: - p.performMeterOperation(deviceId, new MeterOperation(event.subject(), - MeterOperation.Type.ADD)); + executorService.execute(new MeterInstaller(deviceId, event.subject(), + MeterOperation.Type.ADD)); break; case METER_REM_REQ: - p.performMeterOperation(deviceId, new MeterOperation(event.subject(), - MeterOperation.Type.REMOVE)); + executorService.execute(new MeterInstaller(deviceId, event.subject(), + MeterOperation.Type.REMOVE)); break; case METER_ADDED: log.info("Meter added {}", event.subject()); @@ -332,5 +352,29 @@ public class MeterManager } } + /** + * Task that passes the meter down to the provider. + */ + private class MeterInstaller implements Runnable { + private final DeviceId deviceId; + private final Meter meter; + private final MeterOperation.Type op; + + public MeterInstaller(DeviceId deviceId, Meter meter, MeterOperation.Type op) { + this.deviceId = checkNotNull(deviceId); + this.meter = checkNotNull(meter); + this.op = checkNotNull(op); + } + + @Override + public void run() { + MeterProvider p = getProvider(this.deviceId); + if (p == null) { + log.error("Unable to recover {}'s provider", deviceId); + return; + } + p.performMeterOperation(deviceId, new MeterOperation(meter, op)); + } + } }