mirror of
https://github.com/opennetworkinglab/onos.git
synced 2025-11-02 09:11:29 +01:00
Shared system timer and executor services - monitoring
Change-Id: Ieaa889447dbcb78e4d27fe7409fae463177372b8
This commit is contained in:
parent
b54e8ba3e9
commit
dc17f7bd28
@ -23,6 +23,7 @@ import org.apache.felix.scr.annotations.Property;
|
||||
import org.apache.felix.scr.annotations.Reference;
|
||||
import org.apache.felix.scr.annotations.ReferenceCardinality;
|
||||
import org.apache.felix.scr.annotations.Service;
|
||||
import org.onlab.metrics.MetricsService;
|
||||
import org.onlab.util.SharedExecutors;
|
||||
import org.onosproject.app.ApplicationService;
|
||||
import org.onosproject.cfg.ComponentConfigService;
|
||||
@ -80,6 +81,9 @@ public class CoreManager implements CoreService {
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||
protected EventDeliveryService eventDeliveryService;
|
||||
|
||||
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
|
||||
protected MetricsService metricsService;
|
||||
|
||||
private static final int DEFAULT_POOL_SIZE = 30;
|
||||
@Property(name = "sharedThreadPoolSize", intValue = DEFAULT_POOL_SIZE,
|
||||
label = "Configure shared pool maximum size ")
|
||||
@ -90,6 +94,12 @@ public class CoreManager implements CoreService {
|
||||
label = "Maximum number of millis an event sink has to process an event")
|
||||
private int maxEventTimeLimit = DEFAULT_EVENT_TIME;
|
||||
|
||||
private static final boolean DEFAULT_PERFORMANCE_CHECK = false;
|
||||
@Property(name = "sharedThreadPerformanceCheck", boolValue = DEFAULT_PERFORMANCE_CHECK,
|
||||
label = "Enable queue performance check on shared pool")
|
||||
private boolean calculatePoolPerformance = DEFAULT_PERFORMANCE_CHECK;
|
||||
|
||||
|
||||
@Activate
|
||||
public void activate() {
|
||||
registerApplication(CORE_APP_NAME);
|
||||
@ -177,8 +187,14 @@ public class CoreManager implements CoreService {
|
||||
log.warn("maxEventTimeLimit must be greater than 1");
|
||||
}
|
||||
|
||||
log.info("Settings: sharedThreadPoolSize={}, maxEventTimeLimit={}",
|
||||
sharedThreadPoolSize, maxEventTimeLimit);
|
||||
Boolean performanceCheck = isPropertyEnabled(properties, "sharedThreadPerformanceCheck");
|
||||
if (performanceCheck != null) {
|
||||
calculatePoolPerformance = performanceCheck;
|
||||
SharedExecutors.setCalculatePoolPerformance(calculatePoolPerformance, metricsService);
|
||||
}
|
||||
|
||||
log.info("Settings: sharedThreadPoolSize={}, maxEventTimeLimit={}, calculatePoolPerformance={}",
|
||||
sharedThreadPoolSize, maxEventTimeLimit, calculatePoolPerformance);
|
||||
}
|
||||
|
||||
|
||||
@ -202,5 +218,26 @@ public class CoreManager implements CoreService {
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check property name is defined and set to true.
|
||||
*
|
||||
* @param properties properties to be looked up
|
||||
* @param propertyName the name of the property to look up
|
||||
* @return value when the propertyName is defined or return null
|
||||
*/
|
||||
private static Boolean isPropertyEnabled(Dictionary<?, ?> properties,
|
||||
String propertyName) {
|
||||
Boolean value = null;
|
||||
try {
|
||||
String s = (String) properties.get(propertyName);
|
||||
value = isNullOrEmpty(s) ? null : s.trim().equals("true");
|
||||
} catch (ClassCastException e) {
|
||||
// No propertyName defined.
|
||||
value = null;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -15,6 +15,10 @@
|
||||
*/
|
||||
package org.onlab.util;
|
||||
|
||||
import org.onlab.metrics.MetricsComponent;
|
||||
import org.onlab.metrics.MetricsFeature;
|
||||
import org.onlab.metrics.MetricsService;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -23,6 +27,9 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import com.codahale.metrics.Timer;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Executor service wrapper for shared executors with safeguards on shutdown
|
||||
@ -34,6 +41,13 @@ class SharedExecutorService implements ExecutorService {
|
||||
|
||||
private ExecutorService executor;
|
||||
|
||||
private MetricsService metricsService = null;
|
||||
|
||||
private MetricsComponent executorMetrics;
|
||||
private Timer queueMetrics = null;
|
||||
private Timer delayMetrics = null;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a wrapper for the given executor service.
|
||||
*
|
||||
@ -63,6 +77,7 @@ class SharedExecutorService implements ExecutorService {
|
||||
oldExecutor.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
throw new UnsupportedOperationException(NOT_ALLOWED);
|
||||
@ -91,7 +106,31 @@ class SharedExecutorService implements ExecutorService {
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
return executor.submit(task);
|
||||
Counter taskCounter = new Counter();
|
||||
taskCounter.reset();
|
||||
return executor.submit(() -> {
|
||||
T t = null;
|
||||
long queueWaitTime = (long) taskCounter.duration();
|
||||
String className;
|
||||
if (task instanceof CallableExtended) {
|
||||
className = ((CallableExtended) task).getRunnable().getClass().toString();
|
||||
} else {
|
||||
className = task.getClass().toString();
|
||||
}
|
||||
if (queueMetrics != null) {
|
||||
queueMetrics.update(queueWaitTime, TimeUnit.SECONDS);
|
||||
}
|
||||
taskCounter.reset();
|
||||
try {
|
||||
t = task.call();
|
||||
} catch (Exception e) { }
|
||||
long taskwaittime = (long) taskCounter.duration();
|
||||
if (delayMetrics != null) {
|
||||
delayMetrics.update(taskwaittime, TimeUnit.SECONDS);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -135,4 +174,47 @@ class SharedExecutorService implements ExecutorService {
|
||||
executor.execute(command);
|
||||
}
|
||||
|
||||
public void setCalculatePoolPerformance(boolean calculatePoolPerformance, MetricsService metricsSrv) {
|
||||
this.metricsService = metricsSrv;
|
||||
if (calculatePoolPerformance) {
|
||||
if (metricsService != null) {
|
||||
executorMetrics = metricsService.registerComponent("SharedExecutor");
|
||||
MetricsFeature mf = executorMetrics.registerFeature("*");
|
||||
queueMetrics = metricsService.createTimer(executorMetrics, mf, "Queue");
|
||||
delayMetrics = metricsService.createTimer(executorMetrics, mf, "Delay");
|
||||
}
|
||||
} else {
|
||||
metricsService = null;
|
||||
queueMetrics = null;
|
||||
delayMetrics = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* CallableExtended class is used to get Runnable Object
|
||||
* from Callable Object.
|
||||
*
|
||||
*/
|
||||
class CallableExtended implements Callable {
|
||||
|
||||
private Runnable runnable;
|
||||
|
||||
/**
|
||||
* Wrapper for Callable object .
|
||||
* @param runnable Runnable object
|
||||
*/
|
||||
public CallableExtended(Runnable runnable) {
|
||||
this.runnable = runnable;
|
||||
}
|
||||
public Runnable getRunnable() {
|
||||
return runnable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
runnable.run();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,6 +16,8 @@
|
||||
|
||||
package org.onlab.util;
|
||||
|
||||
import org.onlab.metrics.MetricsService;
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
@ -93,6 +95,11 @@ public final class SharedExecutors {
|
||||
"onos-pool-executor-%d")));
|
||||
}
|
||||
|
||||
|
||||
public static void setCalculatePoolPerformance(boolean calculatePoolPerformance, MetricsService metricsService) {
|
||||
poolThreadExecutor.setCalculatePoolPerformance(calculatePoolPerformance, metricsService);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down all shared timers and executors and therefore should be
|
||||
* called only by the framework.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user