packet lib: icmpv6: include type and length in nd_option object

the purpose is to simplify the creation of the nd_router_advert object.

Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yuichi Ito 2013-11-06 17:15:06 +09:00 committed by FUJITA Tomonori
parent 7cce0f9e5f
commit 418384f233
2 changed files with 139 additions and 158 deletions

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import struct
import sys
import array
@ -157,14 +158,8 @@ class nd_neighbor(stringify.StringifyMixin):
res R,S,O Flags for Neighbor Advertisement. \
The 3 MSBs of "Reserved" field for Neighbor Solicitation.
dst Target Address
type\_ "Type" field of the first option. None if no options. \
NOTE: This implementation doesn't support two or more \
options.
length "Length" field of the first option. None if no options.
data An object to describe the first option. \
None if no options. \
Either ryu.lib.packet.icmpv6.nd_option_la object \
or a bytearray.
option a derived object of ryu.lib.packet.icmpv6.nd_option \
or a bytearray. None if no options.
============== ====================
"""
@ -180,27 +175,24 @@ class nd_neighbor(stringify.StringifyMixin):
return cls
return _register_nd_option_type
def __init__(self, res, dst, type_=None, length=None, data=None):
def __init__(self, res, dst, option=None):
self.res = res
self.dst = dst
self.type_ = type_
self.length = length
self.data = data
self.option = option
@classmethod
def parser(cls, buf, offset):
(res, dst) = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst))
offset += cls._MIN_LEN
option = None
if len(buf) > offset:
(msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None)
offset += 2
if cls_:
msg.data = cls_.parser(buf, offset)
(type_, ) = struct.unpack_from('!B', buf, offset)
cls_ = cls._ND_OPTION_TYPES.get(type_)
if cls_ is not None:
option = cls_.parser(buf, offset)
else:
msg.data = buf[offset:]
option = buf[offset:]
msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst), option)
return msg
def serialize(self):
@ -208,14 +200,12 @@ class nd_neighbor(stringify.StringifyMixin):
hdr = bytearray(struct.pack(
nd_neighbor._PACK_STR, res,
addrconv.ipv6.text_to_bin(self.dst)))
if self.type_ is not None:
hdr += bytearray(struct.pack('!BB', self.type_, self.length))
if self.type_ in nd_neighbor._ND_OPTION_TYPES:
hdr += self.data.serialize()
elif self.data is not None:
hdr += bytearray(self.data)
return hdr
if self.option is not None:
if isinstance(self.option, nd_option):
hdr.extend(self.option.serialize())
else:
hdr.extend(self.option)
return str(hdr)
@icmpv6.register_icmpv6_type(ND_ROUTER_SOLICIT)
@ -235,14 +225,8 @@ class nd_router_solicit(stringify.StringifyMixin):
Attribute Description
============== ====================
res This field is unused. It MUST be initialized to zero.
type\_ "Type" field of the first option. None if no options. \
NOTE: This implementation doesn't support two or more \
options.
length "Length" field of the first option. None if no options.
data An object to describe the first option. \
None if no options. \
Either ryu.lib.packet.icmpv6.nd_option_la object \
or a bytearray.
option a derived object of ryu.lib.packet.icmpv6.nd_option \
or a bytearray. None if no options.
============== ====================
"""
@ -258,39 +242,34 @@ class nd_router_solicit(stringify.StringifyMixin):
return cls
return _register_nd_option_type
def __init__(self, res, type_=None, length=None, data=None):
def __init__(self, res, option=None):
self.res = res
self.type_ = type_
self.length = length
self.data = data
self.option = option
@classmethod
def parser(cls, buf, offset):
res = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(res)
(res, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
offset += cls._MIN_LEN
option = None
if len(buf) > offset:
(msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None)
offset += 2
if cls_:
msg.data = cls_.parser(buf, offset)
(type_, ) = struct.unpack_from('!B', buf, offset)
cls_ = cls._ND_OPTION_TYPES.get(type_)
if cls_ is not None:
option = cls_.parser(buf, offset)
else:
msg.data = buf[offset:]
option = buf[offset:]
msg = cls(res, option)
return msg
def serialize(self):
hdr = bytearray(struct.pack(nd_router_solicit._PACK_STR, self.res))
if self.type_ is not None:
hdr += bytearray(struct.pack('!BB', self.type_, self.length))
if self.type_ in nd_router_solicit._ND_OPTION_TYPES:
hdr += self.data.serialize()
elif self.data is not None:
hdr += bytearray(self.data)
return hdr
hdr = bytearray(struct.pack(
nd_router_solicit._PACK_STR, self.res))
if self.option is not None:
if isinstance(self.option, nd_option):
hdr.extend(self.option.serialize())
else:
hdr.extend(self.option)
return str(hdr)
@icmpv6.register_icmpv6_type(ND_ROUTER_ADVERT)
@ -314,17 +293,9 @@ class nd_router_advert(stringify.StringifyMixin):
rou_l Router Lifetime.
rea_t Reachable Time.
ret_t Retrans Timer.
type\_ List of option type. Each index refers to an option. \
None if no options. \
NOTE: This implementation support one or more \
options.
length List of option length. Each index refers to an option. \
None if no options. \
data List of option data. Each index refers to an option. \
None if no options. \
ryu.lib.packet.icmpv6.nd_option_la object, \
ryu.lib.packet.icmpv6.nd_option_pi object \
or a bytearray.
options List of a derived object of \
ryu.lib.packet.icmpv6.nd_option or a bytearray. \
None if no options.
============== ====================
"""
@ -340,41 +311,32 @@ class nd_router_advert(stringify.StringifyMixin):
return cls
return _register_nd_option_type
def __init__(self, ch_l, res, rou_l, rea_t, ret_t, type_=None, length=None,
data=None):
def __init__(self, ch_l, res, rou_l, rea_t, ret_t, options=None):
self.ch_l = ch_l
self.res = res
self.rou_l = rou_l
self.rea_t = rea_t
self.ret_t = ret_t
self.type_ = type_
self.length = length
self.data = data
options = options or []
assert isinstance(options, list)
self.options = options
@classmethod
def parser(cls, buf, offset):
(ch_l, res, rou_l, rea_t, ret_t) = struct.unpack_from(cls._PACK_STR,
buf, offset)
msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t)
(ch_l, res, rou_l, rea_t, ret_t
) = struct.unpack_from(cls._PACK_STR, buf, offset)
offset += cls._MIN_LEN
msg.type_ = list()
msg.length = list()
msg.data = list()
options = []
while len(buf) > offset:
(type_, length) = struct.unpack_from('!BB', buf, offset)
msg.type_.append(type_)
msg.length.append(length)
cls_ = cls._ND_OPTION_TYPES.get(type_, None)
offset += 2
byte_len = length * 8 - 2
if cls_:
msg.data.append(cls_.parser(buf[:offset+cls_._MIN_LEN],
offset))
offset += cls_._MIN_LEN
cls_ = cls._ND_OPTION_TYPES.get(type_)
if cls_ is not None:
option = cls_.parser(buf, offset)
else:
msg.data.append(buf[offset:offset + byte_len])
offset += byte_len
option = buf[offset:offset + (length * 8 - 2)]
options.append(option)
offset += len(option)
msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t, options)
return msg
def serialize(self):
@ -382,22 +344,37 @@ class nd_router_advert(stringify.StringifyMixin):
hdr = bytearray(struct.pack(
nd_router_advert._PACK_STR, self.ch_l, res, self.rou_l,
self.rea_t, self.ret_t))
if self.type_ is not None:
for i in range(len(self.type_)):
hdr += bytearray(struct.pack('!BB', self.type_[i],
self.length[i]))
if self.type_[i] in nd_router_advert._ND_OPTION_TYPES:
hdr += self.data[i].serialize()
elif self.data[i] is not None:
hdr += bytearray(self.data[i])
for option in self.options:
if isinstance(option, nd_option):
hdr.extend(option.serialize())
else:
hdr.extend(option)
return str(hdr)
return hdr
class nd_option(stringify.StringifyMixin):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self, type_, length):
self.type_ = type_
self.length = length
@classmethod
@abc.abstractmethod
def parser(cls, buf):
pass
@abc.abstractmethod
def serialize(self):
pass
@nd_neighbor.register_nd_option_type(ND_OPTION_SLA, ND_OPTION_TLA)
@nd_router_solicit.register_nd_option_type(ND_OPTION_SLA)
@nd_router_advert.register_nd_option_type(ND_OPTION_SLA)
class nd_option_la(stringify.StringifyMixin):
class nd_option_la(nd_option):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
Source/Target Link-Layer Address Option. (RFC 4861)
@ -414,6 +391,8 @@ class nd_option_la(stringify.StringifyMixin):
============== ====================
Attribute Description
============== ====================
type\_ type of the option.
length length of the option.
hw_src Link-Layer Address. \
NOTE: If the address is longer than 6 octets this contains \
the first 6 octets in the address. \
@ -426,17 +405,19 @@ class nd_option_la(stringify.StringifyMixin):
============== ====================
"""
_PACK_STR = '!6s'
_PACK_STR = '!BB6s'
_MIN_LEN = struct.calcsize(_PACK_STR)
def __init__(self, hw_src, data=None):
def __init__(self, type_, length, hw_src, data=None):
super(nd_option_la, self).__init__(type_, length)
self.hw_src = hw_src
self.data = data
@classmethod
def parser(cls, buf, offset):
(hw_src, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(addrconv.mac.bin_to_text(hw_src))
(type_, length, hw_src
) = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(type_, length, addrconv.mac.bin_to_text(hw_src))
offset += cls._MIN_LEN
if len(buf) > offset:
msg.data = buf[offset:]
@ -444,17 +425,19 @@ class nd_option_la(stringify.StringifyMixin):
return msg
def serialize(self):
hdr = bytearray(struct.pack(self._PACK_STR,
addrconv.mac.text_to_bin(self.hw_src)))
buf = bytearray(struct.pack(
self._PACK_STR, self.type_, self.length,
addrconv.mac.text_to_bin(self.hw_src)))
if self.data is not None:
hdr += bytearray(self.data)
return hdr
buf.extend(self.data)
mod = len(buf) % 8
if mod:
buf.extend(bytearray(8 - mod))
return str(buf)
@nd_router_advert.register_nd_option_type(ND_OPTION_PI)
class nd_option_pi(stringify.StringifyMixin):
class nd_option_pi(nd_option):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
Prefix Information Option. (RFC 4861)
@ -469,6 +452,8 @@ class nd_option_pi(stringify.StringifyMixin):
============== ====================
Attribute Description
============== ====================
type\_ type of the option.
length length of the option.
pl Prefix Length.
res1 L,A,R\* Flags for Prefix Information.
val_l Valid Lifetime.
@ -483,7 +468,8 @@ class nd_option_pi(stringify.StringifyMixin):
_PACK_STR = '!BBIII16s'
_MIN_LEN = struct.calcsize(_PACK_STR)
def __init__(self, pl, res1, val_l, pre_l, res2, prefix):
def __init__(self, type_, length, pl, res1, val_l, pre_l, res2, prefix):
super(nd_option_pi, self).__init__(self, type_, length)
self.pl = pl
self.res1 = res1
self.val_l = val_l
@ -493,20 +479,19 @@ class nd_option_pi(stringify.StringifyMixin):
@classmethod
def parser(cls, buf, offset):
(pl, res1, val_l, pre_l, res2, prefix) = struct.unpack_from(cls.
_PACK_STR,
buf,
offset)
msg = cls(pl, res1 >> 5, val_l, pre_l, res2,
(type_, length, pl, res1, val_l, pre_l, res2, prefix
) = struct.unpack_from(cls._PACK_STR, buf, offset)
msg = cls(type_, length, pl, res1 >> 5, val_l, pre_l, res2,
addrconv.ipv6.bin_to_text(prefix))
return msg
def serialize(self):
res1 = self.res1 << 5
hdr = bytearray(struct.pack(self._PACK_STR, self.pl, res1,
self.val_l, self.pre_l, self.res2,
addrconv.ipv6.text_to_bin(self.prefix)))
hdr = bytearray(struct.pack(
self._PACK_STR, self.type_, self.length, self.pl,
res1, self.val_l, self.pre_l, self.res2,
addrconv.ipv6.text_to_bin(self.prefix)))
return hdr

View File

@ -216,9 +216,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
nd = icmpv6.nd_neighbor(self.res, self.dst)
eq_(nd.res, self.res)
eq_(nd.dst, self.dst)
eq_(nd.type_, None)
eq_(nd.length, None)
eq_(nd.data, None)
eq_(nd.option, None)
def _test_parser(self, data=None):
buf = self.buf + str(data or '')
@ -232,11 +230,11 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
addrconv.ipv6.text_to_bin(self.dst))
eq_(n, None)
if data:
nd = msg.data
nd = msg.data.option
eq_(nd.type_, self.nd_type)
eq_(nd.length, self.nd_length)
eq_(nd.data.hw_src, self.nd_hw_src)
eq_(nd.data.data, None)
eq_(nd.hw_src, self.nd_hw_src)
eq_(nd.data, None)
def test_parser_without_data(self):
self._test_parser()
@ -264,9 +262,9 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
eq_(data, '')
def test_serialize_with_data(self):
nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
nd = icmpv6.nd_neighbor(
self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
nd_opt = icmpv6.nd_option_la(
self.nd_type, self.nd_length, self.nd_hw_src)
nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
nd_csum = icmpv6_csum(prev, self.buf + self.data)
@ -276,7 +274,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
(res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN)
(nd_type, nd_length, nd_hw_src) = struct.unpack_from(
'!BB6s', buf, icmp._MIN_LEN + nd._MIN_LEN)
nd_opt._PACK_STR, buf, icmp._MIN_LEN + nd._MIN_LEN)
data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):]
eq_(type_, self.type_)
@ -289,12 +287,14 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
nd = icmpv6.nd_neighbor(
self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
nd_opt = icmpv6.nd_option_la(
self.nd_type, self.nd_length, self.nd_hw_src)
nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
nd_opt_values = {'hw_src': self.nd_hw_src,
nd_opt_values = {'type_': self.nd_type,
'length': self.nd_length,
'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
@ -303,9 +303,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
nd_values = {'res': repr(nd.res),
'dst': repr(self.dst),
'type_': repr(self.nd_type),
'length': repr(self.nd_length),
'data': nd_opt_str}
'option': nd_opt_str}
_nd_str = ','.join(['%s=%s' % (k, nd_values[k])
for k, v in inspect.getmembers(nd)
if k in nd_values])
@ -362,9 +360,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
def test_init(self):
rs = icmpv6.nd_router_solicit(self.res)
eq_(rs.res, self.res)
eq_(rs.type_, None)
eq_(rs.length, None)
eq_(rs.data, None)
eq_(rs.option, None)
def _test_parser(self, data=None):
buf = self.buf + str(data or '')
@ -374,14 +370,14 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(msg.code, self.code)
eq_(msg.csum, self.csum)
if data is not None:
eq_(msg.data.res[0], self.res)
eq_(msg.data.res, self.res)
eq_(n, None)
if data:
rs = msg.data
rs = msg.data.option
eq_(rs.type_, self.nd_type)
eq_(rs.length, self.nd_length)
eq_(rs.data.hw_src, self.nd_hw_src)
eq_(rs.data.data, None)
eq_(rs.hw_src, self.nd_hw_src)
eq_(rs.data, None)
def test_parser_without_data(self):
self._test_parser()
@ -408,9 +404,9 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(data, '')
def test_serialize_with_data(self):
nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
rs = icmpv6.nd_router_solicit(self.res, self.nd_type, self.nd_length,
nd_opt)
nd_opt = icmpv6.nd_option_la(
self.nd_type, self.nd_length, self.nd_hw_src)
rs = icmpv6.nd_router_solicit(self.res, nd_opt)
prev = ipv6(6, 0, 0, 16, 64, 255, self.src_ipv6, self.dst_ipv6)
rs_csum = icmpv6_csum(prev, self.buf + self.data)
@ -420,7 +416,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN)
(nd_type, nd_length, nd_hw_src) = struct.unpack_from(
'!BB6s', buf, icmp._MIN_LEN + rs._MIN_LEN)
nd_opt._PACK_STR, buf, icmp._MIN_LEN + rs._MIN_LEN)
data = buf[(icmp._MIN_LEN + rs._MIN_LEN + 8):]
eq_(type_, self.type_)
@ -432,12 +428,14 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
rs = icmpv6.nd_router_solicit(
self.res, self.nd_type, self.nd_length, nd_opt)
nd_opt = icmpv6.nd_option_la(
self.nd_type, self.nd_length, self.nd_hw_src)
rs = icmpv6.nd_router_solicit(self.res, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
nd_opt_values = {'hw_src': self.nd_hw_src,
nd_opt_values = {'type_': self.nd_type,
'length': self.nd_length,
'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
@ -445,9 +443,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str)
rs_values = {'res': repr(rs.res),
'type_': repr(self.nd_type),
'length': repr(self.nd_length),
'data': nd_opt_str}
'option': nd_opt_str}
_rs_str = ','.join(['%s=%s' % (k, rs_values[k])
for k, v in inspect.getmembers(rs)
if k in rs_values])