Flow stats collection optimization - Flag introduced to stop periodic collection and collect only in 3 scenarios (Flows Add/Delete/Mod, Switch Add/Change, Mastership Change)

Change-Id: I1ad143a416f34135a622818c60dbc97310fe905e
This commit is contained in:
Shibu Vijayakumar 2019-07-04 07:13:37 -04:00 committed by Pier Luigi Ventre
parent f7a87a675e
commit 2ef2288f72
8 changed files with 154 additions and 35 deletions

View File

@ -55,4 +55,11 @@ public interface OpenFlowSwitchListener {
* @param response role reply from the switch
*/
void receivedRoleReply(Dpid dpid, RoleState requested, RoleState response);
/**
* Notify that role of the switch changed to Master.
*
* @param dpid the switch for which the role is changed
*/
default void roleChangedToMaster(Dpid dpid) {}
}

View File

@ -333,6 +333,7 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
}
// perform role transition after clearing messages queue
this.role = RoleState.MASTER;
this.agent.roleChangedToMaster(dpid);
}
}

View File

@ -109,4 +109,11 @@ public interface OpenFlowAgent {
* @param response role reply from the switch
*/
void returnRoleReply(Dpid dpid, RoleState requested, RoleState response);
/**
* Notify that role of the switch changed to Master.
*
* @param dpid the switch for which the role is changed
*/
default void roleChangedToMaster(Dpid dpid) {}
}

View File

