Support to publish port TX and RX stats in openstacktelemetry

Change-Id: I368fb676e4817cd01e5782a3b37170e2b9a5c6bd
This commit is contained in:
Jian Li 2018-07-07 14:53:32 +09:00
parent 7f70bb700d
commit a494768d94
8 changed files with 328 additions and 137 deletions

View File

@ -44,7 +44,7 @@ public interface StatsFlowRuleAdminService {
*
* @return a set of flow infos
*/
Set<FlowInfo> getFlowInfo();
Set<FlowInfo> getFlowInfos();
/**
* Deletes stat flow rule.

View File

@ -10,6 +10,7 @@ COMPILE_DEPS = [
'//lib:jersey-client',
'//cli:onos-cli',
'//lib:org.apache.karaf.shell.console',
'//apps/openstacknode/api:onos-apps-openstacknode-api',
'//apps/openstacknetworking/api:onos-apps-openstacknetworking-api',
'//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api',
'//lib:kafka-clients',

View File

@ -85,6 +85,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-apps-openstacknode-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>

View File

@ -25,6 +25,9 @@ import java.util.Set;
*/
public class TinaMessageByteBufferCodec {
public static final String KAFKA_TOPIC = "sona.flow";
public static final String KAFKA_KEY = "flowdata";
private static final int HEADER_SIZE = 8;
private static final int ENTRY_SIZE = 88;
private static final int MILLISECONDS = 1000;

View File

@ -124,7 +124,7 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
if (producer == null) {
log.warn("Kafka telemetry service has not been enabled!");
log.debug("Kafka telemetry service has not been enabled!");
return null;
}

View File

@ -36,6 +36,9 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
/**
* Openstack telemetry manager.
*/
@ -45,9 +48,6 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String KAFKA_TOPIC = "sona.flow";
private static final String KAFKA_KEY = "flowdata";
private List<TelemetryService> telemetryServices = Lists.newArrayList();
@Activate
@ -89,6 +89,8 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService {
if (service instanceof RestTelemetryManager) {
invokeRestPublisher((RestTelemetryService) service, flowInfos);
}
log.trace("Publishing Flow Infos {}", flowInfos);
});
}

View File

