From c98872586e12fa7d9fc8b5ab543be759bf6a67de Mon Sep 17 00:00:00 2001 From: Yuichi Ito Date: Wed, 6 Nov 2013 17:16:57 +0900 Subject: [PATCH] packet lib: add unittests that use default parameters of IPv4/IPv6 and ICMP/ICMPv6 Signed-off-by: Yuichi Ito Signed-off-by: FUJITA Tomonori --- ryu/tests/unit/packet/test_packet.py | 249 +++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py index a97c6e85..d5c0375c 100644 --- a/ryu/tests/unit/packet/test_packet.py +++ b/ryu/tests/unit/packet/test_packet.py @@ -719,6 +719,141 @@ class TestPacket(unittest.TestCase): eq_(pkt_str, str(pkt)) eq_(pkt_str, repr(pkt)) + def test_ipv4_icmp(self): + # buid packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IP) + ip = ipv4.ipv4(proto=inet.IPPROTO_ICMP) + ic = icmp.icmp() + + p = e/ip/ic + p.serialize() + + ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0') + + # ethernet !6s6sH + e_buf = self.dst_mac_bin \ + + self.src_mac_bin \ + + '\x08\x00' + + # ipv4 !BBHHHBBHII + ip_buf = '\x45' \ + + '\x00' \ + + '\x00\x1c' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\xff' \ + + '\x01' \ + + '\x00\x00' \ + + ipaddr \ + + ipaddr + + # icmp !BBH + echo !HH + ic_buf = '\x08' \ + + '\x00' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00' + + buf = e_buf + ip_buf + ic_buf + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv4 = protocols['ipv4'] + p_icmp = protocols['icmp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IP, p_eth.ethertype) + + # ipv4 + ok_(p_ipv4) + eq_(4, p_ipv4.version) + eq_(5, p_ipv4.header_length) + eq_(0, p_ipv4.tos) + l = len(ip_buf) + len(ic_buf) + eq_(l, p_ipv4.total_length) + eq_(0, p_ipv4.identification) + eq_(0, p_ipv4.flags) + eq_(255, p_ipv4.ttl) + eq_(inet.IPPROTO_ICMP, p_ipv4.proto) + eq_('0.0.0.0', p_ipv4.src) + eq_('0.0.0.0', p_ipv4.dst) + t = bytearray(ip_buf) + struct.pack_into('!H', t, 10, p_ipv4.csum) + eq_(packet_utils.checksum(t), 0) + + # icmp + ok_(p_icmp) + eq_(8, p_icmp.type) + eq_(0, p_icmp.code) + eq_(0, p_icmp.data.id) + eq_(0, p_icmp.data.seq) + eq_(len(ic_buf), len(p_icmp)) + t = bytearray(ic_buf) + struct.pack_into('!H', t, 2, p_icmp.csum) + eq_(packet_utils.checksum(t), 0) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, _ in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 0, + 'total_length': l, + 'identification': 0, + 'flags': 0, + 'offset': p_ipv4.offset, + 'ttl': 255, + 'proto': inet.IPPROTO_ICMP, + 'csum': p_ipv4.csum, + 'src': '0.0.0.0', + 'dst': '0.0.0.0', + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, _ in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + echo_values = {'id': 0, + 'seq': 0, + 'data': None} + _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k])) + for k in sorted(echo_values.keys())]) + echo_str = '%s(%s)' % (icmp.echo.__name__, _echo_str) + icmp_values = {'type': 8, + 'code': 0, + 'csum': p_icmp.csum, + 'data': echo_str} + _icmp_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, _ in inspect.getmembers(p_icmp) + if k in icmp_values]) + icmp_str = '%s(%s)' % (icmp.icmp.__name__, _icmp_str) + + pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, icmp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(icmp_str, str(p_icmp)) + eq_(icmp_str, repr(p_icmp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv6_udp(self): # build packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -1124,6 +1259,120 @@ class TestPacket(unittest.TestCase): eq_(pkt_str, str(pkt)) eq_(pkt_str, repr(pkt)) + def test_ipv6_icmpv6(self): + # build packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IPV6) + ip = ipv6.ipv6(nxt=inet.IPPROTO_ICMPV6) + ic = icmpv6.icmpv6() + + p = e/ip/ic + p.serialize() + + ipaddr = addrconv.ipv6.text_to_bin('::') + + # ethernet !6s6sH + e_buf = self.dst_mac_bin \ + + self.src_mac_bin \ + + '\x86\xdd' + + # ipv6 !IHBB16s16s' + ip_buf = '\x60\x00\x00\x00' \ + + '\x00\x00' \ + + '\x3a' \ + + '\x00' \ + + '\x00\x00' \ + + ipaddr \ + + ipaddr + + # icmpv6 !BBH + ic_buf = '\x00' \ + + '\x00' \ + + '\x00\x00' + + buf = e_buf + ip_buf + ic_buf + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv6 = protocols['ipv6'] + p_icmpv6 = protocols['icmpv6'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype) + + # ipv6 + ok_(p_ipv6) + eq_(6, p_ipv6.version) + eq_(0, p_ipv6.traffic_class) + eq_(0, p_ipv6.flow_label) + eq_(len(ic_buf), p_ipv6.payload_length) + eq_(inet.IPPROTO_ICMPV6, p_ipv6.nxt) + eq_(0, p_ipv6.hop_limit) + eq_('::', p_ipv6.src) + eq_('::', p_ipv6.dst) + + # icmpv6 + ok_(p_icmpv6) + eq_(0, p_icmpv6.type_) + eq_(0, p_icmpv6.code) + eq_(len(ic_buf), len(p_icmpv6)) + t = bytearray(ic_buf) + struct.pack_into('!H', t, 2, p_icmpv6.csum) + ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, len(ic_buf), 58) + t = ph + t + eq_(packet_utils.checksum(t), 0) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IPV6} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, _ in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv6_values = {'version': 6, + 'traffic_class': 0, + 'flow_label': 0, + 'payload_length': len(ic_buf), + 'nxt': inet.IPPROTO_ICMPV6, + 'hop_limit': 0, + 'src': '::', + 'dst': '::', + 'ext_hdrs': []} + _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k])) + for k, _ in inspect.getmembers(p_ipv6) + if k in ipv6_values]) + ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str) + + icmpv6_values = {'type_': 0, + 'code': 0, + 'csum': p_icmpv6.csum, + 'data': None} + _icmpv6_str = ','.join(['%s=%s' % (k, repr(icmpv6_values[k])) + for k, _ in inspect.getmembers(p_icmpv6) + if k in icmpv6_values]) + icmpv6_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _icmpv6_str) + + pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, icmpv6_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv6_str, str(p_ipv6)) + eq_(ipv6_str, repr(p_ipv6)) + + eq_(icmpv6_str, str(p_icmpv6)) + eq_(icmpv6_str, repr(p_icmpv6)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_llc_bpdu(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac,