packet lib: icmpv6: support default parameters and the auto calculation of lengths

Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yuichi Ito 2013-11-06 17:16:04 +09:00 committed by FUJITA Tomonori
parent 42942c7c5d
commit 1d1cdeea99
2 changed files with 443 additions and 13 deletions

View File

@ -102,7 +102,7 @@ class icmpv6(packet_base.PacketBase):
return cls
return _register_icmpv6_type
def __init__(self, type_, code, csum, data=None):
def __init__(self, type_=0, code=0, csum=0, data=None):
super(icmpv6, self).__init__()
self.type_ = type_
self.code = code
@ -180,7 +180,7 @@ class nd_neighbor(stringify.StringifyMixin):
return cls
return _register_nd_option_type(args[0])
def __init__(self, res, dst, option=None):
def __init__(self, res=0, dst='::', option=None):
self.res = res
self.dst = dst
self.option = option
@ -252,7 +252,7 @@ class nd_router_solicit(stringify.StringifyMixin):
return cls
return _register_nd_option_type(args[0])
def __init__(self, res, option=None):
def __init__(self, res=0, option=None):
self.res = res
self.option = option
@ -326,7 +326,7 @@ class nd_router_advert(stringify.StringifyMixin):
return cls
return _register_nd_option_type(args[0])
def __init__(self, ch_l, res, rou_l, rea_t, ret_t, options=None):
def __init__(self, ch_l=0, res=0, rou_l=0, rea_t=0, ret_t=0, options=None):
self.ch_l = ch_l
self.res = res
self.rou_l = rou_l
@ -430,6 +430,9 @@ class nd_option_la(nd_option):
mod = len(buf) % 8
if mod:
buf.extend(bytearray(8 - mod))
if 0 == self.length:
self.length = len(buf) / 8
struct.pack_into('!B', buf, 1, self.length)
return str(buf)
def __len__(self):
@ -459,7 +462,8 @@ class nd_option_sla(nd_option_la):
============== ====================
Attribute Description
============== ====================
length length of the option.
length length of the option. \
(0 means automatically-calculate when encoding)
hw_src Link-Layer Address. \
NOTE: If the address is longer than 6 octets this contains \
the first 6 octets in the address. \
@ -476,7 +480,7 @@ class nd_option_sla(nd_option_la):
def option_type(cls):
return ND_OPTION_SLA
def __init__(self, length, hw_src, data=None):
def __init__(self, length=0, hw_src='00:00:00:00:00:00', data=None):
super(nd_option_sla, self).__init__(length, hw_src, data)
@ -496,7 +500,8 @@ class nd_option_tla(nd_option_la):
============== ====================
Attribute Description
============== ====================
length length of the option.
length length of the option. \
(0 means automatically-calculate when encoding)
hw_src Link-Layer Address. \
NOTE: If the address is longer than 6 octets this contains \
the first 6 octets in the address. \
@ -513,7 +518,7 @@ class nd_option_tla(nd_option_la):
def option_type(cls):
return ND_OPTION_TLA
def __init__(self, length, hw_src, data=None):
def __init__(self, length=0, hw_src='00:00:00:00:00:00', data=None):
super(nd_option_tla, self).__init__(length, hw_src, data)
@ -533,7 +538,8 @@ class nd_option_pi(nd_option):
============== ====================
Attribute Description
============== ====================
length length of the option.
length length of the option. \
(0 means automatically-calculate when encoding)
pl Prefix Length.
res1 L,A,R\* Flags for Prefix Information.
val_l Valid Lifetime.
@ -552,7 +558,8 @@ class nd_option_pi(nd_option):
def option_type(cls):
return ND_OPTION_PI
def __init__(self, length, pl, res1, val_l, pre_l, res2, prefix):
def __init__(self, length=0, pl=0, res1=0, val_l=0, pre_l=0, res2=0,
prefix='::'):
super(nd_option_pi, self).__init__(self.option_type(), length)
self.pl = pl
self.res1 = res1
@ -576,8 +583,10 @@ class nd_option_pi(nd_option):
self._PACK_STR, self.option_type(), self.length, self.pl,
res1, self.val_l, self.pre_l, self.res2,
addrconv.ipv6.text_to_bin(self.prefix)))
return hdr
if 0 == self.length:
self.length = len(hdr) / 8
struct.pack_into('!B', hdr, 1, self.length)
return str(hdr)
@icmpv6.register_icmpv6_type(ICMPV6_ECHO_REPLY, ICMPV6_ECHO_REQUEST)
@ -604,7 +613,7 @@ class echo(stringify.StringifyMixin):
_PACK_STR = '!HH'
_MIN_LEN = struct.calcsize(_PACK_STR)
def __init__(self, id_, seq, data=None):
def __init__(self, id_=0, seq=0, data=None):
self.id = id_
self.seq = seq
self.data = data

