mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-01-28 20:11:20 +01:00
test_tcp: Update test cases for tcp option
Signed-off-by: Minoru TAKAHASHI <takahashi.minoru7@gmail.com> Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
9521e46181
commit
541dc65026
@ -1,4 +1,4 @@
|
||||
# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
|
||||
# Copyright (C) 2012-2015 Nippon Telegraph and Telephone Corporation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -21,9 +21,8 @@ import six
|
||||
import struct
|
||||
from struct import *
|
||||
from nose.tools import *
|
||||
from ryu.ofproto import ether, inet
|
||||
from ryu.lib.packet.packet import Packet
|
||||
from ryu.lib.packet.tcp import tcp
|
||||
from ryu.ofproto import inet
|
||||
from ryu.lib.packet import tcp
|
||||
from ryu.lib.packet.ipv4 import ipv4
|
||||
from ryu.lib.packet import packet_utils
|
||||
from ryu.lib import addrconv
|
||||
@ -46,10 +45,10 @@ class Test_tcp(unittest.TestCase):
|
||||
urgent = 128
|
||||
option = b'\x01\x02\x03\x04'
|
||||
|
||||
t = tcp(src_port, dst_port, seq, ack, offset, bits,
|
||||
window_size, csum, urgent, option)
|
||||
t = tcp.tcp(src_port, dst_port, seq, ack, offset, bits,
|
||||
window_size, csum, urgent, option)
|
||||
|
||||
buf = pack(tcp._PACK_STR, src_port, dst_port, seq, ack,
|
||||
buf = pack(tcp.tcp._PACK_STR, src_port, dst_port, seq, ack,
|
||||
offset << 4, bits, window_size, csum, urgent)
|
||||
buf += option
|
||||
|
||||
@ -95,10 +94,10 @@ class Test_tcp(unittest.TestCase):
|
||||
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
|
||||
inet.IPPROTO_TCP, 0, src_ip, dst_ip)
|
||||
|
||||
t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
|
||||
offset, self.bits, self.window_size, csum, self.urgent)
|
||||
t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
|
||||
offset, self.bits, self.window_size, csum, self.urgent)
|
||||
buf = t.serialize(bytearray(), prev)
|
||||
res = struct.unpack(tcp._PACK_STR, six.binary_type(buf))
|
||||
res = struct.unpack(tcp.tcp._PACK_STR, six.binary_type(buf))
|
||||
|
||||
eq_(res[0], self.src_port)
|
||||
eq_(res[1], self.dst_port)
|
||||
@ -109,41 +108,63 @@ class Test_tcp(unittest.TestCase):
|
||||
eq_(res[6], self.window_size)
|
||||
eq_(res[8], self.urgent)
|
||||
|
||||
# test __len__
|
||||
# offset indicates the number of 32 bit (= 4 bytes)
|
||||
# words in the TCP Header.
|
||||
# So, we compare len(tcp) with offset * 4, here.
|
||||
eq_(offset * 4, len(t))
|
||||
|
||||
# checksum
|
||||
ph = struct.pack('!4s4sBBH',
|
||||
addrconv.ipv4.text_to_bin(src_ip),
|
||||
addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4)
|
||||
d = ph + buf + bytearray()
|
||||
d = ph + buf
|
||||
s = packet_utils.checksum(d)
|
||||
eq_(0, s)
|
||||
|
||||
def test_serialize_option(self):
|
||||
offset = 6
|
||||
# prepare test data
|
||||
offset = 0
|
||||
csum = 0
|
||||
option = b'\x01\x02'
|
||||
|
||||
src_ip = '192.168.10.1'
|
||||
dst_ip = '192.168.100.1'
|
||||
option = [
|
||||
tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
|
||||
tcp.TCPOptionSACKPermitted(),
|
||||
tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
|
||||
tcp.TCPOptionNoOperation(),
|
||||
tcp.TCPOptionWindowScale(shift_cnt=9),
|
||||
]
|
||||
option_buf = (
|
||||
b'\x02\x04\x05\xb4'
|
||||
b'\x04\x02'
|
||||
b'\x08\x0a\x11\x22\x33\x44\x55\x66\x77\x88'
|
||||
b'\x01'
|
||||
b'\x03\x03\x09'
|
||||
)
|
||||
prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
|
||||
inet.IPPROTO_TCP, 0, src_ip, dst_ip)
|
||||
inet.IPPROTO_TCP, 0, '192.168.10.1', '192.168.100.1')
|
||||
|
||||
t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
|
||||
offset, self.bits, self.window_size, csum, self.urgent,
|
||||
option)
|
||||
# test serializer
|
||||
t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack,
|
||||
offset, self.bits, self.window_size, csum, self.urgent,
|
||||
option)
|
||||
buf = t.serialize(bytearray(), prev)
|
||||
r_option = buf[tcp._MIN_LEN:tcp._MIN_LEN + len(option)]
|
||||
eq_(option, r_option)
|
||||
r_option_buf = buf[tcp.tcp._MIN_LEN:tcp.tcp._MIN_LEN + len(option_buf)]
|
||||
eq_(option_buf, r_option_buf)
|
||||
|
||||
# test parser
|
||||
(r_tcp, _, _) = tcp.tcp.parser(buf)
|
||||
eq_(str(option), str(r_tcp.option))
|
||||
|
||||
@raises(Exception)
|
||||
def test_malformed_tcp(self):
|
||||
m_short_buf = self.buf[1:tcp._MIN_LEN]
|
||||
tcp.parser(m_short_buf)
|
||||
m_short_buf = self.buf[1:tcp.tcp._MIN_LEN]
|
||||
tcp.tcp.parser(m_short_buf)
|
||||
|
||||
def test_default_args(self):
|
||||
prev = ipv4(proto=inet.IPPROTO_TCP)
|
||||
t = tcp()
|
||||
t = tcp.tcp()
|
||||
buf = t.serialize(bytearray(), prev)
|
||||
res = struct.unpack(tcp._PACK_STR, buf)
|
||||
res = struct.unpack(tcp.tcp._PACK_STR, buf)
|
||||
|
||||
eq_(res[0], 1)
|
||||
eq_(res[1], 1)
|
||||
@ -155,9 +176,9 @@ class Test_tcp(unittest.TestCase):
|
||||
eq_(res[8], 0)
|
||||
|
||||
# with option, without offset
|
||||
t = tcp(option=b'\x01\x02\x03')
|
||||
t = tcp.tcp(option=[tcp.TCPOptionMaximumSegmentSize(1460)])
|
||||
buf = t.serialize(bytearray(), prev)
|
||||
res = struct.unpack(tcp._PACK_STR + '4s', buf)
|
||||
res = struct.unpack(tcp.tcp._PACK_STR + '4s', buf)
|
||||
|
||||
eq_(res[0], 1)
|
||||
eq_(res[1], 1)
|
||||
@ -167,12 +188,12 @@ class Test_tcp(unittest.TestCase):
|
||||
eq_(res[5], 0)
|
||||
eq_(res[6], 0)
|
||||
eq_(res[8], 0)
|
||||
eq_(res[9], b'\x01\x02\x03\x00')
|
||||
eq_(res[9], b'\x02\x04\x05\xb4')
|
||||
|
||||
# with option, with long offset
|
||||
t = tcp(offset=7, option=b'\x01\x02\x03')
|
||||
t = tcp.tcp(offset=7, option=[tcp.TCPOptionWindowScale(shift_cnt=9)])
|
||||
buf = t.serialize(bytearray(), prev)
|
||||
res = struct.unpack(tcp._PACK_STR + '8s', buf)
|
||||
res = struct.unpack(tcp.tcp._PACK_STR + '8s', buf)
|
||||
|
||||
eq_(res[0], 1)
|
||||
eq_(res[1], 1)
|
||||
@ -182,9 +203,65 @@ class Test_tcp(unittest.TestCase):
|
||||
eq_(res[5], 0)
|
||||
eq_(res[6], 0)
|
||||
eq_(res[8], 0)
|
||||
eq_(res[9], b'\x01\x02\x03\x00\x00\x00\x00\x00')
|
||||
eq_(res[9], b'\x03\x03\x09\x00\x00\x00\x00\x00')
|
||||
|
||||
def test_json(self):
|
||||
jsondict = self.t.to_jsondict()
|
||||
t = tcp.from_jsondict(jsondict['tcp'])
|
||||
t = tcp.tcp.from_jsondict(jsondict['tcp'])
|
||||
eq_(str(self.t), str(t))
|
||||
|
||||
|
||||
class Test_TCPOption(unittest.TestCase):
|
||||
# prepare test data
|
||||
input_options = [
|
||||
tcp.TCPOptionEndOfOptionList(),
|
||||
tcp.TCPOptionNoOperation(),
|
||||
tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460),
|
||||
tcp.TCPOptionWindowScale(shift_cnt=9),
|
||||
tcp.TCPOptionSACKPermitted(),
|
||||
tcp.TCPOptionSACK(blocks=[(1, 2), (3, 4)], length=18),
|
||||
tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632),
|
||||
tcp.TCPOptionUserTimeout(granularity=1, user_timeout=564),
|
||||
tcp.TCPOptionAuthentication(
|
||||
key_id=1, r_next_key_id=2,
|
||||
mac=b'abcdefghijkl', length=16),
|
||||
tcp.TCPOptionUnknown(value=b'foobar', kind=255, length=8),
|
||||
tcp.TCPOptionUnknown(value=b'', kind=255, length=2),
|
||||
]
|
||||
input_buf = (
|
||||
b'\x00' # End of Option List
|
||||
b'\x01' # No-Operation
|
||||
b'\x02\x04\x05\xb4' # Maximum Segment Size
|
||||
b'\x03\x03\x09' # Window Scale
|
||||
b'\x04\x02' # SACK Permitted
|
||||
b'\x05\x12' # SACK
|
||||
b'\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00\x04'
|
||||
b'\x08\x0a' # Timestamps
|
||||
b'\x11\x22\x33\x44\x55\x66\x77\x88'
|
||||
b'\x1c\x04\x82\x34' # User Timeout Option
|
||||
b'\x1d\x10\x01\x02' # TCP Authentication Option (TCP-AO)
|
||||
b'abcdefghijkl'
|
||||
b'\xff\x08' # Unknown with body
|
||||
b'foobar'
|
||||
b'\xff\x02' # Unknown
|
||||
)
|
||||
|
||||
def test_serialize(self):
|
||||
output_buf = bytearray()
|
||||
for option in self.input_options:
|
||||
output_buf += option.serialize()
|
||||
eq_(self.input_buf, output_buf)
|
||||
|
||||
def test_parser(self):
|
||||
buf = self.input_buf
|
||||
output_options = []
|
||||
while buf:
|
||||
opt, buf = tcp.TCPOption.parser(buf)
|
||||
output_options.append(opt)
|
||||
eq_(str(self.input_options), str(output_options))
|
||||
|
||||
def test_json(self):
|
||||
for option in self.input_options:
|
||||
json_dict = option.to_jsondict()[option.__class__.__name__]
|
||||
output_option = option.__class__.from_jsondict(json_dict)
|
||||
eq_(str(option), str(output_option))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user