@ -16,7 +16,6 @@
package org.onosproject.openstacktelemetry.impl;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@ -29,12 +28,17 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.SharedScheduledExecutorService;
import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@ -51,7 +55,11 @@ import org.onosproject.net.flow.criteria.IPProtocolCriterion;
import org.onosproject.net.flow.criteria.TcpPortCriterion;
import org.onosproject.net.flow.criteria.UdpPortCriterion;
import org.onosproject.net.host.HostService;
import org.onosproject.openstacknetworking.api.InstancePort;
import org.onosproject.openstacknetworking.api.InstancePortService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.StatsFlowRule;
@ -62,10 +70,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Dictionary;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.packet.IPv4.PROTOCOL_TCP;
@ -83,6 +93,7 @@ import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TA
import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
@ -101,16 +112,28 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
private static final byte FLOW_TYPE_SONA = 1; // VLAN
private static final long MILLISECONDS = 1000L;
private static final long INITIAL_DELAY = 5L;
private static final long REFRESH_INTERVAL = 5L;
private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;
private static final String REVERSE_PATH_STATS = "reversePathStats";
private static final String EGRESS_STATS = "egressStats";
private static final String PORT_STATS = "portStats";
private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
private static final boolean DEFAULT_EGRESS_STATS = false;
private static final boolean DEFAULT_PORT_STATS = true;
private static final String MAC_NOT_NULL = "MAC should not be null";
private static final String ARBITRARY_IP = "0.0.0.0/32";
private static final int ARBITRARY_LENGTH = 32;
private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
private static final int ARBITRARY_IN_INTF = 0;
private static final int ARBITRARY_OUT_INTF = 0;
private static final boolean RECOVER_FROM_FAILURE = true;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@ -120,6 +143,9 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
@ -129,6 +155,12 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackNetworkService osNetworkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected InstancePortService instPortService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackNodeService osNodeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService telemetryService;
@ -142,12 +174,16 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
"collecting the flow-based stats for egress port.")
private boolean egressStats = DEFAULT_EGRESS_STATS;
@Property(name = PORT_STATS, boolValue = DEFAULT_PORT_STATS,
label = "A flag which indicates whether to collect port TX & RX stats.")
private boolean portStats = DEFAULT_PORT_STATS;
private ApplicationId appId;
private Timer timer;
private TimerTask task;
private TelemetryCollector collector;
private SharedScheduledExecutorService executor;
private ScheduledFuture result;
private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
private int loopCount = 0;
private static final int SOURCE_ID = 1;
private static final int TARGET_ID = 2;
@ -157,15 +193,12 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
public StatsFlowRuleManager() {
this.timer = new Timer("openstack-telemetry-sender");
}
@Activate
protected void activate() {
appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
componentConfigService.registerProperties(getClass());
executor = SharedScheduledExecutors.getSingleThreadExecutor();
this.start();
@ -177,6 +210,8 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
componentConfigService.unregisterProperties(getClass(), false);
flowRuleService.removeFlowRulesById(appId);
log.info("Stopped");
}
@ -190,16 +225,17 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Override
public void start() {
log.info("Start publishing thread");
task = new InternalTimerTask();
timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
MILLISECONDS * REFRESH_INTERVAL);
collector = new TelemetryCollector();
result = executor.scheduleAtFixedRate(collector, INITIAL_DELAY,
REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
}
@Override
public void stop() {
log.info("Stop data publishing thread");
task.cancel();
task = null;
result.cancel(true);
collector = null;
}
@Override
@ -210,12 +246,221 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Override
public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
// FIXME: following code might not be necessary
flowRuleService.removeFlowRulesById(appId);
setStatFlowRule(statsFlowRule, false);
}
/**
* Gets a set of the flow infos.
*
* @return a set of flow infos
*/
@Override
public Set<FlowInfo> getFlowInfos() {
Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
// obtain all flow rule entries installed by telemetry app
for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
TrafficSelector selector = entry.selector();
IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
IPProtocolCriterion ipProtocol =
(IPProtocolCriterion) selector.getCriterion(IP_PROTO);
log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
((IndexTableId) entry.table()).id(),
srcIp.ip().toString(),
dstIp.ip().toString(),
entry.packets(),
entry.bytes());
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(srcIp.ip())
.withDstIp(dstIp.ip());
if (ipProtocol != null) {
fBuilder.withProtocol((byte) ipProtocol.protocol());
if (ipProtocol.protocol() == PROTOCOL_TCP) {
TcpPortCriterion tcpSrc =
(TcpPortCriterion) selector.getCriterion(TCP_SRC);
TcpPortCriterion tcpDst =
(TcpPortCriterion) selector.getCriterion(TCP_DST);
log.debug("TCP SRC Port: {}, DST Port: {}",
tcpSrc.tcpPort().toInt(),
tcpDst.tcpPort().toInt());
fBuilder.withSrcPort(tcpSrc.tcpPort());
fBuilder.withDstPort(tcpDst.tcpPort());
} else if (ipProtocol.protocol() == PROTOCOL_UDP) {
UdpPortCriterion udpSrc =
(UdpPortCriterion) selector.getCriterion(UDP_SRC);
UdpPortCriterion udpDst =
(UdpPortCriterion) selector.getCriterion(UDP_DST);
log.debug("UDP SRC Port: {}, DST Port: {}",
udpSrc.udpPort().toInt(),
udpDst.udpPort().toInt());
fBuilder.withSrcPort(udpSrc.udpPort());
fBuilder.withDstPort(udpDst.udpPort());
} else {
log.debug("Other protocol: {}", ipProtocol.protocol());
}
}
fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
.withDstMac(getMacAddress(dstIp.ip().address()))
.withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
.withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
.withVlanId(getVlanId(srcIp.ip().address()))
.withDeviceId(entry.deviceId());
StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
// TODO: need to collect error and drop packets stats
// TODO: need to make the refresh interval configurable
sBuilder.withStartupTime(System.currentTimeMillis())
.withFstPktArrTime(System.currentTimeMillis())
.withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
.withCurrAccPkts((int) entry.packets())
.withCurrAccBytes(entry.bytes())
.withErrorPkts((short) 0)
.withDropPkts((short) 0);
fBuilder.withStatsInfo(sBuilder.build());
FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
flowInfos.add(flowInfo);
log.debug("FlowInfo: \n{}", flowInfo.toString());
}
return flowInfos;
}
/**
* Gets a set of flow infos by referring to destination VM port.
*
* @return flow infos
*/
private Set<FlowInfo> getDstPortBasedFlowInfos() {
Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
Set<PortNumber> instPortNums = instPortService.instancePorts()
.stream()
.map(InstancePort::portNumber)
.collect(Collectors.toSet());
Set<DeviceId> deviceIds = osNodeService.completeNodes(COMPUTE)
.stream()
.map(OpenstackNode::intgBridge)
.collect(Collectors.toSet());
deviceIds.forEach(d -> {
List<PortStatistics> stats =
deviceService.getPortStatistics(d)
.stream()
.filter(s -> instPortNums.contains(s.portNumber()))
.collect(Collectors.toList());
stats.forEach(s -> {
InstancePort instPort = getInstancePort(d, s.portNumber());
flowInfos.add(buildTxPortInfo(instPort, s));
flowInfos.add(buildRxPortInfo(instPort, s));
});
});
return flowInfos;
}
/**
* Obtains the flow info generated by TX port.
*
* @param instPort instance port
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildTxPortInfo(InstancePort instPort, PortStatistics stat) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
.withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
.withSrcMac(instPort.macAddress())
.withDstMac(MacAddress.valueOf(ARBITRARY_MAC))
.withDeviceId(instPort.deviceId())
.withInputInterfaceId(ARBITRARY_IN_INTF)
.withOutputInterfaceId(ARBITRARY_OUT_INTF)
.withVlanId(VlanId.vlanId());
StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
sBuilder.withStartupTime(System.currentTimeMillis())
.withFstPktArrTime(System.currentTimeMillis())
.withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
.withCurrAccPkts((int) stat.packetsSent())
.withCurrAccBytes(stat.bytesSent())
.withErrorPkts((short) stat.packetsTxErrors())
.withDropPkts((short) stat.packetsTxDropped());
fBuilder.withStatsInfo(sBuilder.build());
return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
}
/**
* Obtains the flow info generated by RX port.
*
* @param instPort instance port
* @param stat port statistics
* @return flow info
*/
private FlowInfo buildRxPortInfo(InstancePort instPort, PortStatistics stat) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
.withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
.withSrcMac(MacAddress.valueOf(ARBITRARY_MAC))
.withDstMac(instPort.macAddress())
.withDeviceId(instPort.deviceId())
.withInputInterfaceId(ARBITRARY_IN_INTF)
.withOutputInterfaceId(ARBITRARY_OUT_INTF)
.withVlanId(VlanId.vlanId());
StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
sBuilder.withStartupTime(System.currentTimeMillis())
.withFstPktArrTime(System.currentTimeMillis())
.withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
.withCurrAccPkts((int) stat.packetsReceived())
.withCurrAccBytes(stat.bytesReceived())
.withErrorPkts((short) stat.packetsRxErrors())
.withDropPkts((short) stat.packetsRxDropped());
fBuilder.withStatsInfo(sBuilder.build());
return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
}
/**
* Obtains instance port which associated with the given device identifier
* and port number.
*
* @param deviceId device identifier
* @param portNumber port number
* @return instance port
*/
private InstancePort getInstancePort(DeviceId deviceId, PortNumber portNumber) {
return instPortService.instancePorts().stream()
.filter(p -> p.deviceId().equals(deviceId))
.filter(p -> p.portNumber().equals(portNumber))
.findFirst().orElse(null);
}
private void connectTables(DeviceId deviceId, int fromTable, int toTable,
StatsFlowRule statsFlowRule, int rulePriority,
boolean install) {
@ -278,110 +523,18 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
log.debug("Provisioned vni or forwarding table: \n {}", ops.toString());
log.debug("Install rules for telemetry stats: \n {}",
ops.toString());
}
@Override
public void onError(FlowRuleOperations ops) {
log.debug("Failed to provision vni or forwarding table: \n {}", ops.toString());
log.debug("Failed to install rules for telemetry stats: \n {}",
ops.toString());
}
}));
}
/**
* Gets a set of the flow infos.
*
* @return a set of flow infos
*/
public Set<FlowInfo> getFlowInfo() {
Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
// obtain all flow rule entries installed by telemetry app
for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
TrafficSelector selector = entry.selector();
IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
IPProtocolCriterion ipProtocol =
(IPProtocolCriterion) selector.getCriterion(IP_PROTO);
log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
((IndexTableId) entry.table()).id(),
srcIp.ip().toString(),
dstIp.ip().toString(),
entry.packets(),
entry.bytes());
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(srcIp.ip())
.withDstIp(dstIp.ip());
if (ipProtocol != null) {
fBuilder.withProtocol((byte) ipProtocol.protocol());
if (ipProtocol.protocol() == PROTOCOL_TCP) {
TcpPortCriterion tcpSrc =
(TcpPortCriterion) selector.getCriterion(TCP_SRC);
TcpPortCriterion tcpDst =
(TcpPortCriterion) selector.getCriterion(TCP_DST);
log.debug("TCP SRC Port: {}, DST Port: {}",
tcpSrc.tcpPort().toInt(),
tcpDst.tcpPort().toInt());
fBuilder.withSrcPort(tcpSrc.tcpPort());
fBuilder.withDstPort(tcpDst.tcpPort());
} else if (ipProtocol.protocol() == PROTOCOL_UDP) {
UdpPortCriterion udpSrc =
(UdpPortCriterion) selector.getCriterion(UDP_SRC);
UdpPortCriterion udpDst =
(UdpPortCriterion) selector.getCriterion(UDP_DST);
log.debug("UDP SRC Port: {}, DST Port: {}",
udpSrc.udpPort().toInt(),
udpDst.udpPort().toInt());
fBuilder.withSrcPort(udpSrc.udpPort());
fBuilder.withDstPort(udpDst.udpPort());
} else {
log.debug("Other protocol: {}", ipProtocol.protocol());
}
}
fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
.withDstMac(getMacAddress(dstIp.ip().address()))
.withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
.withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
.withVlanId(getVlanId(srcIp.ip().address()))
.withDeviceId(entry.deviceId());
StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
// TODO: need to collect error and drop packets stats
// TODO: need to make the refresh interval configurable
sBuilder.withStartupTime(System.currentTimeMillis())
.withFstPktArrTime(System.currentTimeMillis())
.withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
.withCurrAccPkts((int) entry.packets())
.withCurrAccBytes(entry.bytes())
.withErrorPkts((short) 0)
.withDropPkts((short) 0);
fBuilder.withStatsInfo(sBuilder.build());
FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
flowInfos.add(flowInfo);
log.debug("FlowInfo: \n{}", flowInfo.toString());
}
return flowInfos;
}
/**
* Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
*
@ -515,9 +668,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
return host.map(host1 -> host1.location().deviceId()).orElse(null);
} else {
log.warn("Failed to get DeviceID which is connected to {}. " +
"The destination is either a bare-metal or located out of DC",
ipAddress.toString());
log.debug("No DeviceID is associated to {}", ipAddress.toString());
return null;
}
}
@ -593,29 +744,54 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
egressStats = egressStatsConfigured;
log.info("Configured. Egress stats flag is {}", egressStats);
}
Boolean portStatsConfigured = getBooleanProperty(properties, PORT_STATS);
if (portStatsConfigured == null) {
portStats = DEFAULT_PORT_STATS;
log.info("Port stats flag is NOT " +
"configured, default value is {}", portStats);
} else {
portStats = portStatsConfigured;
log.info("Configured. Port stats flag is {}", portStats);
}
}
private class InternalTimerTask extends TimerTask {
private class TelemetryCollector implements Runnable {
@Override
public void run() {
log.debug("Timer task thread starts ({})", loopCount++);
Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
// we only let the master controller of the device where the
// stats flow rules are installed send kafka message
getFlowInfo().forEach(f -> {
DeviceId deviceId = getDeviceId(f.srcIp().address());
if (mastershipService.isLocalMaster(deviceId)) {
// stats flow rules are installed send stats message
getFlowInfos().forEach(f -> {
if (checkSrcDstLocalMaster(f)) {
filteredFlowInfos.add(f);
}
});
try {
telemetryService.publish(filteredFlowInfos);
} catch (Exception ex) {
log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
// we only let the master controller of the device where the port
// is located to send stats message
if (portStats) {
getDstPortBasedFlowInfos().forEach(f -> {
if (checkSrcDstLocalMaster(f)) {
filteredFlowInfos.add(f);
}
});
}
telemetryService.publish(filteredFlowInfos);
}
private boolean checkSrcDstLocalMaster(FlowInfo info) {
DeviceId srcDeviceId = getDeviceId(info.srcIp().address());
DeviceId dstDeviceId = getDeviceId(info.dstIp().address());
boolean isSrcLocalMaster = srcDeviceId != null &&
mastershipService.isLocalMaster(srcDeviceId);
boolean isDstLocalMaster = dstDeviceId != null &&
mastershipService.isLocalMaster(dstDeviceId);
return isSrcLocalMaster || isDstLocalMaster;
}
}
}

View File

@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@ -98,6 +99,9 @@ public class OpenstackTelemetryWebResource extends AbstractWebResource {
* @param input openstack flow rule JSON input stream
* @return 200 OK if processing is correct.
*/
@DELETE
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response deleteBulkFlowRule(InputStream input) {
log.info("DELETE BULK FLOW RULE: {}", input.toString());
@ -132,7 +136,7 @@ public class OpenstackTelemetryWebResource extends AbstractWebResource {
log.info("GET BULK FLOW RULE");
Set<FlowInfo> flowInfoSet;
flowInfoSet = statsFlowRuleService.getFlowInfo();
flowInfoSet = statsFlowRuleService.getFlowInfos();
log.info("\n\n======================================================\n" +
"FlowInfo Set: \n{}" +
@ -151,11 +155,10 @@ public class OpenstackTelemetryWebResource extends AbstractWebResource {
}
@GET
@Path("list/{src_ip_prefix}/{dst_ip_prefix}")
@Path("list/{srcIpPrefix}/{dstIpPrefix}")
@Produces(MediaType.APPLICATION_JSON)
public Response getFlowRule(
@PathParam("src_ip_prefix") String srcIpPrefix,
@PathParam("dst_ip_prefix") String dstIpPrefix) {
public Response getFlowRule(@PathParam("srcIpPrefix") String srcIpPrefix,
@PathParam("dstIpPrefix") String dstIpPrefix) {
return ok(root).build();
}