diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java index 8b28797078..73258aca90 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java @@ -41,11 +41,21 @@ public class TinaFlowInfoByteBufferCodec extends ByteBufferCodec { public ByteBuffer encode(FlowInfo flowInfo) { ByteBuffer byteBuffer = ByteBuffer.allocate(MESSAGE_SIZE); + int srcPort = 0; + int dstPort = 0; String deviceId = flowInfo.deviceId().toString(); short switchId = (short) Integer.parseInt(deviceId.substring(3, deviceId.length()), 16); + if (flowInfo.srcPort() != null) { + srcPort = flowInfo.srcPort().toInt(); + } + + if (flowInfo.dstPort() != null) { + dstPort = flowInfo.dstPort().toInt(); + } + byteBuffer.put(flowInfo.flowType()) .putShort(switchId) .putInt(flowInfo.inputInterfaceId()) @@ -53,10 +63,10 @@ public class TinaFlowInfoByteBufferCodec extends ByteBufferCodec { .putShort(flowInfo.vlanId().toShort()) .put(flowInfo.srcIp().address().toOctets()) .put((byte) flowInfo.srcIp().prefixLength()) - .putShort((short) flowInfo.srcPort().toInt()) + .putShort((short) srcPort) .put(flowInfo.dstIp().address().toOctets()) .put((byte) flowInfo.dstIp().prefixLength()) - .putShort((short) flowInfo.dstPort().toInt()) + .putShort((short) dstPort) .put(flowInfo.protocol()) .put(flowInfo.srcMac().toBytes()) .put(flowInfo.dstMac().toBytes()); diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java index 3f56362a55..a631abc8b4 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java @@ -142,12 +142,13 @@ public final class DefaultFlowInfo implements FlowInfo { @Override public boolean roughEquals(FlowInfo flowInfo) { - return deviceId.equals(flowInfo.deviceId()) && - srcIp.equals(flowInfo.srcIp()) && - dstIp.equals(flowInfo.dstIp()) && - srcPort.equals(flowInfo.srcPort()) && - dstPort.equals(flowInfo.dstPort()) && - (protocol == flowInfo.protocol()); + final DefaultFlowInfo other = (DefaultFlowInfo) flowInfo; + return Objects.equals(this.deviceId, other.deviceId) && + Objects.equals(this.srcIp, other.srcIp) && + Objects.equals(this.dstIp, other.dstIp) && + Objects.equals(this.srcPort, other.srcPort) && + Objects.equals(this.dstPort, other.dstPort) && + Objects.equals(this.protocol, other.protocol); } @Override diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java index 8d6f62f6db..d1de54a52c 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java @@ -128,6 +128,7 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService { return null; } + log.debug("Send telemetry record to kafka server..."); return producer.send(record); } diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java index 54f454eb40..3d609c609c 100644 --- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java +++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java @@ -28,7 +28,7 @@ import org.onlab.packet.MacAddress; import org.onlab.packet.VlanId; import org.onosproject.core.ApplicationId; import org.onosproject.core.CoreService; -import org.onosproject.net.Device; +import org.onosproject.mastership.MastershipService; import org.onosproject.net.DeviceId; import org.onosproject.net.Host; import org.onosproject.net.device.DeviceService; @@ -56,6 +56,7 @@ import org.onosproject.openstacktelemetry.api.StatsInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -104,6 +105,9 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected DeviceService deviceService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) + protected MastershipService mastershipService; + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) protected OpenstackTelemetryService telemetryService; @@ -275,37 +279,40 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { fBuilder.withFlowType(FLOW_TYPE_SONA) .withSrcIp(srcIp.ip()) - .withDstIp(dstIp.ip()) - .withProtocol((byte) ipProtocol.protocol()); + .withDstIp(dstIp.ip()); - if (ipProtocol.protocol() == PROTOCOL_TCP) { - TcpPortCriterion tcpSrc = - (TcpPortCriterion) selector.getCriterion(TCP_SRC); - TcpPortCriterion tcpDst = - (TcpPortCriterion) selector.getCriterion(TCP_DST); + if (ipProtocol != null) { + fBuilder.withProtocol((byte) ipProtocol.protocol()); - log.debug("TCP SRC Port: {}, DST Port: {}", - tcpSrc.tcpPort().toInt(), - tcpDst.tcpPort().toInt()); + if (ipProtocol.protocol() == PROTOCOL_TCP) { + TcpPortCriterion tcpSrc = + (TcpPortCriterion) selector.getCriterion(TCP_SRC); + TcpPortCriterion tcpDst = + (TcpPortCriterion) selector.getCriterion(TCP_DST); - fBuilder.withSrcPort(tcpSrc.tcpPort()); - fBuilder.withDstPort(tcpDst.tcpPort()); + log.debug("TCP SRC Port: {}, DST Port: {}", + tcpSrc.tcpPort().toInt(), + tcpDst.tcpPort().toInt()); - } else if (ipProtocol.protocol() == PROTOCOL_UDP) { + fBuilder.withSrcPort(tcpSrc.tcpPort()); + fBuilder.withDstPort(tcpDst.tcpPort()); - UdpPortCriterion udpSrc = - (UdpPortCriterion) selector.getCriterion(UDP_SRC); - UdpPortCriterion udpDst = - (UdpPortCriterion) selector.getCriterion(UDP_DST); + } else if (ipProtocol.protocol() == PROTOCOL_UDP) { - log.debug("UDP SRC Port: {}, DST Port: {}", - udpSrc.udpPort().toInt(), - udpDst.udpPort().toInt()); + UdpPortCriterion udpSrc = + (UdpPortCriterion) selector.getCriterion(UDP_SRC); + UdpPortCriterion udpDst = + (UdpPortCriterion) selector.getCriterion(UDP_DST); - fBuilder.withSrcPort(udpSrc.udpPort()); - fBuilder.withDstPort(udpDst.udpPort()); - } else { - log.debug("Other protocol: {}", ipProtocol.protocol()); + log.debug("UDP SRC Port: {}, DST Port: {}", + udpSrc.udpPort().toInt(), + udpDst.udpPort().toInt()); + + fBuilder.withSrcPort(udpSrc.udpPort()); + fBuilder.withDstPort(udpDst.udpPort()); + } else { + log.debug("Other protocol: {}", ipProtocol.protocol()); + } } fBuilder.withSrcMac(getMacAddress(srcIp.ip().address())) @@ -363,13 +370,13 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { gFlowInfoSet.remove(gFlowInfo); gFlowInfoSet.add(newFlowInfo); - log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString()); + log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString()); return newFlowInfo; } } // No such record, then build the FlowInfo object and return this object. - log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString()); + log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString()); FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build(); gFlowInfoSet.add(newFlowInfo); return newFlowInfo; @@ -384,18 +391,34 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { .dstTpPort(statsFlowRule.srcTpPort()) .build(); - // FIXME: install stat flow rules for all devices for now - // need to query the device where the host with the given IP located - for (Device d : deviceService.getDevices()) { - if (d.type() == Device.Type.CONTROLLER) { - log.info("Not provide stats for 'CONTROLLER' ({})", d.id().toString()); - continue; - } + DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address()); + DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address()); - connectTables(d.id(), STAT_INBOUND_TABLE, DHCP_ARP_TABLE, - statsFlowRule, METRIC_PRIORITY_SOURCE, install); - connectTables(d.id(), STAT_OUTBOUND_TABLE, FORWARDING_TABLE, - inverseFlowRule, METRIC_PRIORITY_TARGET, install); + if (srcDeviceId == null || dstDeviceId == null) { + return; + } + + connectTables(srcDeviceId, STAT_INBOUND_TABLE, DHCP_ARP_TABLE, + statsFlowRule, METRIC_PRIORITY_SOURCE, install); + connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, FORWARDING_TABLE, + inverseFlowRule, METRIC_PRIORITY_TARGET, install); + } + + /** + * Get Device ID which the VM is located. + * + * @param ipAddress IP Address of host + * @return Device ID + */ + private DeviceId getDeviceId(IpAddress ipAddress) { + if (!hostService.getHostsByIp(ipAddress).isEmpty()) { + Optional host = hostService.getHostsByIp(ipAddress).stream().findAny(); + return host.map(host1 -> host1.location().deviceId()).orElse(null); + } else { + log.error("Failed to get DeviceID which is connected to {}. " + + "The VM is not instantiated correctly now.", + ipAddress.toString()); + return null; } } @@ -446,8 +469,20 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService { @Override public void run() { log.debug("Timer Task Thread Starts ({})", loopCount++); + + Set filteredFlowInfos = Sets.newConcurrentHashSet(); + + // we only let the master controller of the device where the + // stats flow rules are installed send kafka message + getFlowInfo().forEach(f -> { + DeviceId deviceId = getDeviceId(f.srcIp().address()); + if (mastershipService.isLocalMaster(deviceId)) { + filteredFlowInfos.add(f); + } + }); + try { - telemetryService.publish(getFlowInfo()); + telemetryService.publish(filteredFlowInfos); } catch (Exception ex) { log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex)); }