diff --git a/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java b/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java new file mode 100644 index 0000000000..afceb14e76 --- /dev/null +++ b/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java @@ -0,0 +1,122 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.net.flow; + +import static com.google.common.base.MoreObjects.toStringHelper; + +/** + * Default flow entry class with FlowLiveType value, IMMEDIATE_FLOW, SHORT_FLOW, MID_FLOW, LONG_FLOW. + */ +public class DefaultTypedFlowEntry extends DefaultFlowEntry + implements TypedStoredFlowEntry { + private FlowLiveType liveType; + + /** + * Creates a typed flow entry from flow rule and its statistics, with default flow live type(IMMEDIATE_FLOW). + * + * @param rule the flow rule + * @param state the flow state + * @param life the flow duration since creation + * @param packets the flow packets count + * @param bytes the flow bytes count + * + */ + public DefaultTypedFlowEntry(FlowRule rule, FlowEntryState state, + long life, long packets, long bytes) { + super(rule, state, life, packets, bytes); + this.liveType = FlowLiveType.IMMEDIATE_FLOW; + } + + /** + * Creates a typed flow entry from flow rule, with default flow live type(IMMEDIATE_FLOW). + * + * @param rule the flow rule + * + */ + public DefaultTypedFlowEntry(FlowRule rule) { + super(rule); + this.liveType = FlowLiveType.IMMEDIATE_FLOW; + } + + /** + * Creates a typed flow entry from flow entry, with default flow live type(IMMEDIATE_FLOW). + * + * @param fe the flow entry + * + */ + public DefaultTypedFlowEntry(FlowEntry fe) { + super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes()); + this.liveType = FlowLiveType.IMMEDIATE_FLOW; + } + + /** + * Creates a typed flow entry from flow rule and flow live type. + * + * @param rule the flow rule + * @param liveType the flow live type + * + */ + public DefaultTypedFlowEntry(FlowRule rule, FlowLiveType liveType) { + super(rule); + this.liveType = liveType; + } + + /** + * Creates a typed flow entry from flow entry and flow live type. + * + * @param fe the flow rule + * @param liveType the flow live type + * + */ + public DefaultTypedFlowEntry(FlowEntry fe, FlowLiveType liveType) { + super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes()); + this.liveType = liveType; + } + + /** + * Creates a typed flow entry from flow rule, error code and flow live type. + * + * @param rule the flow rule + * @param errType the flow error type + * @param errCode the flow error code + * @param liveType the flow live type + * + */ + public DefaultTypedFlowEntry(FlowRule rule, int errType, int errCode, FlowLiveType liveType) { + super(rule, errType, errCode); + this.liveType = liveType; + } + + @Override + public FlowLiveType flowLiveType() { + return this.liveType; + } + + @Override + public void setFlowLiveType(FlowLiveType liveType) { + this.liveType = liveType; + } + + @Override + public String toString() { + return toStringHelper(this) + .add("entry", super.toString()) + .add("type", liveType) + .toString(); + } +} + diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java index 8a36a9218a..48aa5047ac 100644 --- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java +++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java @@ -40,6 +40,15 @@ public interface FlowRuleProviderService extends ProviderService flowEntries); + /** + * Pushes the collection of flow entries currently applied on the given + * device without flowMissing process. + * + * @param deviceId device identifier + * @param flowEntries collection of flow rules + */ + void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable flowEntries); + /** * Indicates to the core that the requested batch operation has * been completed. diff --git a/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java b/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java new file mode 100644 index 0000000000..a93dc071a8 --- /dev/null +++ b/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java @@ -0,0 +1,65 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.net.flow; + +/** + * Represents a flow live type for a given flow entry. + */ +public interface TypedStoredFlowEntry extends StoredFlowEntry { + enum FlowLiveType { + /** + * Indicates that this rule has been submitted for addition immediately. + * Not necessarily collecting flow stats. + */ + IMMEDIATE_FLOW, + + /** + * Indicates that this rule has been submitted for a short time. + * Necessarily collecting flow stats every calAndPollInterval. + */ + SHORT_FLOW, + + /** + * Indicates that this rule has been submitted for a mid time. + * Necessarily collecting flow stats every midPollInterval. + */ + MID_FLOW, + + /** + * Indicates that this rule has been submitted for a long time. + * Necessarily collecting flow stats every longPollInterval. + */ + LONG_FLOW, + + /** + * Indicates that this rule has been submitted for UNKNOWN or ERROR. + * Not necessarily collecting flow stats. + */ + UNKNOWN_FLOW + } + + /** + * Gets the flow live type for this entry. + */ + FlowLiveType flowLiveType(); + + /** + * Sets the new flow live type for this entry. + * @param liveType new flow live type. + */ + void setFlowLiveType(FlowLiveType liveType); +} diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java index a1d046c555..9bbd0aaf50 100644 --- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java +++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java @@ -388,6 +388,16 @@ public class FlowRuleManager @Override public void pushFlowMetrics(DeviceId deviceId, Iterable flowEntries) { + pushFlowMetricsInternal(deviceId, flowEntries, true); + } + + @Override + public void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable flowEntries) { + pushFlowMetricsInternal(deviceId, flowEntries, false); + } + + private void pushFlowMetricsInternal(DeviceId deviceId, Iterable flowEntries, + boolean useMissingFlow) { Map storedRules = Maps.newHashMap(); store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f)); @@ -415,17 +425,20 @@ public class FlowRuleManager continue; } } - for (FlowEntry rule : storedRules.keySet()) { - try { - // there are rules in the store that aren't on the switch - log.debug("Adding rule in store, but not on switch {}", rule); - flowMissing(rule); - } catch (Exception e) { - log.debug("Can't add missing flow rule {}", e.getMessage()); - continue; + + // DO NOT reinstall + if (useMissingFlow) { + for (FlowEntry rule : storedRules.keySet()) { + try { + // there are rules in the store that aren't on the switch + log.debug("Adding rule in store, but not on switch {}", rule); + flowMissing(rule); + } catch (Exception e) { + log.debug("Can't add missing flow rule {}", e.getMessage()); + continue; + } } } - } @Override diff --git a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java index 99ca6b2b6a..a25b7a8039 100644 --- a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java +++ b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java @@ -273,6 +273,7 @@ public class OpenFlowControllerImpl implements OpenFlowController { OFFlowStatsReply.Builder rep = OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply(); rep.setEntries(Lists.newLinkedList(flowStats)); + rep.setXid(reply.getXid()); executorMsgs.submit(new OFMessageHandler(dpid, rep.build())); } break; diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java new file mode 100644 index 0000000000..181ba00df4 --- /dev/null +++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java @@ -0,0 +1,897 @@ +/* + * Copyright 2015 Open Networking Laboratory + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onosproject.provider.of.flow.impl; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.onosproject.net.flow.DefaultTypedFlowEntry; +import org.onosproject.net.flow.FlowEntry; +import org.onosproject.net.flow.FlowId; +import org.onosproject.net.flow.FlowRule; +import org.onosproject.net.flow.StoredFlowEntry; +import org.onosproject.net.flow.TypedStoredFlowEntry; +import org.onosproject.net.flow.instructions.Instruction; +import org.onosproject.net.flow.instructions.Instructions; +import org.onosproject.openflow.controller.OpenFlowSwitch; +import org.onosproject.openflow.controller.RoleState; +import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest; +import org.projectfloodlight.openflow.protocol.match.Match; +import org.projectfloodlight.openflow.types.OFPort; +import org.projectfloodlight.openflow.types.TableId; +import org.slf4j.Logger; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.onlab.util.Tools.groupedThreads; +import static org.onosproject.net.flow.TypedStoredFlowEntry.*; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Efficiently and adaptively collects flow statistics for the specified switch. + */ +public class NewAdaptiveFlowStatsCollector { + + private final Logger log = getLogger(getClass()); + + private final OpenFlowSwitch sw; + + private ScheduledExecutorService adaptiveFlowStatsScheduler = + Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d")); + private ScheduledFuture calAndShortFlowsThread; + private ScheduledFuture midFlowsThread; + private ScheduledFuture longFlowsThread; + + // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval + private CalAndShortFlowsTask calAndShortFlowsTask; + // Task that collects stats MID flows every 2*calAndPollInterval + private MidFlowsTask midFlowsTask; + // Task that collects stats LONG flows every 3*calAndPollInterval + private LongFlowsTask longFlowsTask; + + private static final int CAL_AND_POLL_TIMES = 1; // must be always 0 + private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1 + private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES + //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable + // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES + private static final int ENTIRE_POLL_TIMES = 6; + + private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5; + private static final int MIN_CAL_AND_POLL_FREQUENCY = 2; + private static final int MAX_CAL_AND_POLL_FREQUENCY = 60; + + private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; + private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; + private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; + // only used for checking condition at each task if it collects entire flows from a given switch or not + private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; + + // Number of call count of each Task, + // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask + private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called + private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called + private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called + + private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable(); + + private boolean isFirstTimeStart = true; + + public static final long NO_FLOW_MISSING_XID = (-1); + private long flowMissingXid = NO_FLOW_MISSING_XID; + + /** + * Creates a new adaptive collector for the given switch and default cal_and_poll frequency. + * + * @param sw switch to pull + * @param pollInterval cal and immediate poll frequency in seconds + */ + NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) { + this.sw = sw; + + initMemberVars(pollInterval); + } + + // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count + private void initMemberVars(int pollInterval) { + if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) { + this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY; + } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) { + this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY; + } else { + this.calAndPollInterval = pollInterval; + } + + calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval; + midPollInterval = MID_POLL_TIMES * calAndPollInterval; + longPollInterval = LONG_POLL_TIMES * calAndPollInterval; + entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval; + + callCountCalAndShortFlowsTask = 0; + callCountMidFlowsTask = 0; + callCountLongFlowsTask = 0; + + flowMissingXid = NO_FLOW_MISSING_XID; + } + + /** + * Adjusts adaptive poll frequency. + * + * @param pollInterval poll frequency in seconds + */ + synchronized void adjustCalAndPollInterval(int pollInterval) { + initMemberVars(pollInterval); + + if (calAndShortFlowsThread != null) { + calAndShortFlowsThread.cancel(false); + } + if (midFlowsThread != null) { + midFlowsThread.cancel(false); + } + if (longFlowsThread != null) { + longFlowsThread.cancel(false); + } + + calAndShortFlowsTask = new CalAndShortFlowsTask(); + calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( + calAndShortFlowsTask, + 0, + calAndPollInterval, + TimeUnit.SECONDS); + + midFlowsTask = new MidFlowsTask(); + midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( + midFlowsTask, + 0, + midPollInterval, + TimeUnit.SECONDS); + + longFlowsTask = new LongFlowsTask(); + longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( + longFlowsTask, + 0, + longPollInterval, + TimeUnit.SECONDS); + + log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted"); + } + + private class CalAndShortFlowsTask implements Runnable { + @Override + public void run() { + if (sw.getRole() == RoleState.MASTER) { + log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); + + if (isFirstTimeStart) { + // isFirstTimeStart, get entire flow stats from a given switch sw + log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}", + sw.getStringId()); + ofFlowStatsRequestAllSend(); + + callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES; + isFirstTimeStart = false; + } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) { + // entire_poll_times, get entire flow stats from a given switch sw + log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId()); + ofFlowStatsRequestAllSend(); + + callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES; + //TODO: check flows deleted in switch, but exist in controller flow table, then remove them + // + } else { + calAndShortFlowsTaskInternal(); + callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES; + } + } + } + } + + // send openflow flow stats request message with getting all flow entries to a given switch sw + private void ofFlowStatsRequestAllSend() { + OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() + .setMatch(sw.factory().matchWildcardAll()) + .setTableId(TableId.ALL) + .setOutPort(OFPort.NO_MASK) + .build(); + + synchronized (this) { + // set the request xid to check the reply in OpenFlowRuleProvider + // After processing the reply of this request message, + // this must be set to NO_FLOW_MISSING_XID(-1) by provider + setFlowMissingXid(request.getXid()); + log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId()); + + sw.sendMsg(request); + } + } + + // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw + private void ofFlowStatsRequestFlowSend(FlowEntry fe) { + // set find match + Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty()).buildMatch(); + // set find tableId + TableId tableId = TableId.of(fe.tableId()); + // set output port + Instruction ins = fe.treatment().allInstructions().stream() + .filter(i -> (i.type() == Instruction.Type.OUTPUT)) + .findFirst() + .orElse(null); + OFPort ofPort = OFPort.NO_MASK; + if (ins != null) { + Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins; + ofPort = OFPort.of((int) ((out.port().toLong()))); + } + + OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() + .setMatch(match) + .setTableId(tableId) + .setOutPort(ofPort) + .build(); + + synchronized (this) { + if (getFlowMissingXid() != NO_FLOW_MISSING_XID) { + log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet," + + " set no flow missing xid anyway, for {}", + sw.getStringId()); + setFlowMissingXid(NO_FLOW_MISSING_XID); + } + + sw.sendMsg(request); + } + } + + private void calAndShortFlowsTaskInternal() { + deviceFlowTable.checkAndMoveLiveFlowAll(); + + deviceFlowTable.getShortFlows().forEach(fe -> { + ofFlowStatsRequestFlowSend(fe); + }); + } + + private class MidFlowsTask implements Runnable { + @Override + public void run() { + if (sw.getRole() == RoleState.MASTER) { + log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); + + // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw + if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) { + callCountMidFlowsTask = MID_POLL_TIMES; + } else { + midFlowsTaskInternal(); + callCountMidFlowsTask += MID_POLL_TIMES; + } + } + } + } + + private void midFlowsTaskInternal() { + deviceFlowTable.getMidFlows().forEach(fe -> { + ofFlowStatsRequestFlowSend(fe); + }); + } + + private class LongFlowsTask implements Runnable { + @Override + public void run() { + if (sw.getRole() == RoleState.MASTER) { + log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); + + // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw + if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) { + callCountLongFlowsTask = LONG_POLL_TIMES; + } else { + longFlowsTaskInternal(); + callCountLongFlowsTask += LONG_POLL_TIMES; + } + } + } + } + + private void longFlowsTaskInternal() { + deviceFlowTable.getLongFlows().forEach(fe -> { + ofFlowStatsRequestFlowSend(fe); + }); + } + + /** + * start adaptive flow statistic collection. + * + */ + public synchronized void start() { + log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId()); + callCountCalAndShortFlowsTask = 0; + callCountMidFlowsTask = 0; + callCountLongFlowsTask = 0; + + isFirstTimeStart = true; + + // Initially start polling quickly. Then drop down to configured value + calAndShortFlowsTask = new CalAndShortFlowsTask(); + calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( + calAndShortFlowsTask, + 1, + calAndPollInterval, + TimeUnit.SECONDS); + + midFlowsTask = new MidFlowsTask(); + midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( + midFlowsTask, + 1, + midPollInterval, + TimeUnit.SECONDS); + + longFlowsTask = new LongFlowsTask(); + longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( + longFlowsTask, + 1, + longPollInterval, + TimeUnit.SECONDS); + + log.info("Started"); + } + + /** + * stop adaptive flow statistic collection. + * + */ + public synchronized void stop() { + log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId()); + if (calAndShortFlowsThread != null) { + calAndShortFlowsThread.cancel(true); + } + if (midFlowsThread != null) { + midFlowsThread.cancel(true); + } + if (longFlowsThread != null) { + longFlowsThread.cancel(true); + } + + adaptiveFlowStatsScheduler.shutdownNow(); + + isFirstTimeStart = false; + + log.info("Stopped"); + } + + /** + * add typed flow entry from flow rule into the internal flow table. + * + * @param flowRules the flow rules + * + */ + public synchronized void addWithFlowRule(FlowRule... flowRules) { + for (FlowRule fr : flowRules) { + // First remove old entry unconditionally, if exist + deviceFlowTable.remove(fr); + + // add new flow entry, we suppose IMMEDIATE_FLOW + TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr, + FlowLiveType.IMMEDIATE_FLOW); + deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry); + } + } + + /** + * add or update typed flow entry from flow entry into the internal flow table. + * + * @param flowEntries the flow entries + * + */ + public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) { + for (FlowEntry fe : flowEntries) { + // check if this new rule is an update to an existing entry + TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe); + + if (stored != null) { + // duplicated flow entry is collected!, just skip + if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets() + && fe.life() == stored.life()) { + log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value()) + + ",is DUPLICATED stats collection, just skip." + + " AdaptiveStats collection thread for {}", + sw.getStringId()); + + stored.setLastSeen(); + continue; + } else if (fe.life() < stored.life()) { + // Invalid updates the stats values, i.e., bytes, packets, durations ... + log.debug("addOrUpdateFlows():" + + " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." + + " new flowId=" + Long.toHexString(fe.id().value()) + + ", old flowId=" + Long.toHexString(stored.id().value()) + + ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() + + ", new life=" + fe.life() + ", old life=" + stored.life() + + ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen()); + // go next + stored.setLastSeen(); + continue; + } + + // update now + stored.setLife(fe.life()); + stored.setPackets(fe.packets()); + stored.setBytes(fe.bytes()); + stored.setLastSeen(); + if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) { + // flow is really RULE_ADDED + stored.setState(FlowEntry.FlowEntryState.ADDED); + } + // flow is RULE_UPDATED, skip adding and just updating flow live table + //deviceFlowTable.calAndSetFlowLiveType(stored); + continue; + } + + // add new flow entry, we suppose IMMEDIATE_FLOW + TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe, + FlowLiveType.IMMEDIATE_FLOW); + deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry); + } + } + + /** + * remove typed flow entry from the internal flow table. + * + * @param flowRules the flow entries + * + */ + public synchronized void removeFlows(FlowRule... flowRules) { + for (FlowRule rule : flowRules) { + deviceFlowTable.remove(rule); + } + } + + // same as removeFlows() function + /** + * remove typed flow entry from the internal flow table. + * + * @param flowRules the flow entries + * + */ + public void flowRemoved(FlowRule... flowRules) { + removeFlows(flowRules); + } + + // same as addOrUpdateFlows() function + /** + * add or update typed flow entry from flow entry into the internal flow table. + * + * @param flowEntries the flow entry list + * + */ + public void pushFlowMetrics(List flowEntries) { + flowEntries.forEach(fe -> { + addOrUpdateFlows(fe); + }); + } + + /** + * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)). + * + */ + public long getFlowMissingXid() { + return flowMissingXid; + } + + /** + * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id. + * + * @param flowMissingXid the OFFlowStatsRequest message Id + * + */ + public void setFlowMissingXid(long flowMissingXid) { + this.flowMissingXid = flowMissingXid; + } + + private class InternalDeviceFlowTable { + + private final Map> + flowEntries = Maps.newConcurrentMap(); + + private final Set shortFlows = new HashSet<>(); + private final Set midFlows = new HashSet<>(); + private final Set longFlows = new HashSet<>(); + + // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply + private final long latencyFlowStatsRequestAndReplyMillis = 500; + + + // Statistics for table operation + private long addCount = 0, addWithSetFlowLiveTypeCount = 0; + private long removeCount = 0; + + /** + * Resets all count values with zero. + * + */ + public void resetAllCount() { + addCount = 0; + addWithSetFlowLiveTypeCount = 0; + removeCount = 0; + } + + // get set of flow entries for the given flowId + private Set getFlowEntriesInternal(FlowId flowId) { + return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet()); + } + + // get flow entry for the given flow rule + private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) { + Set flowEntries = getFlowEntriesInternal(rule.id()); + return flowEntries.stream() + .filter(entry -> Objects.equal(entry, rule)) + .findAny() + .orElse(null); + } + + // get the flow entries for all flows in flow table + private Set getFlowEntriesInternal() { + Set result = Sets.newHashSet(); + + flowEntries.values().forEach(result::addAll); + return result; + } + + /** + * Gets the number of flow entry in flow table. + * + * @return the number of flow entry. + * + */ + public long getFlowCount() { + return flowEntries.values().stream().mapToLong(Set::size).sum(); + } + + /** + * Gets the number of flow entry in flow table. + * + * @param rule the flow rule + * @return the typed flow entry. + * + */ + public TypedStoredFlowEntry getFlowEntry(FlowRule rule) { + checkNotNull(rule); + + return getFlowEntryInternal(rule); + } + + /** + * Gets the all typed flow entries in flow table. + * + * @return the set of typed flow entry. + * + */ + public Set getFlowEntries() { + return getFlowEntriesInternal(); + } + + /** + * Gets the short typed flow entries in flow table. + * + * @return the set of typed flow entry. + * + */ + public Set getShortFlows() { + return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows); + } + + /** + * Gets the mid typed flow entries in flow table. + * + * @return the set of typed flow entry. + * + */ + public Set getMidFlows() { + return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows); + } + + /** + * Gets the long typed flow entries in flow table. + * + * @return the set of typed flow entry. + * + */ + public Set getLongFlows() { + return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows); + } + + /** + * Add typed flow entry into table only. + * + * @param rule the flow rule + * + */ + public synchronized void add(TypedStoredFlowEntry rule) { + checkNotNull(rule); + + //rule have to be new DefaultTypedFlowEntry + boolean result = getFlowEntriesInternal(rule.id()).add(rule); + + if (result) { + addCount++; + } + } + + /** + * Calculates and set the flow live type at the first time, + * and then add it into a corresponding typed flow table. + * + * @param rule the flow rule + * + */ + public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) { + checkNotNull(rule); + + calAndSetFlowLiveTypeInternal(rule); + } + + /** + * Add the typed flow entry into table, and calculates and set the flow live type, + * and then add it into a corresponding typed flow table. + * + * @param rule the flow rule + * + */ + public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) { + checkNotNull(rule); + + //rule have to be new DefaultTypedFlowEntry + boolean result = getFlowEntriesInternal(rule.id()).add(rule); + if (result) { + calAndSetFlowLiveTypeInternal(rule); + addWithSetFlowLiveTypeCount++; + } else { + log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value()) + + " ADD Failed, cause it may already exists in table !!!," + + " AdaptiveStats collection thread for {}", + sw.getStringId()); + } + } + + // In real, calculates and set the flow live type at the first time, + // and then add it into a corresponding typed flow table + private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) { + long life = rule.life(); + FlowLiveType prevFlowLiveType = rule.flowLiveType(); + + if (life >= longPollInterval) { + rule.setFlowLiveType(FlowLiveType.LONG_FLOW); + longFlows.add(rule); + } else if (life >= midPollInterval) { + rule.setFlowLiveType(FlowLiveType.MID_FLOW); + midFlows.add(rule); + } else if (life >= calAndPollInterval) { + rule.setFlowLiveType(FlowLiveType.SHORT_FLOW); + shortFlows.add(rule); + } else if (life >= 0) { + rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW); + } else { // life < 0 + rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW); + } + + if (rule.flowLiveType() != prevFlowLiveType) { + switch (prevFlowLiveType) { + // delete it from previous flow table + case SHORT_FLOW: + shortFlows.remove(rule); + break; + case MID_FLOW: + midFlows.remove(rule); + break; + case LONG_FLOW: + longFlows.remove(rule); + break; + default: + break; + } + } + } + + + // check the flow live type based on current time, then set and add it into corresponding table + private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) { + long curTime = (cTime > 0 ? cTime : System.currentTimeMillis()); + // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply + long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000); + + // fe.life() unit is SECOND! + long liveTime = fe.life() + fromLastSeen; + + // check flow timeout + if (fe.timeout() > calAndPollInterval && fromLastSeen > fe.timeout()) { + if (!fe.isPermanent()) { + log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value()) + + ", liveType=" + fe.flowLiveType() + + ", liveTime=" + liveTime + + ", life=" + fe.life() + + ", fromLastSeen=" + fromLastSeen + + ", timeout=" + fe.timeout() + + ", isPermanent=" + fe.isPermanent() + + " AdaptiveStats collection thread for {}", + sw.getStringId()); + return false; + } + } + + switch (fe.flowLiveType()) { + case IMMEDIATE_FLOW: + if (liveTime >= longPollInterval) { + fe.setFlowLiveType(FlowLiveType.LONG_FLOW); + longFlows.add(fe); + } else if (liveTime >= midPollInterval) { + fe.setFlowLiveType(FlowLiveType.MID_FLOW); + midFlows.add(fe); + } else if (liveTime >= calAndPollInterval) { + fe.setFlowLiveType(FlowLiveType.SHORT_FLOW); + shortFlows.add(fe); + } + break; + case SHORT_FLOW: + if (liveTime >= longPollInterval) { + fe.setFlowLiveType(FlowLiveType.LONG_FLOW); + shortFlows.remove(fe); + longFlows.add(fe); + } else if (liveTime >= midPollInterval) { + fe.setFlowLiveType(FlowLiveType.MID_FLOW); + shortFlows.remove(fe); + midFlows.add(fe); + } + break; + case MID_FLOW: + if (liveTime >= longPollInterval) { + fe.setFlowLiveType(FlowLiveType.LONG_FLOW); + midFlows.remove(fe); + longFlows.add(fe); + } + break; + case LONG_FLOW: + if (fromLastSeen > entirePollInterval) { + log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch."); + return false; + } + break; + case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through + default : + // Error Unknown Live Type + log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!" + + "AdaptiveStats collection thread for {}", + sw.getStringId()); + return false; + } + + log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value()) + + ", state=" + fe.state() + + ", After liveType=" + fe.flowLiveType() + + ", liveTime=" + liveTime + + ", life=" + fe.life() + + ", bytes=" + fe.bytes() + + ", packets=" + fe.packets() + + ", fromLastSeen=" + fromLastSeen + + ", priority=" + fe.priority() + + ", selector=" + fe.selector().criteria() + + ", treatment=" + fe.treatment() + + " AdaptiveStats collection thread for {}", + sw.getStringId()); + + return true; + } + + /** + * Check and move live type for all type flow entries in table at every calAndPollInterval time. + * + */ + public void checkAndMoveLiveFlowAll() { + Set typedFlowEntries = getFlowEntriesInternal(); + + long calCurTime = System.currentTimeMillis(); + typedFlowEntries.forEach(fe -> { + if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) { + remove(fe); + } + }); + + // print table counts for debug + if (log.isDebugEnabled()) { + synchronized (this) { + long totalFlowCount = getFlowCount(); + long shortFlowCount = shortFlows.size(); + long midFlowCount = midFlows.size(); + long longFlowCount = longFlows.size(); + long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount; + long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount; + + log.debug("--------------------------------------------------------------------------- for {}", + sw.getStringId()); + log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount + + ", add - remove_Count=" + calTotalCount + + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount + + ", SHORT_FLOW_Count=" + shortFlowCount + + ", MID_FLOW_Count=" + midFlowCount + + ", LONG_FLOW_Count=" + longFlowCount + + ", add_Count=" + addCount + + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount + + ", remove_Count=" + removeCount + + " AdaptiveStats collection thread for {}", sw.getStringId()); + log.debug("--------------------------------------------------------------------------- for {}", + sw.getStringId()); + if (totalFlowCount != calTotalCount) { + log.error("checkAndMoveLiveFlowAll, Real total flow count and " + + "calculated total flow count do NOT match, something is wrong internally " + + "or check counter value bound is over!"); + } + if (immediateFlowCount < 0) { + log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, " + + "something is wrong internally " + + "or check counter value bound is over!"); + } + } + } + log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId()); + } + + /** + * Remove the typed flow entry from table. + * + * @param rule the flow rule + * + */ + public synchronized void remove(FlowRule rule) { + checkNotNull(rule); + + TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule); + if (removeStore != null) { + removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore); + boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore); + + if (result) { + removeCount++; + } + } + } + + // Remove the typed flow entry from corresponding table + private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) { + switch (fe.flowLiveType()) { + case IMMEDIATE_FLOW: + // do nothing + break; + case SHORT_FLOW: + shortFlows.remove(fe); + break; + case MID_FLOW: + midFlows.remove(fe); + break; + case LONG_FLOW: + longFlows.remove(fe); + break; + default: // error in Flow Live Type + log.error("removeLiveFlowsInternal, Unknown Live Type error!"); + break; + } + } + } +} diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java index de079e0303..949c65766c 100644 --- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java +++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 Open Networking Laboratory + * Copyright 2015 Open Networking Laboratory * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,6 +76,7 @@ import java.util.Timer; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Strings.isNullOrEmpty; import static org.onlab.util.Tools.get; import static org.slf4j.LoggerFactory.getLogger; @@ -99,11 +100,16 @@ public class OpenFlowRuleProvider extends AbstractProvider @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ComponentConfigService cfgService; - private static final int DEFAULT_POLL_FREQUENCY = 10; + private static final int DEFAULT_POLL_FREQUENCY = 5; @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY, label = "Frequency (in seconds) for polling flow statistics") private int flowPollFrequency = DEFAULT_POLL_FREQUENCY; + private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true; + @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING, + label = "Adaptive Flow Sampling is on or off") + private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING; + private FlowRuleProviderService providerService; private final InternalFlowProvider listener = new InternalFlowProvider(); @@ -111,7 +117,10 @@ public class OpenFlowRuleProvider extends AbstractProvider private Cache pendingBatches; private final Timer timer = new Timer("onos-openflow-collector"); - private final Map collectors = Maps.newHashMap(); + private final Map simpleCollectors = Maps.newHashMap(); + + // NewAdaptiveFlowStatsCollector Set + private final Map afsCollectors = Maps.newHashMap(); /** * Creates an OpenFlow host provider. @@ -128,9 +137,11 @@ public class OpenFlowRuleProvider extends AbstractProvider controller.addEventListener(listener); pendingBatches = createBatchCache(); + createCollectors(); - log.info("Started"); + log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}", + flowPollFrequency, adaptiveFlowSampling); } @Deactivate @@ -161,6 +172,20 @@ public class OpenFlowRuleProvider extends AbstractProvider } log.info("Settings: flowPollFrequency={}", flowPollFrequency); + + boolean newAdaptiveFlowSampling; + String s = get(properties, "adaptiveFlowSampling"); + newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim()); + + if (newAdaptiveFlowSampling != adaptiveFlowSampling) { + // stop previous collector + stopCollectors(); + adaptiveFlowSampling = newAdaptiveFlowSampling; + // create new collectors + createCollectors(); + } + + log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling); } private Cache createBatchCache() { @@ -179,19 +204,38 @@ public class OpenFlowRuleProvider extends AbstractProvider } private void createCollector(OpenFlowSwitch sw) { - FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency); - fsc.start(); - collectors.put(new Dpid(sw.getId()), fsc); + if (adaptiveFlowSampling) { + // NewAdaptiveFlowStatsCollector Constructor + NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency); + fsc.start(); + afsCollectors.put(new Dpid(sw.getId()), fsc); + } else { + FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency); + fsc.start(); + simpleCollectors.put(new Dpid(sw.getId()), fsc); + } } private void stopCollectors() { - collectors.values().forEach(FlowStatsCollector::stop); - collectors.clear(); + if (adaptiveFlowSampling) { + // NewAdaptiveFlowStatsCollector Destructor + afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop); + afsCollectors.clear(); + } else { + simpleCollectors.values().forEach(FlowStatsCollector::stop); + simpleCollectors.clear(); + } } private void adjustRate() { DefaultLoad.setPollInterval(flowPollFrequency); - collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency)); + + if (adaptiveFlowSampling) { + // NewAdaptiveFlowStatsCollector calAndPollInterval + afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency)); + } else { + simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency)); + } } @Override @@ -202,8 +246,9 @@ public class OpenFlowRuleProvider extends AbstractProvider } private void applyRule(FlowRule flowRule) { - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() - .uri())); + Dpid dpid = Dpid.dpid(flowRule.deviceId().uri()); + OpenFlowSwitch sw = controller.getSwitch(dpid); + FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); if (hasPayload(flowRuleExtPayLoad)) { OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); @@ -212,6 +257,11 @@ public class OpenFlowRuleProvider extends AbstractProvider } sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(), Optional.empty()).buildFlowAdd()); + + if (adaptiveFlowSampling) { + // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + afsCollectors.get(dpid).addWithFlowRule(flowRule); + } } @Override @@ -222,8 +272,9 @@ public class OpenFlowRuleProvider extends AbstractProvider } private void removeRule(FlowRule flowRule) { - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() - .uri())); + Dpid dpid = Dpid.dpid(flowRule.deviceId().uri()); + OpenFlowSwitch sw = controller.getSwitch(dpid); + FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); if (hasPayload(flowRuleExtPayLoad)) { OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); @@ -232,6 +283,11 @@ public class OpenFlowRuleProvider extends AbstractProvider } sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(), Optional.empty()).buildFlowDel()); + + if (adaptiveFlowSampling) { + // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + afsCollectors.get(dpid).removeFlows(flowRule); + } } @Override @@ -242,11 +298,12 @@ public class OpenFlowRuleProvider extends AbstractProvider @Override public void executeBatch(FlowRuleBatchOperation batch) { + checkNotNull(batch); pendingBatches.put(batch.id(), new InternalCacheEntry(batch)); - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId() - .uri())); + Dpid dpid = Dpid.dpid(batch.deviceId().uri()); + OpenFlowSwitch sw = controller.getSwitch(dpid); OFFlowMod mod; for (FlowRuleBatchEntry fbe : batch.getOperations()) { // flow is the third party privacy flow @@ -262,16 +319,32 @@ public class OpenFlowRuleProvider extends AbstractProvider switch (fbe.operator()) { case ADD: mod = builder.buildFlowAdd(); + + if (adaptiveFlowSampling) { + // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + afsCollectors.get(dpid).addWithFlowRule(fbe.target()); + } break; case REMOVE: mod = builder.buildFlowDel(); + + if (adaptiveFlowSampling) { + // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + afsCollectors.get(dpid).removeFlows(fbe.target()); + } break; case MODIFY: mod = builder.buildFlowMod(); + + if (adaptiveFlowSampling) { + // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not + afsCollectors.get(dpid).addOrUpdateFlows((FlowEntry) fbe.target()); + } break; default: log.error("Unsupported batch operation {}; skipping flowmod {}", - fbe.operator(), fbe); + fbe.operator(), fbe); continue; } sw.sendMsg(mod); @@ -292,14 +365,24 @@ public class OpenFlowRuleProvider extends AbstractProvider @Override public void switchAdded(Dpid dpid) { + + OpenFlowSwitch sw = controller.getSwitch(dpid); + createCollector(controller.getSwitch(dpid)); } @Override public void switchRemoved(Dpid dpid) { - FlowStatsCollector collector = collectors.remove(dpid); - if (collector != null) { - collector.stop(); + if (adaptiveFlowSampling) { + NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid); + if (collector != null) { + collector.stop(); + } + } else { + FlowStatsCollector collector = simpleCollectors.remove(dpid); + if (collector != null) { + collector.stop(); + } } } @@ -321,6 +404,11 @@ public class OpenFlowRuleProvider extends AbstractProvider FlowEntry fr = new FlowEntryBuilder(dpid, removed).build(); providerService.flowRemoved(fr); + + if (adaptiveFlowSampling) { + // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + afsCollectors.get(dpid).flowRemoved(fr); + } break; case STATS_REPLY: if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) { @@ -370,11 +458,10 @@ public class OpenFlowRuleProvider extends AbstractProvider + " tell us which one."); } } - break; + default: log.debug("Unhandled message type: {}", msg.getType()); } - } @Override @@ -392,7 +479,38 @@ public class OpenFlowRuleProvider extends AbstractProvider .map(entry -> new FlowEntryBuilder(dpid, entry).build()) .collect(Collectors.toList()); - providerService.pushFlowMetrics(did, flowEntries); + if (adaptiveFlowSampling) { + NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid); + + synchronized (afsc) { + if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) { + log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, " + + "OFFlowStatsReply Xid={}, for {}", + afsc.getFlowMissingXid(), replies.getXid(), dpid); + } + + // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest? + if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) { + if (afsc.getFlowMissingXid() == replies.getXid()) { + // call entire flow stats update with flowMissing synchronization. + // used existing pushFlowMetrics + providerService.pushFlowMetrics(did, flowEntries); + } + // reset flowMissingXid to NO_FLOW_MISSING_XID + afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID); + + } else { + // call individual flow stats update + providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries); + } + + // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector + afsc.pushFlowMetrics(flowEntries); + } + } else { + // call existing entire flow stats update with flowMissing synchronization + providerService.pushFlowMetrics(did, flowEntries); + } } }