Fix: resolve NPE caused by non-existence of IP protocol

Change-Id: I8f2233637986089b9347da03221db2852faa7fe7
This commit is contained in:
Jian Li 2018-06-27 22:29:14 +09:00
parent 41c652c6ef
commit 85573f4dec
4 changed files with 94 additions and 47 deletions

View File

@ -41,11 +41,21 @@ public class TinaFlowInfoByteBufferCodec extends ByteBufferCodec<FlowInfo> {
public ByteBuffer encode(FlowInfo flowInfo) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MESSAGE_SIZE);
int srcPort = 0;
int dstPort = 0;
String deviceId = flowInfo.deviceId().toString();
short switchId = (short) Integer.parseInt(deviceId.substring(3,
deviceId.length()), 16);
if (flowInfo.srcPort() != null) {
srcPort = flowInfo.srcPort().toInt();
}
if (flowInfo.dstPort() != null) {
dstPort = flowInfo.dstPort().toInt();
}
byteBuffer.put(flowInfo.flowType())
.putShort(switchId)
.putInt(flowInfo.inputInterfaceId())
@ -53,10 +63,10 @@ public class TinaFlowInfoByteBufferCodec extends ByteBufferCodec<FlowInfo> {
.putShort(flowInfo.vlanId().toShort())
.put(flowInfo.srcIp().address().toOctets())
.put((byte) flowInfo.srcIp().prefixLength())
.putShort((short) flowInfo.srcPort().toInt())
.putShort((short) srcPort)
.put(flowInfo.dstIp().address().toOctets())
.put((byte) flowInfo.dstIp().prefixLength())
.putShort((short) flowInfo.dstPort().toInt())
.putShort((short) dstPort)
.put(flowInfo.protocol())
.put(flowInfo.srcMac().toBytes())
.put(flowInfo.dstMac().toBytes());

View File

@ -142,12 +142,13 @@ public final class DefaultFlowInfo implements FlowInfo {
@Override
public boolean roughEquals(FlowInfo flowInfo) {
return deviceId.equals(flowInfo.deviceId()) &&
srcIp.equals(flowInfo.srcIp()) &&
dstIp.equals(flowInfo.dstIp()) &&
srcPort.equals(flowInfo.srcPort()) &&
dstPort.equals(flowInfo.dstPort()) &&
(protocol == flowInfo.protocol());
final DefaultFlowInfo other = (DefaultFlowInfo) flowInfo;
return Objects.equals(this.deviceId, other.deviceId) &&
Objects.equals(this.srcIp, other.srcIp) &&
Objects.equals(this.dstIp, other.dstIp) &&
Objects.equals(this.srcPort, other.srcPort) &&
Objects.equals(this.dstPort, other.dstPort) &&
Objects.equals(this.protocol, other.protocol);
}
@Override

View File

@ -128,6 +128,7 @@ public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
return null;
}
log.debug("Send telemetry record to kafka server...");
return producer.send(record);
}

View File

