mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-02-04 07:21:25 +01:00
packet lib: ipv6: support Hop-by-Hop Options header
Signed-off-by: itoyuichi <ito.yuichi0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
95909af095
commit
e49266946d
@ -215,6 +215,32 @@ class opt_header(header):
|
||||
return self._FIX_SIZE + self.size
|
||||
|
||||
|
||||
@ipv6.register_header_type(inet.IPPROTO_HOPOPTS)
|
||||
class hop_opts(opt_header):
|
||||
"""IPv6 (RFC 2460) Hop-by-Hop Options header encoder/decoder class.
|
||||
|
||||
This is used with ryu.lib.packet.ipv6.ipv6.
|
||||
|
||||
An instance has the following attributes at least.
|
||||
Most of them are same to the on-wire counterparts but in host byte order.
|
||||
__init__ takes the corresponding args in this order.
|
||||
|
||||
.. tabularcolumns:: |l|L|
|
||||
|
||||
============== =======================================
|
||||
Attribute Description
|
||||
============== =======================================
|
||||
size the length of the Hop-by-Hop Options header,
|
||||
not include the first 8 octet.
|
||||
data IPv6 options.
|
||||
============== =======================================
|
||||
"""
|
||||
TYPE = inet.IPPROTO_HOPOPTS
|
||||
|
||||
def __init__(self, size, data):
|
||||
super(hop_opts, self).__init__(size, data)
|
||||
|
||||
|
||||
class option(stringify.StringifyMixin):
|
||||
"""IPv6 (RFC 2460) Options header encoder/decoder class.
|
||||
|
||||
|
||||
@ -55,6 +55,34 @@ class Test_ipv6(unittest.TestCase):
|
||||
addrconv.ipv6.text_to_bin(self.src),
|
||||
addrconv.ipv6.text_to_bin(self.dst))
|
||||
|
||||
def setUp_with_hop_opts(self):
|
||||
self.opt1_type = 5
|
||||
self.opt1_len = 2
|
||||
self.opt1_data = '\x00\x00'
|
||||
self.opt2_type = 1
|
||||
self.opt2_len = 0
|
||||
self.opt2_data = None
|
||||
self.options = [
|
||||
ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data),
|
||||
ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data),
|
||||
]
|
||||
self.hop_opts_size = 0
|
||||
self.hop_opts = ipv6.hop_opts(self.hop_opts_size, self.options)
|
||||
self.ext_hdrs = [self.hop_opts]
|
||||
self.payload_length += len(self.hop_opts)
|
||||
self.ip = ipv6.ipv6(
|
||||
self.version, self.traffic_class, self.flow_label,
|
||||
self.payload_length, self.nxt, self.hop_limit, self.src,
|
||||
self.dst, self.ext_hdrs)
|
||||
self.hop_opts.nxt = self.nxt
|
||||
self.nxt = self.hop_opts.TYPE
|
||||
self.buf = struct.pack(
|
||||
ipv6.ipv6._PACK_STR, self.v_tc_flow,
|
||||
self.payload_length, self.nxt, self.hop_limit,
|
||||
addrconv.ipv6.text_to_bin(self.src),
|
||||
addrconv.ipv6.text_to_bin(self.dst))
|
||||
self.buf += self.hop_opts.serialize()
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
@ -69,6 +97,10 @@ class Test_ipv6(unittest.TestCase):
|
||||
eq_(self.dst, self.ip.dst)
|
||||
eq_(str(self.ext_hdrs), str(self.ip.ext_hdrs))
|
||||
|
||||
def test_init_with_hop_opts(self):
|
||||
self.setUp_with_hop_opts()
|
||||
self.test_init()
|
||||
|
||||
def test_parser(self):
|
||||
_res = self.ip.parser(str(self.buf))
|
||||
if type(_res) is tuple:
|
||||
@ -86,6 +118,10 @@ class Test_ipv6(unittest.TestCase):
|
||||
eq_(self.dst, res.dst)
|
||||
eq_(str(self.ext_hdrs), str(res.ext_hdrs))
|
||||
|
||||
def test_parser_with_hop_opts(self):
|
||||
self.setUp_with_hop_opts()
|
||||
self.test_parser()
|
||||
|
||||
def test_serialize(self):
|
||||
data = bytearray()
|
||||
prev = None
|
||||
@ -100,6 +136,16 @@ class Test_ipv6(unittest.TestCase):
|
||||
eq_(self.src, addrconv.ipv6.bin_to_text(res[4]))
|
||||
eq_(self.dst, addrconv.ipv6.bin_to_text(res[5]))
|
||||
|
||||
def test_serialize_with_hop_opts(self):
|
||||
self.setUp_with_hop_opts()
|
||||
self.test_serialize()
|
||||
|
||||
data = bytearray()
|
||||
prev = None
|
||||
buf = self.ip.serialize(data, prev)
|
||||
hop_opts = ipv6.hop_opts.parser(str(buf[ipv6.ipv6._MIN_LEN:]))
|
||||
eq_(repr(self.hop_opts), repr(hop_opts))
|
||||
|
||||
def test_to_string(self):
|
||||
ipv6_values = {'version': self.version,
|
||||
'traffic_class': self.traffic_class,
|
||||
@ -118,9 +164,89 @@ class Test_ipv6(unittest.TestCase):
|
||||
eq_(str(self.ip), ipv6_str)
|
||||
eq_(repr(self.ip), ipv6_str)
|
||||
|
||||
def test_to_string_with_hop_opts(self):
|
||||
self.setUp_with_hop_opts()
|
||||
self.test_to_string()
|
||||
|
||||
def test_len(self):
|
||||
eq_(len(self.ip), 40)
|
||||
|
||||
def test_len_with_hop_opts(self):
|
||||
self.setUp_with_hop_opts()
|
||||
eq_(len(self.ip), 40 + len(self.hop_opts))
|
||||
|
||||
|
||||
class Test_hop_opts(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.nxt = 0
|
||||
self.size = 8
|
||||
self.data = [
|
||||
ipv6.option(5, 2, '\x00\x00'),
|
||||
ipv6.option(1, 0, None),
|
||||
ipv6.option(0xc2, 4, '\x00\x01\x00\x00'),
|
||||
ipv6.option(1, 0, None),
|
||||
]
|
||||
self.hop = ipv6.hop_opts(self.size, self.data)
|
||||
self.hop.set_nxt(self.nxt)
|
||||
self.form = '!BB'
|
||||
self.buf = struct.pack(self.form, self.nxt, self.size) \
|
||||
+ self.data[0].serialize() \
|
||||
+ self.data[1].serialize() \
|
||||
+ self.data[2].serialize() \
|
||||
+ self.data[3].serialize()
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def test_init(self):
|
||||
eq_(self.nxt, self.hop.nxt)
|
||||
eq_(self.size, self.hop.size)
|
||||
eq_(self.data, self.hop.data)
|
||||
|
||||
@raises(Exception)
|
||||
def test_invalid_size(self):
|
||||
ipv6.hop_opts(self.nxt, 1, self.data)
|
||||
|
||||
def test_parser(self):
|
||||
_res = ipv6.hop_opts.parser(self.buf)
|
||||
if type(_res) is tuple:
|
||||
res = _res[0]
|
||||
else:
|
||||
res = _res
|
||||
eq_(self.nxt, res.nxt)
|
||||
eq_(self.size, res.size)
|
||||
eq_(str(self.data), str(res.data))
|
||||
|
||||
def test_serialize(self):
|
||||
buf = self.hop.serialize()
|
||||
res = struct.unpack_from(self.form, str(buf))
|
||||
eq_(self.nxt, res[0])
|
||||
eq_(self.size, res[1])
|
||||
offset = struct.calcsize(self.form)
|
||||
opt1 = ipv6.option.parser(str(buf[offset:]))
|
||||
offset += len(opt1)
|
||||
opt2 = ipv6.option.parser(str(buf[offset:]))
|
||||
offset += len(opt2)
|
||||
opt3 = ipv6.option.parser(str(buf[offset:]))
|
||||
offset += len(opt3)
|
||||
opt4 = ipv6.option.parser(str(buf[offset:]))
|
||||
eq_(5, opt1.type_)
|
||||
eq_(2, opt1.len_)
|
||||
eq_('\x00\x00', opt1.data)
|
||||
eq_(1, opt2.type_)
|
||||
eq_(0, opt2.len_)
|
||||
eq_(None, opt2.data)
|
||||
eq_(0xc2, opt3.type_)
|
||||
eq_(4, opt3.len_)
|
||||
eq_('\x00\x01\x00\x00', opt3.data)
|
||||
eq_(1, opt4.type_)
|
||||
eq_(0, opt4.len_)
|
||||
eq_(None, opt4.data)
|
||||
|
||||
def test_len(self):
|
||||
eq_(16, len(self.hop))
|
||||
|
||||
|
||||
class Test_option(unittest.TestCase):
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user