packet lib: icmpv6: add nd_router messages

add ICMPv6 sub encoder/decoder class for Router Solicitation and
Router Advertisement messages.

add ICMPv6 sub encoder/decoder class for Neighbor discovery
Prefix Information Option.

Signed-off-by: Ygor Amaral <yabls@cin.ufpe.br>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Ygor Amaral 2013-09-02 03:56:17 -03:00 committed by FUJITA Tomonori
parent cdbc7394c7
commit a6b71f671e
2 changed files with 316 additions and 25 deletions

View File

@ -214,8 +214,192 @@ class nd_neighbor(stringify.StringifyMixin):
return hdr
@icmpv6.register_icmpv6_type(ND_ROUTER_SOLICIT)
class nd_router_solicit(stringify.StringifyMixin):
"""ICMPv6 sub encoder/decoder class for Router Solicitation messages.
(RFC 4861)
This is used with ryu.lib.packet.icmpv6.icmpv6.
An instance has the following attributes at least.
Most of them are same to the on-wire counterparts but in host byte order.
__init__ takes the correspondig args in this order.
.. tabularcolumns:: |l|p{35em}|
============== ====================
Attribute Description
============== ====================
res This field is unused. It MUST be initialized to zero.
type\_ "Type" field of the first option. None if no options. \
NOTE: This implementation doesn't support two or more \
options.
length "Length" field of the first option. None if no options.
data An object to describe the first option. \
None if no options. \
Either ryu.lib.packet.icmpv6.nd_option_la object \
or a bytearray.
============== ====================
"""
_PACK_STR = '!I'
_MIN_LEN = struct.calcsize(_PACK_STR)
_ND_OPTION_TYPES = {}
# ND option type
ND_OPTION_SLA = 1 # Source Link-Layer Address
@staticmethod
def register_nd_option_type(*args):
def _register_nd_option_type(cls):
for type_ in args:
nd_router_solicit._ND_OPTION_TYPES[type_] = cls
return cls
return _register_nd_option_type
def __init__(self, res, type_=None, length=None, data=None):
self.res = res
self.type_ = type_
self.length = length
self.data = data
@classmethod
def parser(cls, buf, offset):
res = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(res)
offset += cls._MIN_LEN
if len(buf) > offset:
(msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None)
offset += 2
if cls_:
msg.data = cls_.parser(buf, offset)
else:
msg.data = buf[offset:]
return msg
def serialize(self):
hdr = bytearray(struct.pack(nd_router_solicit._PACK_STR, self.res))
if self.type_ is not None:
hdr += bytearray(struct.pack('!BB', self.type_, self.length))
if self.type_ in nd_router_solicit._ND_OPTION_TYPES:
hdr += self.data.serialize()
elif self.data is not None:
hdr += bytearray(self.data)
return hdr
@icmpv6.register_icmpv6_type(ND_ROUTER_ADVERT)
class nd_router_advert(stringify.StringifyMixin):
"""ICMPv6 sub encoder/decoder class for Router Advertisement messages.
(RFC 4861)
This is used with ryu.lib.packet.icmpv6.icmpv6.
An instance has the following attributes at least.
Most of them are same to the on-wire counterparts but in host byte order.
__init__ takes the correspondig args in this order.
.. tabularcolumns:: |l|p{35em}|
============== ====================
Attribute Description
============== ====================
ch_l Cur Hop Limit.
res M,O Flags for Router Advertisement.
rou_l Router Lifetime.
rea_t Reachable Time.
ret_t Retrans Timer.
type\_ List of option type. Each index refers to an option. \
None if no options. \
NOTE: This implementation support one or more \
options.
length List of option length. Each index refers to an option. \
None if no options. \
data List of option data. Each index refers to an option. \
None if no options. \
ryu.lib.packet.icmpv6.nd_option_la object, \
ryu.lib.packet.icmpv6.nd_option_pi object \
or a bytearray.
============== ====================
"""
_PACK_STR = '!BBHII'
_MIN_LEN = struct.calcsize(_PACK_STR)
_ND_OPTION_TYPES = {}
# ND option type
ND_OPTION_SLA = 1 # Source Link-Layer Address
ND_OPTION_PI = 3 # Prefix Information
ND_OPTION_MTU = 5 # MTU
@staticmethod
def register_nd_option_type(*args):
def _register_nd_option_type(cls):
for type_ in args:
nd_router_advert._ND_OPTION_TYPES[type_] = cls
return cls
return _register_nd_option_type
def __init__(self, ch_l, res, rou_l, rea_t, ret_t, type_=None, length=None,
data=None):
self.ch_l = ch_l
self.res = res << 6
self.rou_l = rou_l
self.rea_t = rea_t
self.ret_t = ret_t
self.type_ = type_
self.length = length
self.data = data
@classmethod
def parser(cls, buf, offset):
(ch_l, res, rou_l, rea_t, ret_t) = struct.unpack_from(cls._PACK_STR,
buf, offset)
msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t)
offset += cls._MIN_LEN
msg.type_ = list()
msg.length = list()
msg.data = list()
while len(buf) > offset:
(type_, length) = struct.unpack_from('!BB', buf, offset)
msg.type_.append(type_)
msg.length.append(length)
cls_ = cls._ND_OPTION_TYPES.get(type_, None)
offset += 2
if cls_:
msg.data.append(cls_.parser(buf[:offset+cls_._MIN_LEN],
offset))
offset += cls_._MIN_LEN
else:
msg.data.append(buf[offset:])
offset = len(buf)
return msg
def serialize(self):
hdr = bytearray(struct.pack(nd_router_advert._PACK_STR, self.ch_l,
self.res, self.rou_l, self.rea_t,
self.ret_t))
if self.type_ is not None:
for i in range(len(self.type_)):
hdr += bytearray(struct.pack('!BB', self.type_[i],
self.length[i]))
if self.type_[i] in nd_router_advert._ND_OPTION_TYPES:
hdr += self.data[i].serialize()
elif self.data[i] is not None:
hdr += bytearray(self.data[i])
return hdr
@nd_neighbor.register_nd_option_type(nd_neighbor.ND_OPTION_SLA,
nd_neighbor.ND_OPTION_TLA)
@nd_router_solicit.register_nd_option_type(nd_router_solicit.ND_OPTION_SLA)
@nd_router_advert.register_nd_option_type(nd_router_advert.ND_OPTION_SLA)
class nd_option_la(stringify.StringifyMixin):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
Source/Target Link-Layer Address Option. (RFC 4861)
@ -270,6 +454,63 @@ class nd_option_la(stringify.StringifyMixin):
return hdr
@nd_router_advert.register_nd_option_type(nd_router_advert.ND_OPTION_PI)
class nd_option_pi(stringify.StringifyMixin):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
Prefix Information Option. (RFC 4861)
This is used with ryu.lib.packet.icmpv6.nd_neighbor.
An instance has the following attributes at least.
Most of them are same to the on-wire counterparts but in host byte order.
__init__ takes the correspondig args in this order.
.. tabularcolumns:: |l|p{35em}|
============== ====================
Attribute Description
============== ====================
pl Prefix Length.
res1 L,A,R* Flags for Prefix Information.
val_l Valid Lifetime.
pre_l Preferred Lifetime.
res2 This field is unused. It MUST be initialized to zero.
prefix An IP address or a prefix of an IP address.
============== ====================
*R flag is defined in (RFC 3775)
"""
_PACK_STR = '!BBIII16s'
_MIN_LEN = struct.calcsize(_PACK_STR)
def __init__(self, pl, res1, val_l, pre_l, res2, prefix):
self.pl = pl
self.res1 = res1 << 5
self.val_l = val_l
self.pre_l = pre_l
self.res2 = res2
self.prefix = prefix
@classmethod
def parser(cls, buf, offset):
(pl, res1, val_l, pre_l, res2, prefix) = struct.unpack_from(cls.
_PACK_STR,
buf,
offset)
msg = cls(pl, res1 >> 5, val_l, pre_l, res2,
addrconv.ipv6.bin_to_text(prefix))
return msg
def serialize(self):
hdr = bytearray(struct.pack(self._PACK_STR, self.pl, self.res1,
self.val_l, self.pre_l, self.res2,
addrconv.ipv6.text_to_bin(self.prefix)))
return hdr
@icmpv6.register_icmpv6_type(ICMPV6_ECHO_REPLY, ICMPV6_ECHO_REQUEST)
class echo(stringify.StringifyMixin):
"""ICMPv6 sub encoder/decoder class for Echo Request and Echo Reply

