packet lib: ipv6: support IP Authentication header

Signed-off-by: itoyuichi <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yuichi Ito 2013-09-17 13:37:56 +09:00 committed by FUJITA Tomonori
parent dc26a90bbe
commit 3905f4c019
2 changed files with 153 additions and 0 deletions

View File

@ -376,3 +376,62 @@ class fragment(header):
def __len__(self):
return self._MIN_LEN
@ipv6.register_header_type(inet.IPPROTO_AH)
class auth(header):
"""IP Authentication header (RFC 2402) encoder/decoder class.
This is used with ryu.lib.packet.ipv6.ipv6.
An instance has the following attributes at least.
Most of them are same to the on-wire counterparts but in host byte order.
__init__ takes the corresponding args in this order.
.. tabularcolumns:: |l|L|
============== =======================================
Attribute Description
============== =======================================
size the length of the Authentication Header
in 64-bit words, subtracting 1.
spi security parameters index.
seq sequence number.
data authentication data.
============== =======================================
"""
TYPE = inet.IPPROTO_AH
_PACK_STR = '!BB2xII'
_MIN_LEN = struct.calcsize(_PACK_STR)
def __init__(self, size, spi, seq, data):
super(auth, self).__init__()
self.size = size
self.spi = spi
self.seq = seq
self.data = data
@classmethod
def _get_size(cls, size):
return (int(size) - 1) * 8
@classmethod
def parser(cls, buf):
(nxt, size, spi, seq) = struct.unpack_from(cls._PACK_STR, buf)
form = "%ds" % (cls._get_size(size) - cls._MIN_LEN)
(data, ) = struct.unpack_from(form, buf, cls._MIN_LEN)
ret = cls(size, spi, seq, data)
ret.set_nxt(nxt)
return ret
def serialize(self):
buf = struct.pack(self._PACK_STR, self.nxt, self.size, self.spi,
self.seq)
buf = bytearray(buf)
form = "%ds" % (auth._get_size(self.size) - self._MIN_LEN)
buf.extend(struct.pack(form, self.data))
return buf
def __len__(self):
return auth._get_size(self.size)

View File

@ -132,6 +132,28 @@ class Test_ipv6(unittest.TestCase):
addrconv.ipv6.text_to_bin(self.dst))
self.buf += self.fragment.serialize()
def setUp_with_auth(self):
self.auth_size = 4
self.auth_spi = 256
self.auth_seq = 1
self.auth_data = '\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae'
self.auth = ipv6.auth(
self.auth_size, self.auth_spi, self.auth_seq, self.auth_data)
self.ext_hdrs = [self.auth]
self.payload_length += len(self.auth)
self.ip = ipv6.ipv6(
self.version, self.traffic_class, self.flow_label,
self.payload_length, self.nxt, self.hop_limit, self.src,
self.dst, self.ext_hdrs)
self.auth.nxt = self.nxt
self.nxt = self.auth.TYPE
self.buf = struct.pack(
ipv6.ipv6._PACK_STR, self.v_tc_flow,
self.payload_length, self.nxt, self.hop_limit,
addrconv.ipv6.text_to_bin(self.src),
addrconv.ipv6.text_to_bin(self.dst))
self.buf += self.auth.serialize()
def tearDown(self):
pass
@ -158,6 +180,10 @@ class Test_ipv6(unittest.TestCase):
self.setUp_with_fragment()
self.test_init()
def test_init_with_auth(self):
self.setUp_with_auth()
self.test_init()
def test_parser(self):
_res = self.ip.parser(str(self.buf))
if type(_res) is tuple:
@ -187,6 +213,10 @@ class Test_ipv6(unittest.TestCase):
self.setUp_with_fragment()
self.test_parser()
def test_parser_with_auth(self):
self.setUp_with_auth()
self.test_parser()
def test_serialize(self):
data = bytearray()
prev = None
@ -231,6 +261,16 @@ class Test_ipv6(unittest.TestCase):
fragment = ipv6.fragment.parser(str(buf[ipv6.ipv6._MIN_LEN:]))
eq_(repr(self.fragment), repr(fragment))
def test_serialize_with_auth(self):
self.setUp_with_auth()
self.test_serialize()
data = bytearray()
prev = None
buf = self.ip.serialize(data, prev)
auth = ipv6.auth.parser(str(buf[ipv6.ipv6._MIN_LEN:]))
eq_(repr(self.auth), repr(auth))
def test_to_string(self):
ipv6_values = {'version': self.version,
'traffic_class': self.traffic_class,
@ -261,6 +301,10 @@ class Test_ipv6(unittest.TestCase):
self.setUp_with_fragment()
self.test_to_string()
def test_to_string_with_auth(self):
self.setUp_with_auth()
self.test_to_string()
def test_len(self):
eq_(len(self.ip), 40)
@ -276,6 +320,10 @@ class Test_ipv6(unittest.TestCase):
self.setUp_with_fragment()
eq_(len(self.ip), 40 + len(self.fragment))
def test_len_with_auth(self):
self.setUp_with_auth()
eq_(len(self.ip), 40 + len(self.auth))
class Test_hop_opts(unittest.TestCase):
@ -533,3 +581,49 @@ class Test_fragment(unittest.TestCase):
def test_len(self):
eq_(8, len(self.fragment))
class Test_auth(unittest.TestCase):
def setUp(self):
self.nxt = 0
self.size = 4
self.spi = 256
self.seq = 1
self.data = '\x21\xd3\xa9\x5c\x5f\xfd\x4d\x18\x46\x22\xb9\xf8'
self.auth = ipv6.auth(self.size, self.spi, self.seq, self.data)
self.auth.set_nxt(self.nxt)
self.form = '!BB2xII12s'
self.buf = struct.pack(self.form, self.nxt, self.size, self.spi,
self.seq, self.data)
def test_init(self):
eq_(self.nxt, self.auth.nxt)
eq_(self.size, self.auth.size)
eq_(self.spi, self.auth.spi)
eq_(self.seq, self.auth.seq)
eq_(self.data, self.auth.data)
def test_parser(self):
_res = ipv6.auth.parser(self.buf)
if type(_res) is tuple:
res = _res[0]
else:
res = _res
eq_(self.nxt, res.nxt)
eq_(self.size, res.size)
eq_(self.spi, res.spi)
eq_(self.seq, res.seq)
eq_(self.data, res.data)
def test_serialize(self):
buf = self.auth.serialize()
res = struct.unpack_from(self.form, str(buf))
eq_(self.nxt, res[0])
eq_(self.size, res[1])
eq_(self.spi, res[2])
eq_(self.seq, res[3])
eq_(self.data, res[4])
def test_len(self):
eq_((4 - 1) * 8, len(self.auth))