View File

@ -89,6 +89,17 @@ class Test_icmpv6_header(unittest.TestCase):
m_short_buf = self.buf[1:self.icmp._MIN_LEN]
self.icmp.parser(m_short_buf)
def test_default_args(self):
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6()
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf))
eq_(res[0], 0)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
class Test_icmpv6_echo_request(unittest.TestCase):
type_ = 128
@ -182,6 +193,23 @@ class Test_icmpv6_echo_request(unittest.TestCase):
eq_(str(ic), ic_str)
eq_(repr(ic), ic_str)
def test_default_args(self):
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ICMPV6_ECHO_REQUEST, data=icmpv6.echo())
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ICMPV6_ECHO_REQUEST)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.echo._PACK_STR, str(buf[4:]))
eq_(res[0], 0)
eq_(res[1], 0)
class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
def setUp(self):
@ -189,6 +217,23 @@ class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
self.csum = 0xa472
self.buf = '\x81\x00\xa4\x72\x76\x20\x00\x00'
def test_default_args(self):
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ICMPV6_ECHO_REPLY, data=icmpv6.echo())
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ICMPV6_ECHO_REPLY)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.echo._PACK_STR, str(buf[4:]))
eq_(res[0], 0)
eq_(res[1], 0)
class Test_icmpv6_neighbor_solicit(unittest.TestCase):
type_ = 135
@ -317,6 +362,48 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
eq_(str(ic), ic_str)
eq_(repr(ic), ic_str)
def test_default_args(self):
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_NEIGHBOR_SOLICIT, data=icmpv6.nd_neighbor())
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_NEIGHBOR_SOLICIT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_neighbor._PACK_STR, str(buf[4:]))
eq_(res[0], 0)
eq_(res[1], addrconv.ipv6.text_to_bin('::'))
# with nd_option_sla
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_NEIGHBOR_SOLICIT,
data=icmpv6.nd_neighbor(
option=icmpv6.nd_option_sla()))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_NEIGHBOR_SOLICIT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_neighbor._PACK_STR, str(buf[4:24]))
eq_(res[0], 0)
eq_(res[1], addrconv.ipv6.text_to_bin('::'))
res = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(buf[24:]))
eq_(res[0], icmpv6.ND_OPTION_SLA)
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solicit):
def setUp(self):
@ -390,6 +477,48 @@ class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solicit):
eq_(str(ic), ic_str)
eq_(repr(ic), ic_str)
def test_default_args(self):
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_NEIGHBOR_ADVERT, data=icmpv6.nd_neighbor())
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_NEIGHBOR_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_neighbor._PACK_STR, str(buf[4:]))
eq_(res[0], 0)
eq_(res[1], addrconv.ipv6.text_to_bin('::'))
# with nd_option_tla
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_NEIGHBOR_ADVERT,
data=icmpv6.nd_neighbor(
option=icmpv6.nd_option_tla()))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_NEIGHBOR_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_neighbor._PACK_STR, str(buf[4:24]))
eq_(res[0], 0)
eq_(res[1], addrconv.ipv6.text_to_bin('::'))
res = struct.unpack(icmpv6.nd_option_tla._PACK_STR, str(buf[24:]))
eq_(res[0], icmpv6.ND_OPTION_TLA)
eq_(res[1], len(icmpv6.nd_option_tla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
class Test_icmpv6_router_solicit(unittest.TestCase):
type_ = 133
@ -509,3 +638,295 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(str(ic), ic_str)
eq_(repr(ic), ic_str)
def test_default_args(self):
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_SOLICIT, data=icmpv6.nd_router_solicit())
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_SOLICIT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_solicit._PACK_STR, str(buf[4:]))
eq_(res[0], 0)
# with nd_option_sla
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_SOLICIT,
data=icmpv6.nd_router_solicit(
option=icmpv6.nd_option_sla()))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_SOLICIT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_solicit._PACK_STR, str(buf[4:8]))
eq_(res[0], 0)
res = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(buf[8:]))
eq_(res[0], icmpv6.ND_OPTION_SLA)
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
class Test_icmpv6_router_advert(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_default_args(self):
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_ADVERT, data=icmpv6.nd_router_advert())
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_advert._PACK_STR, str(buf[4:]))
eq_(res[0], 0)
eq_(res[1], 0)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
# with nd_option_sla
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_ADVERT,
data=icmpv6.nd_router_advert(
options=[icmpv6.nd_option_sla()]))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_advert._PACK_STR, str(buf[4:16]))
eq_(res[0], 0)
eq_(res[1], 0)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
res = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(buf[16:]))
eq_(res[0], icmpv6.ND_OPTION_SLA)
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
# with nd_option_pi
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_ADVERT,
data=icmpv6.nd_router_advert(
options=[icmpv6.nd_option_pi()]))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_advert._PACK_STR, str(buf[4:16]))
eq_(res[0], 0)
eq_(res[1], 0)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
res = struct.unpack(icmpv6.nd_option_pi._PACK_STR, str(buf[16:]))
eq_(res[0], icmpv6.ND_OPTION_PI)
eq_(res[1], 4)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
eq_(res[5], 0)
eq_(res[6], 0)
eq_(res[7], addrconv.ipv6.text_to_bin('::'))
# with nd_option_sla and nd_option_pi
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_ADVERT,
data=icmpv6.nd_router_advert(
options=[icmpv6.nd_option_sla(), icmpv6.nd_option_pi()]))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_advert._PACK_STR, str(buf[4:16]))
eq_(res[0], 0)
eq_(res[1], 0)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
res = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(buf[16:24]))
eq_(res[0], icmpv6.ND_OPTION_SLA)
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
res = struct.unpack(icmpv6.nd_option_pi._PACK_STR, str(buf[24:]))
eq_(res[0], icmpv6.ND_OPTION_PI)
eq_(res[1], len(icmpv6.nd_option_pi()) / 8)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
eq_(res[5], 0)
eq_(res[6], 0)
eq_(res[7], addrconv.ipv6.text_to_bin('::'))
class Test_icmpv6_nd_option_la(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_default_args(self):
la = icmpv6.nd_option_sla()
buf = la.serialize()
res = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(buf))
eq_(res[0], icmpv6.ND_OPTION_SLA)
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
# with nd_neighbor
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_NEIGHBOR_ADVERT,
data=icmpv6.nd_neighbor(
option=icmpv6.nd_option_tla()))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_NEIGHBOR_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_neighbor._PACK_STR, str(buf[4:24]))
eq_(res[0], 0)
eq_(res[1], addrconv.ipv6.text_to_bin('::'))
res = struct.unpack(icmpv6.nd_option_tla._PACK_STR, str(buf[24:]))
eq_(res[0], icmpv6.ND_OPTION_TLA)
eq_(res[1], len(icmpv6.nd_option_tla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
# with nd_router_solicit
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_SOLICIT,
data=icmpv6.nd_router_solicit(
option=icmpv6.nd_option_sla()))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_SOLICIT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_solicit._PACK_STR, str(buf[4:8]))
eq_(res[0], 0)
res = struct.unpack(icmpv6.nd_option_sla._PACK_STR, str(buf[8:]))
eq_(res[0], icmpv6.ND_OPTION_SLA)
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
class Test_icmpv6_nd_option_pi(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_default_args(self):
pi = icmpv6.nd_option_pi()
buf = pi.serialize()
res = struct.unpack(icmpv6.nd_option_pi._PACK_STR, str(buf))
eq_(res[0], icmpv6.ND_OPTION_PI)
eq_(res[1], len(icmpv6.nd_option_pi()) / 8)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
eq_(res[5], 0)
eq_(res[6], 0)
eq_(res[7], addrconv.ipv6.text_to_bin('::'))
# with nd_router_advert
prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
ic = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_ADVERT,
data=icmpv6.nd_router_advert(
options=[icmpv6.nd_option_pi()]))
prev.serialize(ic, None)
buf = ic.serialize(bytearray(), prev)
res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
eq_(res[0], icmpv6.ND_ROUTER_ADVERT)
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
res = struct.unpack(icmpv6.nd_router_advert._PACK_STR, str(buf[4:16]))
eq_(res[0], 0)
eq_(res[1], 0)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
res = struct.unpack(icmpv6.nd_option_pi._PACK_STR, str(buf[16:]))
eq_(res[0], icmpv6.ND_OPTION_PI)
eq_(res[1], 4)
eq_(res[2], 0)
eq_(res[3], 0)
eq_(res[4], 0)
eq_(res[5], 0)
eq_(res[6], 0)
eq_(res[7], addrconv.ipv6.text_to_bin('::'))