Support monitoring underlay network using openstack telemetry app

Change-Id: I84f8735a700a89f28124fe3a76fafab339e3dbc1
This commit is contained in:
Jian Li 2018-08-27 18:49:04 +09:00
parent 3ed7f30db3
commit f8b8c7fddb
4 changed files with 381 additions and 63 deletions

View File

@ -42,11 +42,18 @@ public interface StatsFlowRuleAdminService {
void createStatFlowRule(StatsFlowRule statFlowRule);
/**
* Gets a set of flow infos.
* Gets a set of flow infos collected from overlay network.
*
* @return a set of flow infos
*/
Set<FlowInfo> getFlowInfos();
Set<FlowInfo> getOverlayFlowInfos();
/**
* Gets a set of flow infos collected from underlay network.
*
* @return a set of flow infos
*/
Set<FlowInfo> getUnderlayFlowInfos();
/**
* Deletes stat flow rule.

View File

@ -156,21 +156,17 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
BatchPoints batchPoints = BatchPoints.database(database).build();
for (FlowInfo flowInfo: record.flowInfos()) {
Point point = Point
Point.Builder pointBuilder = Point
.measurement((measurement == null) ? record.measurement() : measurement)
.tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
.tag(DEVICE_ID, flowInfo.deviceId().toString())
.tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
.tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
.tag(VLAN_ID, flowInfo.vlanId().toString())
.tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
.tag(SRC_IP, flowInfo.srcIp().toString())
.tag(DST_IP, flowInfo.dstIp().toString())
.tag(SRC_PORT, getTpPort(flowInfo.srcPort()))
.tag(DST_PORT, getTpPort(flowInfo.dstPort()))
.tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
.tag(SRC_MAC, flowInfo.srcMac().toString())
.tag(DST_MAC, flowInfo.dstMac().toString())
.addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
.addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
.addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
@ -179,9 +175,21 @@ public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
.addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
.addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
.addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
.addField(DROP_PKTS, flowInfo.statsInfo().dropPkts())
.build();
batchPoints.point(point);
.addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
if (flowInfo.vlanId() != null) {
pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
}
if (flowInfo.srcPort() != null) {
pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
}
if (flowInfo.dstPort() != null) {
pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
}
batchPoints.point(pointBuilder.build());
}
producer.write(batchPoints);
}

View File

@ -34,11 +34,15 @@ import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@ -49,8 +53,10 @@ import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.IPProtocolCriterion;
import org.onosproject.net.flow.criteria.PortCriterion;
import org.onosproject.net.flow.criteria.TcpPortCriterion;
import org.onosproject.net.flow.criteria.UdpPortCriterion;
import org.onosproject.net.host.HostService;
@ -68,6 +74,7 @@ import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.LinkedList;
import java.util.List;
@ -82,6 +89,7 @@ import java.util.stream.Collectors;
import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.packet.IPv4.PROTOCOL_TCP;
import static org.onlab.packet.IPv4.PROTOCOL_UDP;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
@ -96,6 +104,7 @@ import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOU
import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.CONTROLLER;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DATA_POINT_SIZE;
import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
@ -123,13 +132,22 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
private static final String EGRESS_STATS = "egressStats";
private static final String PORT_STATS = "portStats";
private static final String MONITOR_OVERLAY = "monitorOverlay";
private static final String MONITOR_UNDERLAY = "monitorUnderlay";
private static final String OVS_DRIVER_NAME = "ovs";
private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
private static final boolean DEFAULT_EGRESS_STATS = false;
private static final boolean DEFAULT_PORT_STATS = true;
private static final boolean DEFAULT_MONITOR_OVERLAY = true;
private static final boolean DEFAULT_MONITOR_UNDERLAY = true;
private static final String ARBITRARY_IP = "0.0.0.0/32";
private static final int ARBITRARY_LENGTH = 32;
private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
private static final IpAddress NO_HOST_IP = IpAddress.valueOf("255.255.255.255");
private static final MacAddress NO_HOST_MAC = MacAddress.valueOf(ARBITRARY_MAC);
private static final int ARBITRARY_IN_INTF = 0;
private static final int ARBITRARY_OUT_INTF = 0;
@ -148,6 +166,9 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
@ -180,7 +201,15 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
label = "A flag which indicates whether to collect port TX & RX stats.")
private boolean portStats = DEFAULT_PORT_STATS;
private ApplicationId appId;
@Property(name = MONITOR_OVERLAY, boolValue = DEFAULT_MONITOR_OVERLAY,
label = "A flag which indicates whether to monitor overlay network port stats.")
private boolean monitorOverlay = DEFAULT_MONITOR_OVERLAY;
@Property(name = MONITOR_UNDERLAY, boolValue = DEFAULT_MONITOR_UNDERLAY,
label = "A flag which indicates whether to monitor underlay network port stats.")
private boolean monitorUnderlay = DEFAULT_MONITOR_UNDERLAY;
private ApplicationId telemetryAppId;
private TelemetryCollector collector;
private ScheduledFuture result;
@ -195,7 +224,8 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Activate
protected void activate() {
appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
telemetryAppId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
componentConfigService.registerProperties(getClass());
start();
@ -205,7 +235,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Deactivate
protected void deactivate() {
componentConfigService.unregisterProperties(getClass(), false);
flowRuleService.removeFlowRulesById(appId);
flowRuleService.removeFlowRulesById(telemetryAppId);
stop();
log.info("Stopped");
@ -247,20 +277,77 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
setStatFlowRule(statsFlowRule, false);
}
/**
* Gets a set of the flow infos.
*
* @return a set of flow infos
*/
@Override
public Set<FlowInfo> getFlowInfos() {
public Map<String, Queue<FlowInfo>> getFlowInfoMap() {
return flowInfoMap;
}
@Override
public Set<FlowInfo> getUnderlayFlowInfos() {
Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
for (Device device : getUnderlayDevices()) {
if (!isEdgeSwitch(device.id())) {
continue;
}
for (FlowEntry entry : flowRuleService.getFlowEntries(device.id())) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
TrafficSelector selector = entry.selector();
Criterion inPort = selector.getCriterion(Criterion.Type.IN_PORT);
Criterion dstIpCriterion = selector.getCriterion(Criterion.Type.IPV4_DST);
if (inPort != null && dstIpCriterion != null) {
IpAddress srcIp = getIpAddress(device, (PortCriterion) inPort);
IpAddress dstIp = ((IPCriterion) dstIpCriterion).ip().address();
if (srcIp == null) {
continue;
}
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(IpPrefix.valueOf(srcIp, ARBITRARY_LENGTH))
.withDstIp(IpPrefix.valueOf(dstIp, ARBITRARY_LENGTH))
.withSrcMac(getMacAddress(srcIp))
.withDstMac(getMacAddress(dstIp))
.withInputInterfaceId(getInterfaceId(srcIp))
.withOutputInterfaceId(getInterfaceId(dstIp))
.withDeviceId(entry.deviceId());
StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
sBuilder.withStartupTime(System.currentTimeMillis())
.withFstPktArrTime(System.currentTimeMillis())
.withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
.withCurrAccPkts((int) entry.packets())
.withCurrAccBytes(entry.bytes())
.withErrorPkts((short) 0)
.withDropPkts((short) 0);
fBuilder.withStatsInfo(sBuilder.build());
FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
flowInfos.add(flowInfo);
}
}
}
return flowInfos;
}
@Override
public Set<FlowInfo> getOverlayFlowInfos() {
Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
// obtain all flow rule entries installed by telemetry app
for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
for (FlowEntry entry : flowRuleService.getFlowEntriesById(telemetryAppId)) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
TrafficSelector selector = entry.selector();
IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
IPProtocolCriterion ipProtocol =
@ -301,8 +388,6 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
// TODO: need to collect error and drop packets stats
// TODO: need to make the refresh interval configurable
sBuilder.withStartupTime(System.currentTimeMillis())
.withFstPktArrTime(System.currentTimeMillis())
.withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
@ -324,11 +409,11 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
/**
* Gets a set of flow infos by referring to destination VM port.
* Gets a set of flow infos by referring to overlay destination VM port.
*
* @return flow infos
*/
private Set<FlowInfo> getDstPortBasedFlowInfos() {
private Set<FlowInfo> getOverlayDstPortBasedFlowInfos() {
Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
Set<PortNumber> instPortNums = instPortService.instancePorts()
.stream()
@ -348,8 +433,8 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
stats.forEach(s -> {
InstancePort instPort = getInstancePort(d, s.portNumber());
flowInfos.add(buildTxPortInfo(instPort, s));
flowInfos.add(buildRxPortInfo(instPort, s));
flowInfos.add(buildTxFlowInfoFromInstancePort(instPort, s));
flowInfos.add(buildRxFlowInfoFromInstancePort(instPort, s));
});
});
@ -357,21 +442,151 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
/**
* Obtains the flow info generated by TX port.
* Gets a set of flow infos by referring to underlay destination port.
*
* @return flow infos
*/
private Set<FlowInfo> getUnderlayDstPortBasedFlowInfos() {
Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
for (Device d : getUnderlayDevices()) {
List<PortStatistics> stats =
new ArrayList<>(deviceService.getPortStatistics(d.id()));
stats.forEach(s -> {
Host host = hostService.getConnectedHosts(new ConnectPoint(d.id(), s.portNumber()))
.stream().findFirst().orElse(null);
if (host != null) {
flowInfos.add(buildTxFlowInfoFromHost(host, s));
flowInfos.add(buildRxFlowInfoFromHost(host, s));
}
});
}
return flowInfos;
}
/**
* Obtains a set of device instances which construct underlay network.
*
* @return a set of device instances
*/
private Set<Device> getUnderlayDevices() {
Set<Device> underlayDevices = Sets.newConcurrentHashSet();
Set<DeviceId> overlayDeviceIds = osNodeService.completeNodes()
.stream()
.filter(n -> n.type() != CONTROLLER)
.map(OpenstackNode::intgBridge)
.collect(Collectors.toSet());
for (Device d : deviceService.getAvailableDevices(SWITCH)) {
if (overlayDeviceIds.contains(d.id())) {
continue;
}
underlayDevices.add(d);
}
return underlayDevices;
}
/**
* Checks whether the given drivers contains OVS driver.
*
* @param drivers a set of drivers
* @return true if the given drivers contain any OVS driver, false otherwise
*/
private boolean hasOvsDriver(List<Driver> drivers) {
for (Driver driver : drivers) {
if (OVS_DRIVER_NAME.equals(driver.name())) {
return true;
}
}
return false;
}
/**
* Obtains the flow info generated by TX port from instance port.
*
* @param instPort instance port
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildTxPortInfo(InstancePort instPort, PortStatistics stat) {
private FlowInfo buildTxFlowInfoFromInstancePort(InstancePort instPort,
PortStatistics stat) {
return buildTxFlowInfo(instPort.ipAddress(), instPort.macAddress(),
instPort.deviceId(), stat);
}
/**
* Obtains the flow info generated from RX port from instance port.
*
* @param instPort instance port
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildRxFlowInfoFromInstancePort(InstancePort instPort,
PortStatistics stat) {
return buildRxFlowInfo(instPort.ipAddress(), instPort.macAddress(),
instPort.deviceId(), stat);
}
/**
* Obtains the flow info generated by TX port from host.
*
* @param host host
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildTxFlowInfoFromHost(Host host, PortStatistics stat) {
IpAddress ip = host.ipAddresses().stream().findFirst().orElse(null);
if (ip != null) {
return buildTxFlowInfo(ip, host.mac(), host.location().deviceId(), stat);
}
return null;
}
/**
* Obtains the flow info generated by RX @param host host.
*
* @param host host
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildRxFlowInfoFromHost(Host host, PortStatistics stat) {
IpAddress ip = host.ipAddresses().stream().findFirst().orElse(null);
if (ip != null) {
return buildRxFlowInfo(ip, host.mac(), host.location().deviceId(), stat);
}
return null;
}
/**
* Obtains the flow info generated from TX port.
*
* @param ipAddress IP address
* @param macAddress MAC address
* @param deviceId device identifier
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildTxFlowInfo(IpAddress ipAddress,
MacAddress macAddress,
DeviceId deviceId,
PortStatistics stat) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
.withSrcIp(IpPrefix.valueOf(ipAddress, ARBITRARY_LENGTH))
.withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
.withSrcMac(instPort.macAddress())
.withSrcMac(macAddress)
.withDstMac(NO_HOST_MAC)
.withDeviceId(instPort.deviceId())
.withDeviceId(deviceId)
.withInputInterfaceId(ARBITRARY_IN_INTF)
.withOutputInterfaceId(ARBITRARY_OUT_INTF)
.withVlanId(VlanId.vlanId());
@ -391,21 +606,26 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
/**
* Obtains the flow info generated by RX port.
* Obtains the flow info generated from RX port.
*
* @param instPort instance port
* @param ipAddress IP address
* @param macAddress MAC address
* @param deviceId Device identifier
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildRxPortInfo(InstancePort instPort, PortStatistics stat) {
private FlowInfo buildRxFlowInfo(IpAddress ipAddress,
MacAddress macAddress,
DeviceId deviceId,
PortStatistics stat) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
.withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
.withDstIp(IpPrefix.valueOf(ipAddress, ARBITRARY_LENGTH))
.withSrcMac(NO_HOST_MAC)
.withDstMac(instPort.macAddress())
.withDeviceId(instPort.deviceId())
.withDstMac(macAddress)
.withDeviceId(deviceId)
.withInputInterfaceId(ARBITRARY_IN_INTF)
.withOutputInterfaceId(ARBITRARY_OUT_INTF)
.withVlanId(VlanId.vlanId());
@ -439,6 +659,17 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
.findFirst().orElse(null);
}
/**
* Installs a flow rule where the source table is fromTable, while destination
* table is toTable.
*
* @param deviceId device identifier
* @param fromTable source table
* @param toTable destination table
* @param statsFlowRule stats flow rule
* @param rulePriority rule priority
* @param install installation flag
*/
private void connectTables(DeviceId deviceId, int fromTable, int toTable,
StatsFlowRule statsFlowRule, int rulePriority,
boolean install) {
@ -478,7 +709,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
.withSelector(selectorBuilder.build())
.withTreatment(treatmentBuilder.build())
.withPriority(prefixLength)
.fromApp(appId)
.fromApp(telemetryAppId)
.makePermanent()
.forTable(fromTable)
.build();
@ -635,7 +866,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
/**
* Get Device ID which the VM is located.
* Gets Device ID which the VM is located.
*
* @param ipAddress IP Address of host
* @return Device ID
@ -651,7 +882,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
/**
* Get VLAN ID with respect to IP Address.
* Gets VLAN ID with respect to IP Address.
*
* @param ipAddress IP Address of host
* @return VLAN ID
@ -665,7 +896,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
/**
* Get Interface ID of Switch which is connected to a host.
* Gets Interface ID of Switch which is connected to a host.
*
* @param ipAddress IP Address of host
* @return Interface ID of Switch
@ -679,7 +910,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
/**
* Get MAC Address of host.
* Gets MAC Address of host.
*
* @param ipAddress IP Address of host
* @return MAC Address of host
@ -693,6 +924,26 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
return NO_HOST_MAC;
}
/**
* Gets IP address of the host which is attached to the given device and port.
*
* @param device device
* @param inPort IN port number
* @return IP address
*/
private IpAddress getIpAddress(Device device, PortCriterion inPort) {
Host host = hostService.getConnectedHosts(device.id()).stream()
.filter(h -> h.location().port().equals(inPort.port()))
.findAny().orElse(null);
if (host != null) {
return host.ipAddresses().stream().findAny().get();
}
return NO_HOST_IP;
}
private void enqFlowInfo(FlowInfo flowInfo) {
String key = flowInfo.uniqueFlowInfoKey();
Queue<FlowInfo> queue = flowInfoMap.get(key);
@ -709,8 +960,15 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
}
}
public Map<String, Queue<FlowInfo>> getFlowInfoMap() {
return flowInfoMap;
/**
* Checks whether the given device is edge switch or not.
*
* @param id device identifier
* @return true if the given device is edge switch, false otherwise
*/
private boolean isEdgeSwitch(DeviceId id) {
return !hostService.getConnectedHosts(id).isEmpty();
}
/**
@ -751,37 +1009,82 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
portStats = portStatsConfigured;
log.info("Configured. Port stats flag is {}", portStats);
}
Boolean monitorOverlayConfigured = getBooleanProperty(properties, MONITOR_OVERLAY);
if (monitorOverlayConfigured == null) {
monitorOverlay = DEFAULT_MONITOR_OVERLAY;
log.info("Monitor overlay flag is NOT " +
"configured, default value is {}", monitorOverlay);
} else {
monitorOverlay = monitorOverlayConfigured;
log.info("Configured. Monitor overlay flag is {}", monitorOverlay);
}
Boolean monitorUnderlayConfigured = getBooleanProperty(properties, MONITOR_UNDERLAY);
if (monitorUnderlayConfigured == null) {
monitorUnderlay = DEFAULT_MONITOR_UNDERLAY;
log.info("Monitor underlay flag is NOT " +
"configured, default value is {}", monitorUnderlay);
} else {
monitorUnderlay = monitorUnderlayConfigured;
log.info("Configured. Monitor underlay flag is {}", monitorUnderlay);
}
}
private class TelemetryCollector implements Runnable {
@Override
public void run() {
Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
Set<FlowInfo> filteredOverlayFlowInfos = Sets.newConcurrentHashSet();
Set<FlowInfo> filteredUnderlayFlowInfos = Sets.newConcurrentHashSet();
// we only let the master controller of the device where the
// stats flow rules are installed send stats message
getFlowInfos().forEach(f -> {
if (checkSrcDstLocalMaster(f)) {
filteredFlowInfos.add(f);
}
});
// we only let the master controller of the device where the port
// is located to send stats message
if (portStats) {
getDstPortBasedFlowInfos().forEach(f -> {
if (monitorOverlay) {
getOverlayFlowInfos().forEach(f -> {
if (checkSrcDstLocalMaster(f)) {
filteredFlowInfos.add(f);
filteredOverlayFlowInfos.add(f);
}
});
}
if (monitorUnderlay) {
getUnderlayFlowInfos().forEach(f -> {
if (checkSrcDstLocalMaster(f)) {
filteredUnderlayFlowInfos.add(f);
}
});
}
telemetryService.publish(filteredFlowInfos);
// we only let the master controller of the device where the port
// is located to send stats message
if (portStats) {
if (monitorOverlay) {
getOverlayDstPortBasedFlowInfos().forEach(f -> {
if (checkSrcDstLocalMaster(f)) {
filteredOverlayFlowInfos.add(f);
}
});
}
// TODO: Refactor the following code to "TelemetryService" style.
filteredFlowInfos.forEach(flowInfo -> {
enqFlowInfo(flowInfo);
});
if (monitorUnderlay) {
getUnderlayDstPortBasedFlowInfos().forEach(f -> {
if (checkSrcDstLocalMaster(f)) {
filteredUnderlayFlowInfos.add(f);
}
});
}
}
if (monitorOverlay) {
telemetryService.publish(filteredOverlayFlowInfos);
// TODO: Refactor the following code to "TelemetryService" style.
filteredOverlayFlowInfos.forEach(StatsFlowRuleManager.this::enqFlowInfo);
}
if (monitorUnderlay) {
telemetryService.publish(filteredUnderlayFlowInfos);
}
}
private boolean checkSrcDstLocalMaster(FlowInfo info) {

View File

@ -136,7 +136,7 @@ public class OpenstackTelemetryWebResource extends AbstractWebResource {
log.info("GET BULK FLOW RULE");
Set<FlowInfo> flowInfoSet;
flowInfoSet = statsFlowRuleService.getFlowInfos();
flowInfoSet = statsFlowRuleService.getOverlayFlowInfos();
log.info("\n\n======================================================\n" +
"FlowInfo Set: \n{}" +