@ -851,6 +851,13 @@ public class OpenFlowControllerImpl implements OpenFlowController {
l.receivedRoleReply(dpid, requested, response);
}
}
@Override
public void roleChangedToMaster(Dpid dpid) {
for (OpenFlowSwitchListener l : ofSwitchListener) {
l.roleChangedToMaster(dpid);
}
}
}
/**

View File

@ -217,15 +217,21 @@ class FlowStatsCollector implements SwitchDataCollector {
}
public synchronized void start() {
// Initially start polling quickly. Then drop down to configured value
log.debug("Starting Stats collection thread for {}", sw.getStringId());
loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
pauseTask = new PauseTimerTask();
scheduledPauseTask = executorService.scheduleAtFixedRate(pauseTask, 1 * MS,
1 * MS, TimeUnit.MILLISECONDS);
pollTask = new PollTimerTask();
scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, 1 * MS,
pollInterval * MS, TimeUnit.MILLISECONDS);
if (pollInterval > 0) {
pauseTask = new PauseTimerTask();
scheduledPauseTask = executorService.scheduleAtFixedRate(pauseTask, 1 * MS,
1 * MS, TimeUnit.MILLISECONDS);
pollTask = new PollTimerTask();
// Initially start polling quickly. Then drop down to configured value
scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, 1 * MS,
pollInterval * MS, TimeUnit.MILLISECONDS);
} else {
// Trigger the poll only once
pollTask = new PollTimerTask();
executorService.schedule(pollTask, 0, TimeUnit.MILLISECONDS);
}
}
private synchronized void pause() {

View File

@ -112,6 +112,7 @@ public class NewAdaptiveFlowStatsCollector implements SwitchDataCollector {
private long flowMissingXid = NO_FLOW_MISSING_XID;
private FlowRuleService flowRuleService;
private boolean pollPeriodically = true;
/**
* Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
@ -128,6 +129,9 @@ public class NewAdaptiveFlowStatsCollector implements SwitchDataCollector {
flowRuleService = get(FlowRuleService.class);
initMemberVars(pollInterval);
if (pollInterval == -1) {
pollPeriodically = false;
}
}
/**
@ -394,27 +398,49 @@ public class NewAdaptiveFlowStatsCollector implements SwitchDataCollector {
isFirstTimeStart = true;
// Initially start polling quickly. Then drop down to configured value
calAndShortFlowsTask = new CalAndShortFlowsTask();
calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
calAndShortFlowsTask,
1,
calAndPollInterval,
TimeUnit.SECONDS);
midFlowsTask = new MidFlowsTask();
midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
midFlowsTask,
1,
midPollInterval,
TimeUnit.SECONDS);
longFlowsTask = new LongFlowsTask();
longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
longFlowsTask,
1,
longPollInterval,
TimeUnit.SECONDS);
if (pollPeriodically) {
// Initially start polling quickly. Then drop down to configured value
calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
calAndShortFlowsTask,
1,
calAndPollInterval,
TimeUnit.SECONDS);
midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
midFlowsTask,
1,
midPollInterval,
TimeUnit.SECONDS);
longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
longFlowsTask,
1,
longPollInterval,
TimeUnit.SECONDS);
} else {
// Trigger the polls only once
adaptiveFlowStatsScheduler.schedule(
calAndShortFlowsTask,
0,
TimeUnit.SECONDS);
adaptiveFlowStatsScheduler.schedule(
midFlowsTask,
0,
TimeUnit.SECONDS);
adaptiveFlowStatsScheduler.schedule(
longFlowsTask,
0,
TimeUnit.SECONDS);
}
log.info("Started");
}

View File

@ -135,6 +135,7 @@ public class OpenFlowRuleProvider extends AbstractProvider
private static final int MIN_EXPECTED_BYTE_LEN = 56;
private static final int SKIP_BYTES = 4;
private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = false;
private static final boolean DEFAULT_POLL_STATS_PERIODICALLY = true;
@Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
label = "Frequency (in seconds) for polling flow statistics")
@ -144,6 +145,10 @@ public class OpenFlowRuleProvider extends AbstractProvider
label = "Adaptive Flow Sampling is on or off")
private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING;
@Property(name = "pollStatsPeriodically", boolValue = DEFAULT_POLL_STATS_PERIODICALLY,
label = "Periodic stats collection")
private boolean pollStatsPeriodically = true;
private FlowRuleProviderService providerService;
private final InternalFlowProvider listener = new InternalFlowProvider();
@ -204,7 +209,6 @@ public class OpenFlowRuleProvider extends AbstractProvider
try {
String s = get(properties, "flowPollFrequency");
newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
newFlowPollFrequency = flowPollFrequency;
}
@ -213,22 +217,34 @@ public class OpenFlowRuleProvider extends AbstractProvider
flowPollFrequency = newFlowPollFrequency;
adjustRate();
}
log.info("Settings: flowPollFrequency={}", flowPollFrequency);
boolean newAdaptiveFlowSampling;
String s = get(properties, "adaptiveFlowSampling");
newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim());
if (newAdaptiveFlowSampling != adaptiveFlowSampling) {
// stop previous collector
stopCollectors();
adaptiveFlowSampling = newAdaptiveFlowSampling;
// create new collectors
createCollectors();
if (pollStatsPeriodically) {
// create new collectors
createCollectors();
}
}
log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling);
boolean newPollStatsPeriodically;
String flag = get(properties, "pollStatsPeriodically");
newPollStatsPeriodically = isNullOrEmpty(flag) ? pollStatsPeriodically : Boolean.parseBoolean(flag.trim());
if (newPollStatsPeriodically != pollStatsPeriodically) {
// stop previous collector
stopCollectors();
pollStatsPeriodically = newPollStatsPeriodically;
if (pollStatsPeriodically) {
createCollectors();
}
}
log.info("Settings: pollStatsPeriodically={}", pollStatsPeriodically);
}
private Cache<Long, InternalCacheEntry> createBatchCache() {
@ -326,6 +342,11 @@ public class OpenFlowRuleProvider extends AbstractProvider
if (collector != null) {
collector.recordEvents(events);
}
if (!pollStatsPeriodically) {
log.debug("Triggering Flow/Table Stats, Flow Add/Del/Mod event, switch : {}", dpid.toString());
triggerStatsCollection(dpid);
}
}
@Override
@ -452,12 +473,43 @@ public class OpenFlowRuleProvider extends AbstractProvider
flowRuleExtPayLoad.payLoad().length > 0;
}
private void triggerStatsCollection(Dpid dpid) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
if (sw == null) {
return;
}
SwitchDataCollector sdc = adaptiveFlowSampling ? afsCollectors.get(dpid) : simpleCollectors.get(dpid);
if (sdc == null) {
if (adaptiveFlowSampling) {
sdc = new NewAdaptiveFlowStatsCollector(driverService, sw, -1);
afsCollectors.put(dpid, (NewAdaptiveFlowStatsCollector) sdc);
} else {
sdc = new FlowStatsCollector(executorService, sw, -1);
simpleCollectors.put(dpid, (FlowStatsCollector) sdc);
}
}
sdc.start();
TableStatisticsCollector tsc = tableStatsCollectors.get(dpid);
if (tsc == null) {
tsc = new TableStatisticsCollector(executorService, sw, -1);
tableStatsCollectors.put(dpid, tsc);
}
tsc.start();
}
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
@Override
public void switchAdded(Dpid dpid) {
createCollector(controller.getSwitch(dpid));
if (pollStatsPeriodically) {
createCollector(controller.getSwitch(dpid));
} else {
log.debug("Triggering Flow/Table Stats, Switch: {} added, ", dpid.toString());
triggerStatsCollection(dpid);
}
}
@Override
@ -678,6 +730,14 @@ public class OpenFlowRuleProvider extends AbstractProvider
}
}
@Override
public void roleChangedToMaster(Dpid dpid) {
if (!pollStatsPeriodically) {
log.debug("Triggering Flow/Table Stats, Mastership change: {}, ", dpid.toString());
triggerStatsCollection(dpid);
}
}
private DriverHandler getDriver(DeviceId devId) {
Driver driver = driverService.getDriver(devId);
DriverHandler handler = new DefaultDriverHandler(new DefaultDriverData(driver, devId));

View File

@ -87,11 +87,16 @@ class TableStatisticsCollector implements SwitchDataCollector {
}
public synchronized void start() {
// Initially start polling quickly. Then drop down to configured value
log.debug("Starting Table Stats collection thread for {}", sw.getStringId());
task = new InternalTimerTask();
scheduledTask = executorService.scheduleAtFixedRate(task, 1 * MS,
pollInterval * MS, TimeUnit.MILLISECONDS);
if (pollInterval > 0) {
// Initially start polling quickly. Then drop down to configured value
scheduledTask = executorService.scheduleAtFixedRate(task, 1 * MS,
pollInterval * MS, TimeUnit.MILLISECONDS);
} else {
// Trigger the poll only once
executorService.schedule(task, 0, TimeUnit.MILLISECONDS);
}
}
public synchronized void stop() {