From a494768d949a2cfd244ba7fc3c0dcf7b74792c08 Mon Sep 17 00:00:00 2001 From: Jian Li Date: Sat, 7 Jul 2018 14:53:32 +0900 Subject: [PATCH] Support to publish port TX and RX stats in openstacktelemetry Change-Id: I368fb676e4817cd01e5782a3b37170e2b9a5c6bd --- .../api/StatsFlowRuleAdminService.java | 2 +- apps/openstacktelemetry/app/BUCK | 1 + apps/openstacktelemetry/app/pom.xml | 6 + .../codec/TinaMessageByteBufferCodec.java | 3 + .../impl/KafkaTelemetryManager.java | 2 +- .../impl/OpenstackTelemetryManager.java | 8 +- .../impl/StatsFlowRuleManager.java | 430 ++++++++++++------ .../web/OpenstackTelemetryWebResource.java | 13 +- 8 files changed, 328 insertions(+), 137 deletions(-) diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java index 7fed73ac60..dcf682d441 100644 --- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java +++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java @@ -44,7 +44,7 @@ public interface StatsFlowRuleAdminService { * * @return a set of flow infos */ - Set getFlowInfo(); + Set getFlowInfos(); /** * Deletes stat flow rule. diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK index 97bc0f9b7d..4cc3d98f98 100644 --- a/apps/openstacktelemetry/app/BUCK +++ b/apps/openstacktelemetry/app/BUCK @@ -10,6 +10,7 @@ COMPILE_DEPS = [ '//lib:jersey-client', '//cli:onos-cli', '//lib:org.apache.karaf.shell.console', + '//apps/openstacknode/api:onos-apps-openstacknode-api', '//apps/openstacknetworking/api:onos-apps-openstacknetworking-api', '//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api', '//lib:kafka-clients', diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml index 5b0f8bfa30..920958eaa7 100644 --- a/apps/openstacktelemetry/app/pom.xml +++ b/apps/openstacktelemetry/app/pom.xml @@ -85,6 +85,12 @@ ${project.version} + + org.onosproject + onos-apps-openstacknode-api + ${project.version} + + org.onosproject onlab-osgi diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java index 228bc2b4d8..2ced23b082 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java @@ -25,6 +25,9 @@ import java.util.Set; */ public class TinaMessageByteBufferCodec { + public static final String KAFKA_TOPIC = "sona.flow"; + public static final String KAFKA_KEY = "flowdata"; + private static final int HEADER_SIZE = 8; private static final int ENTRY_SIZE = 88; private static final int MILLISECONDS = 1000; diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java index d1de54a52c..56a0e81a67 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java @@ -124,7 +124,7 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService { public Future publish(ProducerRecord record) { if (producer == null) { - log.warn("Kafka telemetry service has not been enabled!"); + log.debug("Kafka telemetry service has not been enabled!"); return null; } diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java index da492a9aa1..f3f386f5bb 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java @@ -36,6 +36,9 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY; +import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC; + /** * Openstack telemetry manager. */ @@ -45,9 +48,6 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService { private final Logger log = LoggerFactory.getLogger(getClass()); - private static final String KAFKA_TOPIC = "sona.flow"; - private static final String KAFKA_KEY = "flowdata"; - private List telemetryServices = Lists.newArrayList(); @Activate @@ -89,6 +89,8 @@ public class OpenstackTelemetryManager implements OpenstackTelemetryService { if (service instanceof RestTelemetryManager) { invokeRestPublisher((RestTelemetryService) service, flowInfos); } + + log.trace("Publishing Flow Infos {}", flowInfos); }); } diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java index 9025224ea6..a7ead8e491 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java @@ -16,7 +16,6 @@ package org.onosproject.openstacktelemetry.impl; import com.google.common.collect.Sets; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Deactivate; @@ -29,12 +28,17 @@ import org.onlab.packet.IpAddress; import org.onlab.packet.IpPrefix; import org.onlab.packet.MacAddress; import org.onlab.packet.VlanId; +import org.onlab.util.SharedScheduledExecutorService; +import org.onlab.util.SharedScheduledExecutors; import org.onosproject.cfg.ComponentConfigService; import org.onosproject.core.ApplicationId; import org.onosproject.core.CoreService; import org.onosproject.mastership.MastershipService; import org.onosproject.net.DeviceId; import org.onosproject.net.Host; +import org.onosproject.net.PortNumber; +import org.onosproject.net.device.DeviceService; +import org.onosproject.net.device.PortStatistics; import org.onosproject.net.flow.DefaultFlowRule; import org.onosproject.net.flow.DefaultTrafficSelector; import org.onosproject.net.flow.DefaultTrafficTreatment; @@ -51,7 +55,11 @@ import org.onosproject.net.flow.criteria.IPProtocolCriterion; import org.onosproject.net.flow.criteria.TcpPortCriterion; import org.onosproject.net.flow.criteria.UdpPortCriterion; import org.onosproject.net.host.HostService; +import org.onosproject.openstacknetworking.api.InstancePort; +import org.onosproject.openstacknetworking.api.InstancePortService; import org.onosproject.openstacknetworking.api.OpenstackNetworkService; +import org.onosproject.openstacknode.api.OpenstackNode; +import org.onosproject.openstacknode.api.OpenstackNodeService; import org.onosproject.openstacktelemetry.api.FlowInfo; import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService; import org.onosproject.openstacktelemetry.api.StatsFlowRule; @@ -62,10 +70,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Dictionary; +import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.onlab.packet.Ethernet.TYPE_IPV4; import static org.onlab.packet.IPv4.PROTOCOL_TCP; @@ -83,6 +93,7 @@ import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TA import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE; import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE; import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE; +import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE; import static org.onosproject.openstacktelemetry.api.Constants.FLAT; import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID; import static org.onosproject.openstacktelemetry.api.Constants.VLAN; @@ -101,16 +112,28 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { private static final byte FLOW_TYPE_SONA = 1; // VLAN private static final long MILLISECONDS = 1000L; + private static final long INITIAL_DELAY = 5L; private static final long REFRESH_INTERVAL = 5L; + private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS; private static final String REVERSE_PATH_STATS = "reversePathStats"; private static final String EGRESS_STATS = "egressStats"; + private static final String PORT_STATS = "portStats"; private static final boolean DEFAULT_REVERSE_PATH_STATS = false; private static final boolean DEFAULT_EGRESS_STATS = false; + private static final boolean DEFAULT_PORT_STATS = true; private static final String MAC_NOT_NULL = "MAC should not be null"; + private static final String ARBITRARY_IP = "0.0.0.0/32"; + private static final int ARBITRARY_LENGTH = 32; + private static final String ARBITRARY_MAC = "00:00:00:00:00:00"; + private static final int ARBITRARY_IN_INTF = 0; + private static final int ARBITRARY_OUT_INTF = 0; + + private static final boolean RECOVER_FROM_FAILURE = true; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected CoreService coreService; @@ -120,6 +143,9 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected HostService hostService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected DeviceService deviceService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected ComponentConfigService componentConfigService; @@ -129,6 +155,12 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected OpenstackNetworkService osNetworkService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected InstancePortService instPortService; + + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected OpenstackNodeService osNodeService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected OpenstackTelemetryService telemetryService; @@ -142,12 +174,16 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { "collecting the flow-based stats for egress port.") private boolean egressStats = DEFAULT_EGRESS_STATS; + @Property(name = PORT_STATS, boolValue = DEFAULT_PORT_STATS, + label = "A flag which indicates whether to collect port TX & RX stats.") + private boolean portStats = DEFAULT_PORT_STATS; + private ApplicationId appId; - private Timer timer; - private TimerTask task; + private TelemetryCollector collector; + private SharedScheduledExecutorService executor; + private ScheduledFuture result; private final Set gFlowInfoSet = Sets.newHashSet(); - private int loopCount = 0; private static final int SOURCE_ID = 1; private static final int TARGET_ID = 2; @@ -157,15 +193,12 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00"); - public StatsFlowRuleManager() { - this.timer = new Timer("openstack-telemetry-sender"); - } - @Activate protected void activate() { appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID); componentConfigService.registerProperties(getClass()); + executor = SharedScheduledExecutors.getSingleThreadExecutor(); this.start(); @@ -177,6 +210,8 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { componentConfigService.unregisterProperties(getClass(), false); + flowRuleService.removeFlowRulesById(appId); + log.info("Stopped"); } @@ -190,16 +225,17 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { @Override public void start() { log.info("Start publishing thread"); - task = new InternalTimerTask(); - timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL, - MILLISECONDS * REFRESH_INTERVAL); + collector = new TelemetryCollector(); + + result = executor.scheduleAtFixedRate(collector, INITIAL_DELAY, + REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE); } @Override public void stop() { log.info("Stop data publishing thread"); - task.cancel(); - task = null; + result.cancel(true); + collector = null; } @Override @@ -210,12 +246,221 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { @Override public void deleteStatFlowRule(StatsFlowRule statsFlowRule) { - // FIXME: following code might not be necessary - flowRuleService.removeFlowRulesById(appId); setStatFlowRule(statsFlowRule, false); } + /** + * Gets a set of the flow infos. + * + * @return a set of flow infos + */ + @Override + public Set getFlowInfos() { + Set flowInfos = Sets.newConcurrentHashSet(); + + // obtain all flow rule entries installed by telemetry app + for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) { + FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder(); + TrafficSelector selector = entry.selector(); + + IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC); + IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST); + IPProtocolCriterion ipProtocol = + (IPProtocolCriterion) selector.getCriterion(IP_PROTO); + + log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}", + ((IndexTableId) entry.table()).id(), + srcIp.ip().toString(), + dstIp.ip().toString(), + entry.packets(), + entry.bytes()); + + fBuilder.withFlowType(FLOW_TYPE_SONA) + .withSrcIp(srcIp.ip()) + .withDstIp(dstIp.ip()); + + if (ipProtocol != null) { + fBuilder.withProtocol((byte) ipProtocol.protocol()); + + if (ipProtocol.protocol() == PROTOCOL_TCP) { + TcpPortCriterion tcpSrc = + (TcpPortCriterion) selector.getCriterion(TCP_SRC); + TcpPortCriterion tcpDst = + (TcpPortCriterion) selector.getCriterion(TCP_DST); + + log.debug("TCP SRC Port: {}, DST Port: {}", + tcpSrc.tcpPort().toInt(), + tcpDst.tcpPort().toInt()); + + fBuilder.withSrcPort(tcpSrc.tcpPort()); + fBuilder.withDstPort(tcpDst.tcpPort()); + + } else if (ipProtocol.protocol() == PROTOCOL_UDP) { + + UdpPortCriterion udpSrc = + (UdpPortCriterion) selector.getCriterion(UDP_SRC); + UdpPortCriterion udpDst = + (UdpPortCriterion) selector.getCriterion(UDP_DST); + + log.debug("UDP SRC Port: {}, DST Port: {}", + udpSrc.udpPort().toInt(), + udpDst.udpPort().toInt()); + + fBuilder.withSrcPort(udpSrc.udpPort()); + fBuilder.withDstPort(udpDst.udpPort()); + } else { + log.debug("Other protocol: {}", ipProtocol.protocol()); + } + } + + fBuilder.withSrcMac(getMacAddress(srcIp.ip().address())) + .withDstMac(getMacAddress(dstIp.ip().address())) + .withInputInterfaceId(getInterfaceId(srcIp.ip().address())) + .withOutputInterfaceId(getInterfaceId(dstIp.ip().address())) + .withVlanId(getVlanId(srcIp.ip().address())) + .withDeviceId(entry.deviceId()); + + StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder(); + + // TODO: need to collect error and drop packets stats + // TODO: need to make the refresh interval configurable + sBuilder.withStartupTime(System.currentTimeMillis()) + .withFstPktArrTime(System.currentTimeMillis()) + .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS)) + .withCurrAccPkts((int) entry.packets()) + .withCurrAccBytes(entry.bytes()) + .withErrorPkts((short) 0) + .withDropPkts((short) 0); + + fBuilder.withStatsInfo(sBuilder.build()); + + FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder); + + flowInfos.add(flowInfo); + + log.debug("FlowInfo: \n{}", flowInfo.toString()); + } + + return flowInfos; + } + + /** + * Gets a set of flow infos by referring to destination VM port. + * + * @return flow infos + */ + private Set getDstPortBasedFlowInfos() { + Set flowInfos = Sets.newConcurrentHashSet(); + Set instPortNums = instPortService.instancePorts() + .stream() + .map(InstancePort::portNumber) + .collect(Collectors.toSet()); + Set deviceIds = osNodeService.completeNodes(COMPUTE) + .stream() + .map(OpenstackNode::intgBridge) + .collect(Collectors.toSet()); + + deviceIds.forEach(d -> { + List stats = + deviceService.getPortStatistics(d) + .stream() + .filter(s -> instPortNums.contains(s.portNumber())) + .collect(Collectors.toList()); + + stats.forEach(s -> { + InstancePort instPort = getInstancePort(d, s.portNumber()); + flowInfos.add(buildTxPortInfo(instPort, s)); + flowInfos.add(buildRxPortInfo(instPort, s)); + }); + }); + + return flowInfos; + } + + /** + * Obtains the flow info generated by TX port. + * + * @param instPort instance port + * @param stat port statistics + * @return flow info + */ + private FlowInfo buildTxPortInfo(InstancePort instPort, PortStatistics stat) { + FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder(); + + fBuilder.withFlowType(FLOW_TYPE_SONA) + .withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH)) + .withDstIp(IpPrefix.valueOf(ARBITRARY_IP)) + .withSrcMac(instPort.macAddress()) + .withDstMac(MacAddress.valueOf(ARBITRARY_MAC)) + .withDeviceId(instPort.deviceId()) + .withInputInterfaceId(ARBITRARY_IN_INTF) + .withOutputInterfaceId(ARBITRARY_OUT_INTF) + .withVlanId(VlanId.vlanId()); + + StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder(); + sBuilder.withStartupTime(System.currentTimeMillis()) + .withFstPktArrTime(System.currentTimeMillis()) + .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS)) + .withCurrAccPkts((int) stat.packetsSent()) + .withCurrAccBytes(stat.bytesSent()) + .withErrorPkts((short) stat.packetsTxErrors()) + .withDropPkts((short) stat.packetsTxDropped()); + + fBuilder.withStatsInfo(sBuilder.build()); + + return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder); + } + + /** + * Obtains the flow info generated by RX port. + * + * @param instPort instance port + * @param stat port statistics + * @return flow info + */ + private FlowInfo buildRxPortInfo(InstancePort instPort, PortStatistics stat) { + FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder(); + + fBuilder.withFlowType(FLOW_TYPE_SONA) + .withSrcIp(IpPrefix.valueOf(ARBITRARY_IP)) + .withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH)) + .withSrcMac(MacAddress.valueOf(ARBITRARY_MAC)) + .withDstMac(instPort.macAddress()) + .withDeviceId(instPort.deviceId()) + .withInputInterfaceId(ARBITRARY_IN_INTF) + .withOutputInterfaceId(ARBITRARY_OUT_INTF) + .withVlanId(VlanId.vlanId()); + + StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder(); + sBuilder.withStartupTime(System.currentTimeMillis()) + .withFstPktArrTime(System.currentTimeMillis()) + .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS)) + .withCurrAccPkts((int) stat.packetsReceived()) + .withCurrAccBytes(stat.bytesReceived()) + .withErrorPkts((short) stat.packetsRxErrors()) + .withDropPkts((short) stat.packetsRxDropped()); + + fBuilder.withStatsInfo(sBuilder.build()); + + return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder); + } + + /** + * Obtains instance port which associated with the given device identifier + * and port number. + * + * @param deviceId device identifier + * @param portNumber port number + * @return instance port + */ + private InstancePort getInstancePort(DeviceId deviceId, PortNumber portNumber) { + return instPortService.instancePorts().stream() + .filter(p -> p.deviceId().equals(deviceId)) + .filter(p -> p.portNumber().equals(portNumber)) + .findFirst().orElse(null); + } + private void connectTables(DeviceId deviceId, int fromTable, int toTable, StatsFlowRule statsFlowRule, int rulePriority, boolean install) { @@ -278,110 +523,18 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() { @Override public void onSuccess(FlowRuleOperations ops) { - log.debug("Provisioned vni or forwarding table: \n {}", ops.toString()); + log.debug("Install rules for telemetry stats: \n {}", + ops.toString()); } @Override public void onError(FlowRuleOperations ops) { - log.debug("Failed to provision vni or forwarding table: \n {}", ops.toString()); + log.debug("Failed to install rules for telemetry stats: \n {}", + ops.toString()); } })); } - /** - * Gets a set of the flow infos. - * - * @return a set of flow infos - */ - public Set getFlowInfo() { - Set flowInfos = Sets.newConcurrentHashSet(); - - // obtain all flow rule entries installed by telemetry app - for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) { - FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder(); - TrafficSelector selector = entry.selector(); - - IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC); - IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST); - IPProtocolCriterion ipProtocol = - (IPProtocolCriterion) selector.getCriterion(IP_PROTO); - - log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}", - ((IndexTableId) entry.table()).id(), - srcIp.ip().toString(), - dstIp.ip().toString(), - entry.packets(), - entry.bytes()); - - fBuilder.withFlowType(FLOW_TYPE_SONA) - .withSrcIp(srcIp.ip()) - .withDstIp(dstIp.ip()); - - if (ipProtocol != null) { - fBuilder.withProtocol((byte) ipProtocol.protocol()); - - if (ipProtocol.protocol() == PROTOCOL_TCP) { - TcpPortCriterion tcpSrc = - (TcpPortCriterion) selector.getCriterion(TCP_SRC); - TcpPortCriterion tcpDst = - (TcpPortCriterion) selector.getCriterion(TCP_DST); - - log.debug("TCP SRC Port: {}, DST Port: {}", - tcpSrc.tcpPort().toInt(), - tcpDst.tcpPort().toInt()); - - fBuilder.withSrcPort(tcpSrc.tcpPort()); - fBuilder.withDstPort(tcpDst.tcpPort()); - - } else if (ipProtocol.protocol() == PROTOCOL_UDP) { - - UdpPortCriterion udpSrc = - (UdpPortCriterion) selector.getCriterion(UDP_SRC); - UdpPortCriterion udpDst = - (UdpPortCriterion) selector.getCriterion(UDP_DST); - - log.debug("UDP SRC Port: {}, DST Port: {}", - udpSrc.udpPort().toInt(), - udpDst.udpPort().toInt()); - - fBuilder.withSrcPort(udpSrc.udpPort()); - fBuilder.withDstPort(udpDst.udpPort()); - } else { - log.debug("Other protocol: {}", ipProtocol.protocol()); - } - } - - fBuilder.withSrcMac(getMacAddress(srcIp.ip().address())) - .withDstMac(getMacAddress(dstIp.ip().address())) - .withInputInterfaceId(getInterfaceId(srcIp.ip().address())) - .withOutputInterfaceId(getInterfaceId(dstIp.ip().address())) - .withVlanId(getVlanId(srcIp.ip().address())) - .withDeviceId(entry.deviceId()); - - StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder(); - - // TODO: need to collect error and drop packets stats - // TODO: need to make the refresh interval configurable - sBuilder.withStartupTime(System.currentTimeMillis()) - .withFstPktArrTime(System.currentTimeMillis()) - .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS)) - .withCurrAccPkts((int) entry.packets()) - .withCurrAccBytes(entry.bytes()) - .withErrorPkts((short) 0) - .withDropPkts((short) 0); - - fBuilder.withStatsInfo(sBuilder.build()); - - FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder); - - flowInfos.add(flowInfo); - - log.debug("FlowInfo: \n{}", flowInfo.toString()); - } - - return flowInfos; - } - /** * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo. * @@ -515,9 +668,7 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { Optional host = hostService.getHostsByIp(ipAddress).stream().findAny(); return host.map(host1 -> host1.location().deviceId()).orElse(null); } else { - log.warn("Failed to get DeviceID which is connected to {}. " + - "The destination is either a bare-metal or located out of DC", - ipAddress.toString()); + log.debug("No DeviceID is associated to {}", ipAddress.toString()); return null; } } @@ -593,29 +744,54 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { egressStats = egressStatsConfigured; log.info("Configured. Egress stats flag is {}", egressStats); } + + Boolean portStatsConfigured = getBooleanProperty(properties, PORT_STATS); + if (portStatsConfigured == null) { + portStats = DEFAULT_PORT_STATS; + log.info("Port stats flag is NOT " + + "configured, default value is {}", portStats); + } else { + portStats = portStatsConfigured; + log.info("Configured. Port stats flag is {}", portStats); + } } - private class InternalTimerTask extends TimerTask { + private class TelemetryCollector implements Runnable { @Override public void run() { - log.debug("Timer task thread starts ({})", loopCount++); - Set filteredFlowInfos = Sets.newConcurrentHashSet(); // we only let the master controller of the device where the - // stats flow rules are installed send kafka message - getFlowInfo().forEach(f -> { - DeviceId deviceId = getDeviceId(f.srcIp().address()); - if (mastershipService.isLocalMaster(deviceId)) { + // stats flow rules are installed send stats message + getFlowInfos().forEach(f -> { + if (checkSrcDstLocalMaster(f)) { filteredFlowInfos.add(f); } }); - try { - telemetryService.publish(filteredFlowInfos); - } catch (Exception ex) { - log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex)); + // we only let the master controller of the device where the port + // is located to send stats message + if (portStats) { + getDstPortBasedFlowInfos().forEach(f -> { + if (checkSrcDstLocalMaster(f)) { + filteredFlowInfos.add(f); + } + }); } + + telemetryService.publish(filteredFlowInfos); + } + + private boolean checkSrcDstLocalMaster(FlowInfo info) { + DeviceId srcDeviceId = getDeviceId(info.srcIp().address()); + DeviceId dstDeviceId = getDeviceId(info.dstIp().address()); + + boolean isSrcLocalMaster = srcDeviceId != null && + mastershipService.isLocalMaster(srcDeviceId); + boolean isDstLocalMaster = dstDeviceId != null && + mastershipService.isLocalMaster(dstDeviceId); + + return isSrcLocalMaster || isDstLocalMaster; } } } diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java index 2efab8b719..419b78951d 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -98,6 +99,9 @@ public class OpenstackTelemetryWebResource extends AbstractWebResource { * @param input openstack flow rule JSON input stream * @return 200 OK if processing is correct. */ + @DELETE + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) public Response deleteBulkFlowRule(InputStream input) { log.info("DELETE BULK FLOW RULE: {}", input.toString()); @@ -132,7 +136,7 @@ public class OpenstackTelemetryWebResource extends AbstractWebResource { log.info("GET BULK FLOW RULE"); Set flowInfoSet; - flowInfoSet = statsFlowRuleService.getFlowInfo(); + flowInfoSet = statsFlowRuleService.getFlowInfos(); log.info("\n\n======================================================\n" + "FlowInfo Set: \n{}" + @@ -151,11 +155,10 @@ public class OpenstackTelemetryWebResource extends AbstractWebResource { } @GET - @Path("list/{src_ip_prefix}/{dst_ip_prefix}") + @Path("list/{srcIpPrefix}/{dstIpPrefix}") @Produces(MediaType.APPLICATION_JSON) - public Response getFlowRule( - @PathParam("src_ip_prefix") String srcIpPrefix, - @PathParam("dst_ip_prefix") String dstIpPrefix) { + public Response getFlowRule(@PathParam("srcIpPrefix") String srcIpPrefix, + @PathParam("dstIpPrefix") String dstIpPrefix) { return ok(root).build(); }