@ -28,7 +28,7 @@ import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.Device;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.device.DeviceService;
@ -56,6 +56,7 @@ import org.onosproject.openstacktelemetry.api.StatsInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@ -104,6 +105,9 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService telemetryService;
@ -275,37 +279,40 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(srcIp.ip())
.withDstIp(dstIp.ip())
.withProtocol((byte) ipProtocol.protocol());
.withDstIp(dstIp.ip());
if (ipProtocol.protocol() == PROTOCOL_TCP) {
TcpPortCriterion tcpSrc =
(TcpPortCriterion) selector.getCriterion(TCP_SRC);
TcpPortCriterion tcpDst =
(TcpPortCriterion) selector.getCriterion(TCP_DST);
if (ipProtocol != null) {
fBuilder.withProtocol((byte) ipProtocol.protocol());
log.debug("TCP SRC Port: {}, DST Port: {}",
tcpSrc.tcpPort().toInt(),
tcpDst.tcpPort().toInt());
if (ipProtocol.protocol() == PROTOCOL_TCP) {
TcpPortCriterion tcpSrc =
(TcpPortCriterion) selector.getCriterion(TCP_SRC);
TcpPortCriterion tcpDst =
(TcpPortCriterion) selector.getCriterion(TCP_DST);
fBuilder.withSrcPort(tcpSrc.tcpPort());
fBuilder.withDstPort(tcpDst.tcpPort());
log.debug("TCP SRC Port: {}, DST Port: {}",
tcpSrc.tcpPort().toInt(),
tcpDst.tcpPort().toInt());
} else if (ipProtocol.protocol() == PROTOCOL_UDP) {
fBuilder.withSrcPort(tcpSrc.tcpPort());
fBuilder.withDstPort(tcpDst.tcpPort());
UdpPortCriterion udpSrc =
(UdpPortCriterion) selector.getCriterion(UDP_SRC);
UdpPortCriterion udpDst =
(UdpPortCriterion) selector.getCriterion(UDP_DST);
} else if (ipProtocol.protocol() == PROTOCOL_UDP) {
log.debug("UDP SRC Port: {}, DST Port: {}",
udpSrc.udpPort().toInt(),
udpDst.udpPort().toInt());
UdpPortCriterion udpSrc =
(UdpPortCriterion) selector.getCriterion(UDP_SRC);
UdpPortCriterion udpDst =
(UdpPortCriterion) selector.getCriterion(UDP_DST);
fBuilder.withSrcPort(udpSrc.udpPort());
fBuilder.withDstPort(udpDst.udpPort());
} else {
log.debug("Other protocol: {}", ipProtocol.protocol());
log.debug("UDP SRC Port: {}, DST Port: {}",
udpSrc.udpPort().toInt(),
udpDst.udpPort().toInt());
fBuilder.withSrcPort(udpSrc.udpPort());
fBuilder.withDstPort(udpDst.udpPort());
} else {
log.debug("Other protocol: {}", ipProtocol.protocol());
}
}
fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
@ -363,13 +370,13 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
gFlowInfoSet.remove(gFlowInfo);
gFlowInfoSet.add(newFlowInfo);
log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
return newFlowInfo;
}
}
// No such record, then build the FlowInfo object and return this object.
log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
gFlowInfoSet.add(newFlowInfo);
return newFlowInfo;
@ -384,18 +391,34 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
.dstTpPort(statsFlowRule.srcTpPort())
.build();
// FIXME: install stat flow rules for all devices for now
// need to query the device where the host with the given IP located
for (Device d : deviceService.getDevices()) {
if (d.type() == Device.Type.CONTROLLER) {
log.info("Not provide stats for 'CONTROLLER' ({})", d.id().toString());
continue;
}
DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
connectTables(d.id(), STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
statsFlowRule, METRIC_PRIORITY_SOURCE, install);
connectTables(d.id(), STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
inverseFlowRule, METRIC_PRIORITY_TARGET, install);
if (srcDeviceId == null || dstDeviceId == null) {
return;
}
connectTables(srcDeviceId, STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
statsFlowRule, METRIC_PRIORITY_SOURCE, install);
connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
inverseFlowRule, METRIC_PRIORITY_TARGET, install);
}
/**
* Get Device ID which the VM is located.
*
* @param ipAddress IP Address of host
* @return Device ID
*/
private DeviceId getDeviceId(IpAddress ipAddress) {
if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
return host.map(host1 -> host1.location().deviceId()).orElse(null);
} else {
log.error("Failed to get DeviceID which is connected to {}. " +
"The VM is not instantiated correctly now.",
ipAddress.toString());
return null;
}
}
@ -446,8 +469,20 @@ public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
@Override
public void run() {
log.debug("Timer Task Thread Starts ({})", loopCount++);
Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
// we only let the master controller of the device where the
// stats flow rules are installed send kafka message
getFlowInfo().forEach(f -> {
DeviceId deviceId = getDeviceId(f.srcIp().address());
if (mastershipService.isLocalMaster(deviceId)) {
filteredFlowInfos.add(f);
}
});
try {
telemetryService.publish(getFlowInfo());
telemetryService.publish(filteredFlowInfos);
} catch (Exception ex) {
log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
}