View File

@ -347,10 +347,11 @@ class Test_icmpv6_router_solict(unittest.TestCase):
res = 0
nd_type = 1
nd_length = 1
nd_data = None
nd_hw_src = '\x12\x2d\xa5\x6d\xbc\x0f'
nd_hw_src = '12:2d:a5:6d:bc:0f'
data = '\x00\x00\x00\x00\x01\x01\x12\x2d\xa5\x6d\xbc\x0f'
buf = '\x85\x00\x97\xd9'
src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
dst_ipv6 = '3ffe:501:0:1001::2'
def setUp(self):
pass
@ -359,7 +360,11 @@ class Test_icmpv6_router_solict(unittest.TestCase):
pass
def test_init(self):
pass
rs = icmpv6.nd_router_solicit(self.res)
eq_(rs.res, self.res)
eq_(rs.type_, None)
eq_(rs.length, None)
eq_(rs.data, None)
def _test_parser(self, data=None):
buf = self.buf + str(data or '')
@ -368,7 +373,15 @@ class Test_icmpv6_router_solict(unittest.TestCase):
eq_(msg.type_, self.type_)
eq_(msg.code, self.code)
eq_(msg.csum, self.csum)
eq_(msg.data, data)
if data is not None:
eq_(msg.data.res[0], self.res)
eq_(n, None)
if data:
rs = msg.data
eq_(rs.type_, self.nd_type)
eq_(rs.length, self.nd_length)
eq_(rs.data.hw_src, self.nd_hw_src)
eq_(rs.data.data, None)
def test_parser_without_data(self):
self._test_parser()
@ -376,38 +389,75 @@ class Test_icmpv6_router_solict(unittest.TestCase):
def test_parser_with_data(self):
self._test_parser(self.data)
def _test_serialize(self, nd_data=None):
nd_data = str(nd_data or '')
buf = self.buf + nd_data
src_ipv6 = 'fe80::102d:a5ff:fe6d:bc0f'
dst_ipv6 = 'ff02::2'
prev = ipv6(6, 0, 0, len(buf), 58, 255, src_ipv6, dst_ipv6)
nd_csum = icmpv6_csum(prev, buf)
def test_serialize_without_data(self):
rs = icmpv6.nd_router_solicit(self.res)
prev = ipv6(6, 0, 0, 8, 64, 255, self.src_ipv6, self.dst_ipv6)
rs_csum = icmpv6_csum(prev, self.buf)
icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd_data)
icmp = icmpv6.icmpv6(self.type_, self.code, 0, rs)
buf = buffer(icmp.serialize(bytearray(), prev))
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
data = buf[icmp._MIN_LEN:]
res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN)
data = buf[(icmp._MIN_LEN + rs._MIN_LEN):]
eq_(type_, self.type_)
eq_(code, self.code)
eq_(csum, nd_csum)
eq_(data, nd_data)
def test_serialize_without_data(self):
self._test_serialize()
eq_(csum, rs_csum)
eq_(res[0], self.res)
eq_(data, '')
def test_serialize_with_data(self):
self._test_serialize(self.data)
nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
rs = icmpv6.nd_router_solicit(self.res, self.nd_type, self.nd_length,
nd_opt)
prev = ipv6(6, 0, 0, 16, 64, 255, self.src_ipv6, self.dst_ipv6)
rs_csum = icmpv6_csum(prev, self.buf + self.data)
icmp = icmpv6.icmpv6(self.type_, self.code, 0, rs)
buf = buffer(icmp.serialize(bytearray(), prev))
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN)
(nd_type, nd_length, nd_hw_src) = struct.unpack_from(
'!BB6s', buf, icmp._MIN_LEN + rs._MIN_LEN)
data = buf[(icmp._MIN_LEN + rs._MIN_LEN + 8):]
eq_(type_, self.type_)
eq_(code, self.code)
eq_(csum, rs_csum)
eq_(res[0], self.res)
eq_(nd_type, self.nd_type)
eq_(nd_length, self.nd_length)
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.data)
nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
rs = icmpv6.nd_router_solicit(
self.res, self.nd_type, self.nd_length, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
icmp_values = {'type_': self.type_,
'code': self.code,
'csum': self.csum,
'data': self.data}
_ic_str = ','.join(['%s=%s' % (k, repr(icmp_values[k]))
nd_opt_values = {'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
if k in nd_opt_values])
nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str)
rs_values = {'res': repr(rs.res),
'type_': repr(self.nd_type),
'length': repr(self.nd_length),
'data': nd_opt_str}
_rs_str = ','.join(['%s=%s' % (k, rs_values[k])
for k, v in inspect.getmembers(rs)
if k in rs_values])
rs_str = '%s(%s)' % (icmpv6.nd_router_solicit.__name__, _rs_str)
icmp_values = {'type_': repr(self.type_),
'code': repr(self.code),
'csum': repr(self.csum),
'data': rs_str}
_ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
for k, v in inspect.getmembers(ic)
if k in icmp_values])
ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)