MeterManager: Use Executor to call provider to mitigate DistributedMeterStore timeouts

Change-Id: Ifc25d50d97829c347b3be65cb95848406efdc46d
This commit is contained in:
Jordi Ortiz 2017-07-19 10:52:26 +02:00 committed by Andrea Campanella
parent fe99be9df0
commit 31d4d38588

View File

@ -19,6 +19,7 @@ import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
@ -54,8 +55,12 @@ import org.slf4j.Logger;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@ -68,6 +73,16 @@ public class MeterManager
implements MeterService, MeterProviderRegistry {
private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
private static final String NUM_THREAD = "numThreads";
private static final String WORKER_PATTERN = "installer-%d";
private static final String GROUP_THREAD_NAME = "onos/meter";
private static final int DEFAULT_NUM_THREADS = 4;
@Property(name = NUM_THREAD,
intValue = DEFAULT_NUM_THREADS,
label = "Number of worker threads")
private int numThreads = DEFAULT_NUM_THREADS;
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
@ -85,6 +100,8 @@ public class MeterManager
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
private ExecutorService executorService;
@Activate
public void activate() {
store.setDelegate(delegate);
@ -104,6 +121,9 @@ public class MeterManager
});
};
executorService = newFixedThreadPool(numThreads,
groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
log.info("Started");
}
@ -111,6 +131,7 @@ public class MeterManager
public void deactivate() {
store.unsetDelegate(delegate);
eventDispatcher.removeSink(MeterEvent.class);
executorService.shutdown();
log.info("Stopped");
}
@ -308,15 +329,14 @@ public class MeterManager
@Override
public void notify(MeterEvent event) {
DeviceId deviceId = event.subject().deviceId();
MeterProvider p = getProvider(event.subject().deviceId());
switch (event.type()) {
case METER_ADD_REQ:
p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
MeterOperation.Type.ADD));
executorService.execute(new MeterInstaller(deviceId, event.subject(),
MeterOperation.Type.ADD));
break;
case METER_REM_REQ:
p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
MeterOperation.Type.REMOVE));
executorService.execute(new MeterInstaller(deviceId, event.subject(),
MeterOperation.Type.REMOVE));
break;
case METER_ADDED:
log.info("Meter added {}", event.subject());
@ -332,5 +352,29 @@ public class MeterManager
}
}
/**
* Task that passes the meter down to the provider.
*/
private class MeterInstaller implements Runnable {
private final DeviceId deviceId;
private final Meter meter;
private final MeterOperation.Type op;
public MeterInstaller(DeviceId deviceId, Meter meter, MeterOperation.Type op) {
this.deviceId = checkNotNull(deviceId);
this.meter = checkNotNull(meter);
this.op = checkNotNull(op);
}
@Override
public void run() {
MeterProvider p = getProvider(this.deviceId);
if (p == null) {
log.error("Unable to recover {}'s provider", deviceId);
return;
}
p.performMeterOperation(deviceId, new MeterOperation(meter, op));
}
}
}