mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-05-08 13:56:09 +02:00
packet lib: add unittests that use default parameters of IPv4/IPv6 and ICMP/ICMPv6
Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
fe011d7fc1
commit
c98872586e
@ -719,6 +719,141 @@ class TestPacket(unittest.TestCase):
|
||||
eq_(pkt_str, str(pkt))
|
||||
eq_(pkt_str, repr(pkt))
|
||||
|
||||
def test_ipv4_icmp(self):
|
||||
# buid packet
|
||||
e = ethernet.ethernet(self.dst_mac, self.src_mac,
|
||||
ether.ETH_TYPE_IP)
|
||||
ip = ipv4.ipv4(proto=inet.IPPROTO_ICMP)
|
||||
ic = icmp.icmp()
|
||||
|
||||
p = e/ip/ic
|
||||
p.serialize()
|
||||
|
||||
ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
|
||||
|
||||
# ethernet !6s6sH
|
||||
e_buf = self.dst_mac_bin \
|
||||
+ self.src_mac_bin \
|
||||
+ '\x08\x00'
|
||||
|
||||
# ipv4 !BBHHHBBHII
|
||||
ip_buf = '\x45' \
|
||||
+ '\x00' \
|
||||
+ '\x00\x1c' \
|
||||
+ '\x00\x00' \
|
||||
+ '\x00\x00' \
|
||||
+ '\xff' \
|
||||
+ '\x01' \
|
||||
+ '\x00\x00' \
|
||||
+ ipaddr \
|
||||
+ ipaddr
|
||||
|
||||
# icmp !BBH + echo !HH
|
||||
ic_buf = '\x08' \
|
||||
+ '\x00' \
|
||||
+ '\x00\x00' \
|
||||
+ '\x00\x00' \
|
||||
+ '\x00\x00'
|
||||
|
||||
buf = e_buf + ip_buf + ic_buf
|
||||
|
||||
# parse
|
||||
pkt = packet.Packet(array.array('B', p.data))
|
||||
protocols = self.get_protocols(pkt)
|
||||
p_eth = protocols['ethernet']
|
||||
p_ipv4 = protocols['ipv4']
|
||||
p_icmp = protocols['icmp']
|
||||
|
||||
# ethernet
|
||||
ok_(p_eth)
|
||||
eq_(self.dst_mac, p_eth.dst)
|
||||
eq_(self.src_mac, p_eth.src)
|
||||
eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
|
||||
|
||||
# ipv4
|
||||
ok_(p_ipv4)
|
||||
eq_(4, p_ipv4.version)
|
||||
eq_(5, p_ipv4.header_length)
|
||||
eq_(0, p_ipv4.tos)
|
||||
l = len(ip_buf) + len(ic_buf)
|
||||
eq_(l, p_ipv4.total_length)
|
||||
eq_(0, p_ipv4.identification)
|
||||
eq_(0, p_ipv4.flags)
|
||||
eq_(255, p_ipv4.ttl)
|
||||
eq_(inet.IPPROTO_ICMP, p_ipv4.proto)
|
||||
eq_('0.0.0.0', p_ipv4.src)
|
||||
eq_('0.0.0.0', p_ipv4.dst)
|
||||
t = bytearray(ip_buf)
|
||||
struct.pack_into('!H', t, 10, p_ipv4.csum)
|
||||
eq_(packet_utils.checksum(t), 0)
|
||||
|
||||
# icmp
|
||||
ok_(p_icmp)
|
||||
eq_(8, p_icmp.type)
|
||||
eq_(0, p_icmp.code)
|
||||
eq_(0, p_icmp.data.id)
|
||||
eq_(0, p_icmp.data.seq)
|
||||
eq_(len(ic_buf), len(p_icmp))
|
||||
t = bytearray(ic_buf)
|
||||
struct.pack_into('!H', t, 2, p_icmp.csum)
|
||||
eq_(packet_utils.checksum(t), 0)
|
||||
|
||||
# to string
|
||||
eth_values = {'dst': self.dst_mac,
|
||||
'src': self.src_mac,
|
||||
'ethertype': ether.ETH_TYPE_IP}
|
||||
_eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
|
||||
for k, _ in inspect.getmembers(p_eth)
|
||||
if k in eth_values])
|
||||
eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
|
||||
|
||||
ipv4_values = {'version': 4,
|
||||
'header_length': 5,
|
||||
'tos': 0,
|
||||
'total_length': l,
|
||||
'identification': 0,
|
||||
'flags': 0,
|
||||
'offset': p_ipv4.offset,
|
||||
'ttl': 255,
|
||||
'proto': inet.IPPROTO_ICMP,
|
||||
'csum': p_ipv4.csum,
|
||||
'src': '0.0.0.0',
|
||||
'dst': '0.0.0.0',
|
||||
'option': None}
|
||||
_ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
|
||||
for k, _ in inspect.getmembers(p_ipv4)
|
||||
if k in ipv4_values])
|
||||
ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
|
||||
|
||||
echo_values = {'id': 0,
|
||||
'seq': 0,
|
||||
'data': None}
|
||||
_echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
|
||||
for k in sorted(echo_values.keys())])
|
||||
echo_str = '%s(%s)' % (icmp.echo.__name__, _echo_str)
|
||||
icmp_values = {'type': 8,
|
||||
'code': 0,
|
||||
'csum': p_icmp.csum,
|
||||
'data': echo_str}
|
||||
_icmp_str = ','.join(['%s=%s' % (k, icmp_values[k])
|
||||
for k, _ in inspect.getmembers(p_icmp)
|
||||
if k in icmp_values])
|
||||
icmp_str = '%s(%s)' % (icmp.icmp.__name__, _icmp_str)
|
||||
|
||||
pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, icmp_str)
|
||||
|
||||
eq_(eth_str, str(p_eth))
|
||||
eq_(eth_str, repr(p_eth))
|
||||
|
||||
eq_(ipv4_str, str(p_ipv4))
|
||||
eq_(ipv4_str, repr(p_ipv4))
|
||||
|
||||
eq_(icmp_str, str(p_icmp))
|
||||
eq_(icmp_str, repr(p_icmp))
|
||||
|
||||
eq_(pkt_str, str(pkt))
|
||||
eq_(pkt_str, repr(pkt))
|
||||
|
||||
def test_ipv6_udp(self):
|
||||
# build packet
|
||||
e = ethernet.ethernet(self.dst_mac, self.src_mac,
|
||||
@ -1124,6 +1259,120 @@ class TestPacket(unittest.TestCase):
|
||||
eq_(pkt_str, str(pkt))
|
||||
eq_(pkt_str, repr(pkt))
|
||||
|
||||
def test_ipv6_icmpv6(self):
|
||||
# build packet
|
||||
e = ethernet.ethernet(self.dst_mac, self.src_mac,
|
||||
ether.ETH_TYPE_IPV6)
|
||||
ip = ipv6.ipv6(nxt=inet.IPPROTO_ICMPV6)
|
||||
ic = icmpv6.icmpv6()
|
||||
|
||||
p = e/ip/ic
|
||||
p.serialize()
|
||||
|
||||
ipaddr = addrconv.ipv6.text_to_bin('::')
|
||||
|
||||
# ethernet !6s6sH
|
||||
e_buf = self.dst_mac_bin \
|
||||
+ self.src_mac_bin \
|
||||
+ '\x86\xdd'
|
||||
|
||||
# ipv6 !IHBB16s16s'
|
||||
ip_buf = '\x60\x00\x00\x00' \
|
||||
+ '\x00\x00' \
|
||||
+ '\x3a' \
|
||||
+ '\x00' \
|
||||
+ '\x00\x00' \
|
||||
+ ipaddr \
|
||||
+ ipaddr
|
||||
|
||||
# icmpv6 !BBH
|
||||
ic_buf = '\x00' \
|
||||
+ '\x00' \
|
||||
+ '\x00\x00'
|
||||
|
||||
buf = e_buf + ip_buf + ic_buf
|
||||
|
||||
# parse
|
||||
pkt = packet.Packet(array.array('B', p.data))
|
||||
protocols = self.get_protocols(pkt)
|
||||
p_eth = protocols['ethernet']
|
||||
p_ipv6 = protocols['ipv6']
|
||||
p_icmpv6 = protocols['icmpv6']
|
||||
|
||||
# ethernet
|
||||
ok_(p_eth)
|
||||
eq_(self.dst_mac, p_eth.dst)
|
||||
eq_(self.src_mac, p_eth.src)
|
||||
eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
|
||||
|
||||
# ipv6
|
||||
ok_(p_ipv6)
|
||||
eq_(6, p_ipv6.version)
|
||||
eq_(0, p_ipv6.traffic_class)
|
||||
eq_(0, p_ipv6.flow_label)
|
||||
eq_(len(ic_buf), p_ipv6.payload_length)
|
||||
eq_(inet.IPPROTO_ICMPV6, p_ipv6.nxt)
|
||||
eq_(0, p_ipv6.hop_limit)
|
||||
eq_('::', p_ipv6.src)
|
||||
eq_('::', p_ipv6.dst)
|
||||
|
||||
# icmpv6
|
||||
ok_(p_icmpv6)
|
||||
eq_(0, p_icmpv6.type_)
|
||||
eq_(0, p_icmpv6.code)
|
||||
eq_(len(ic_buf), len(p_icmpv6))
|
||||
t = bytearray(ic_buf)
|
||||
struct.pack_into('!H', t, 2, p_icmpv6.csum)
|
||||
ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, len(ic_buf), 58)
|
||||
t = ph + t
|
||||
eq_(packet_utils.checksum(t), 0)
|
||||
|
||||
# to string
|
||||
eth_values = {'dst': self.dst_mac,
|
||||
'src': self.src_mac,
|
||||
'ethertype': ether.ETH_TYPE_IPV6}
|
||||
_eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
|
||||
for k, _ in inspect.getmembers(p_eth)
|
||||
if k in eth_values])
|
||||
eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
|
||||
|
||||
ipv6_values = {'version': 6,
|
||||
'traffic_class': 0,
|
||||
'flow_label': 0,
|
||||
'payload_length': len(ic_buf),
|
||||
'nxt': inet.IPPROTO_ICMPV6,
|
||||
'hop_limit': 0,
|
||||
'src': '::',
|
||||
'dst': '::',
|
||||
'ext_hdrs': []}
|
||||
_ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
|
||||
for k, _ in inspect.getmembers(p_ipv6)
|
||||
if k in ipv6_values])
|
||||
ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
|
||||
|
||||
icmpv6_values = {'type_': 0,
|
||||
'code': 0,
|
||||
'csum': p_icmpv6.csum,
|
||||
'data': None}
|
||||
_icmpv6_str = ','.join(['%s=%s' % (k, repr(icmpv6_values[k]))
|
||||
for k, _ in inspect.getmembers(p_icmpv6)
|
||||
if k in icmpv6_values])
|
||||
icmpv6_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _icmpv6_str)
|
||||
|
||||
pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, icmpv6_str)
|
||||
|
||||
eq_(eth_str, str(p_eth))
|
||||
eq_(eth_str, repr(p_eth))
|
||||
|
||||
eq_(ipv6_str, str(p_ipv6))
|
||||
eq_(ipv6_str, repr(p_ipv6))
|
||||
|
||||
eq_(icmpv6_str, str(p_icmpv6))
|
||||
eq_(icmpv6_str, repr(p_icmpv6))
|
||||
|
||||
eq_(pkt_str, str(pkt))
|
||||
eq_(pkt_str, repr(pkt))
|
||||
|
||||
def test_llc_bpdu(self):
|
||||
# buid packet
|
||||
e = ethernet.ethernet(self.dst_mac, self.src_mac,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user