packet lib: add unittests for reversibility about json

this patch adds tests that examine reversibility about json for all packet library.
the test codes are written as:

    jsondict = msg1.to_jsondict()
    msg2 = cls.from_jsondict(jsondict['msg'])
    eq_(str(msg1), str(msg2))

TODO: make VRRP reversible.
since VRRP has an unusual construction method, VRRP is not reversible about json.

Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yuichi Ito 2013-12-12 15:35:27 +09:00 committed by FUJITA Tomonori
parent 1e26d5a982
commit f7118a89c1
19 changed files with 432 additions and 0 deletions

View File

@ -177,3 +177,8 @@ class Test_arp(unittest.TestCase):
def test_malformed_arp(self):
m_short_buf = self.buf[1:arp._MIN_LEN]
arp.parser(m_short_buf)
def test_json(self):
jsondict = self.a.to_jsondict()
a = arp.from_jsondict(jsondict['arp'])
eq_(str(self.a), str(a))

View File

@ -195,3 +195,88 @@ class Test_bgp(unittest.TestCase):
binmsg2 = msg.serialize()
eq_(binmsg, binmsg2)
eq_(rest, '')
def test_json1(self):
opt_param = [bgp.BGPOptParamCapabilityUnknown(cap_code=200,
cap_value='hoge'),
bgp.BGPOptParamCapabilityRouteRefresh(),
bgp.BGPOptParamCapabilityMultiprotocol(
afi=afi.IP, safi=safi.MPLS_VPN),
bgp.BGPOptParamCapabilityFourOctetAsNumber(
as_number=1234567),
bgp.BGPOptParamUnknown(type_=99, value='fuga')]
msg1 = bgp.BGPOpen(my_as=30000, bgp_identifier='192.0.2.2',
opt_param=opt_param)
jsondict = msg1.to_jsondict()
msg2 = bgp.BGPOpen.from_jsondict(jsondict['BGPOpen'])
eq_(str(msg1), str(msg2))
def test_json2(self):
withdrawn_routes = [bgp.BGPWithdrawnRoute(length=0,
addr='192.0.2.13'),
bgp.BGPWithdrawnRoute(length=1,
addr='192.0.2.13'),
bgp.BGPWithdrawnRoute(length=3,
addr='192.0.2.13'),
bgp.BGPWithdrawnRoute(length=7,
addr='192.0.2.13'),
bgp.BGPWithdrawnRoute(length=32,
addr='192.0.2.13')]
mp_nlri = [
bgp._BinAddrPrefix(32, 'efgh\0\0'),
bgp._BinAddrPrefix(16, 'ij\0\0\0\0'),
]
communities = [
bgp.BGP_COMMUNITY_NO_EXPORT,
bgp.BGP_COMMUNITY_NO_ADVERTISE,
]
ecommunities = [
bgp.BGPTwoOctetAsSpecificExtendedCommunity(subtype=1,
as_number=65500,
local_administrator=
3908876543),
bgp.BGPFourOctetAsSpecificExtendedCommunity(subtype=2,
as_number=10000000,
local_administrator=
59876),
bgp.BGPIPv4AddressSpecificExtendedCommunity(subtype=3,
ipv4_address=
'192.0.2.1',
local_administrator=
65432),
bgp.BGPOpaqueExtendedCommunity(opaque='abcdefg'),
bgp.BGPUnknownExtendedCommunity(type_=99, value='abcdefg'),
]
path_attributes = [
bgp.BGPPathAttributeOrigin(value=1),
bgp.BGPPathAttributeAsPath(value=[[1000], set([1001, 1002]),
[1003, 1004]]),
bgp.BGPPathAttributeNextHop(value='192.0.2.199'),
bgp.BGPPathAttributeMultiExitDisc(value=2000000000),
bgp.BGPPathAttributeLocalPref(value=1000000000),
bgp.BGPPathAttributeAtomicAggregate(),
bgp.BGPPathAttributeAggregator(as_number=40000,
addr='192.0.2.99'),
bgp.BGPPathAttributeCommunities(communities=communities),
bgp.BGPPathAttributeExtendedCommunities(communities=ecommunities),
bgp.BGPPathAttributeAs4Path(value=[[1000000], set([1000001, 1002]),
[1003, 1000004]]),
bgp.BGPPathAttributeAs4Aggregator(as_number=100040000,
addr='192.0.2.99'),
bgp.BGPPathAttributeMpReachNLRI(afi=afi.IP, safi=safi.MPLS_VPN,
next_hop='abcd',
nlri=mp_nlri),
bgp.BGPPathAttributeMpUnreachNLRI(afi=afi.IP, safi=safi.MPLS_VPN,
withdrawn_routes=mp_nlri),
bgp.BGPPathAttributeUnknown(flags=0, type_=100, value=300*'bar')
]
nlri = [
bgp.BGPNLRI(length=24, addr='203.0.113.1'),
bgp.BGPNLRI(length=16, addr='203.0.113.0')
]
msg1 = bgp.BGPUpdate(withdrawn_routes=withdrawn_routes,
path_attributes=path_attributes,
nlri=nlri)
jsondict = msg1.to_jsondict()
msg2 = bgp.BGPUpdate.from_jsondict(jsondict['BGPUpdate'])
eq_(str(msg1), str(msg2))

View File

@ -0,0 +1,54 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import unittest
import logging
from nose.tools import eq_
from ryu.lib.packet import bpdu
LOG = logging.getLogger(__name__)
class Test_ConfigurationBPDUs(unittest.TestCase):
msg = bpdu.ConfigurationBPDUs()
def test_json(self):
jsondict = self.msg.to_jsondict()
msg = bpdu.ConfigurationBPDUs.from_jsondict(
jsondict['ConfigurationBPDUs'])
eq_(str(self.msg), str(msg))
class Test_TopologyChangeNotificationBPDUs(unittest.TestCase):
msg = bpdu.TopologyChangeNotificationBPDUs()
def test_json(self):
jsondict = self.msg.to_jsondict()
msg = bpdu.TopologyChangeNotificationBPDUs.from_jsondict(
jsondict['TopologyChangeNotificationBPDUs'])
eq_(str(self.msg), str(msg))
class Test_RstBPDUs(unittest.TestCase):
msg = bpdu.RstBPDUs()
def test_json(self):
jsondict = self.msg.to_jsondict()
msg = bpdu.RstBPDUs.from_jsondict(jsondict['RstBPDUs'])
eq_(str(self.msg), str(msg))

View File

@ -196,3 +196,8 @@ class Test_dhcp_offer(unittest.TestCase):
eq_(str(self.dh), dh_str)
eq_(repr(self.dh), dh_str)
def test_json(self):
jsondict = self.dh.to_jsondict()
dh = dhcp.dhcp.from_jsondict(jsondict['dhcp'])
eq_(str(self.dh), str(dh))

View File

@ -96,3 +96,8 @@ class Test_ethernet(unittest.TestCase):
eq_(res[0], addrconv.mac.text_to_bin('ff:ff:ff:ff:ff:ff'))
eq_(res[1], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
eq_(res[2], ether.ETH_TYPE_IP)
def test_json(self):
jsondict = self.e.to_jsondict()
e = ethernet.from_jsondict(jsondict['ethernet'])
eq_(str(self.e), str(e))

View File

@ -235,6 +235,23 @@ class Test_icmp(unittest.TestCase):
eq_(res[1], 0)
eq_(buf[4:], '\x00\x00\x00\x00')
def test_json(self):
jsondict = self.ic.to_jsondict()
ic = icmp.icmp.from_jsondict(jsondict['icmp'])
eq_(str(self.ic), str(ic))
def test_json_with_echo(self):
self.setUp_with_echo()
self.test_json()
def test_json_with_dest_unreach(self):
self.setUp_with_dest_unreach()
self.test_json()
def test_json_with_TimeExceeded(self):
self.setUp_with_TimeExceeded()
self.test_json()
class Test_echo(unittest.TestCase):

View File

@ -100,6 +100,11 @@ class Test_icmpv6_header(unittest.TestCase):
eq_(res[1], 0)
eq_(res[2], icmpv6_csum(prev, buf))
def test_json(self):
jsondict = self.icmp.to_jsondict()
icmp = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
eq_(str(self.icmp), str(icmp))
class Test_icmpv6_echo_request(unittest.TestCase):
type_ = 128
@ -210,6 +215,13 @@ class Test_icmpv6_echo_request(unittest.TestCase):
eq_(res[0], 0)
eq_(res[1], 0)
def test_json(self):
ec = icmpv6.echo(self.id_, self.seq, self.data)
ic1 = icmpv6.icmpv6(self.type_, self.code, self.csum, ec)
jsondict = ic1.to_jsondict()
ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
eq_(str(ic1), str(ic2))
class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
def setUp(self):
@ -404,6 +416,14 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
def test_json(self):
nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
ic1 = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
jsondict = ic1.to_jsondict()
ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
eq_(str(ic1), str(ic2))
class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solicit):
def setUp(self):
@ -679,6 +699,14 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(res[1], len(icmpv6.nd_option_sla()) / 8)
eq_(res[2], addrconv.mac.text_to_bin('00:00:00:00:00:00'))
def test_json(self):
nd_opt = icmpv6.nd_option_sla(self.nd_length, self.nd_hw_src)
rs = icmpv6.nd_router_solicit(self.res, nd_opt)
ic1 = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
jsondict = ic1.to_jsondict()
ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
eq_(str(ic1), str(ic2))
class Test_icmpv6_router_advert(unittest.TestCase):
@ -808,6 +836,15 @@ class Test_icmpv6_router_advert(unittest.TestCase):
eq_(res[6], 0)
eq_(res[7], addrconv.ipv6.text_to_bin('::'))
def test_json(self):
ic1 = icmpv6.icmpv6(
type_=icmpv6.ND_ROUTER_ADVERT,
data=icmpv6.nd_router_advert(
options=[icmpv6.nd_option_sla(), icmpv6.nd_option_pi()]))
jsondict = ic1.to_jsondict()
ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
eq_(str(ic1), str(ic2))
class Test_icmpv6_nd_option_la(unittest.TestCase):

View File

@ -154,3 +154,8 @@ class Test_igmp(unittest.TestCase):
eq_(res[0], 0x11)
eq_(res[1], 0)
eq_(res[3], addrconv.ipv4.text_to_bin('0.0.0.0'))
def test_json(self):
jsondict = self.g.to_jsondict()
g = igmp.from_jsondict(jsondict['igmp'])
eq_(str(self.g), str(g))

View File

@ -130,3 +130,8 @@ class Test_ipv4(unittest.TestCase):
def test_malformed_ipv4(self):
m_short_buf = self.buf[1:ipv4._MIN_LEN]
ipv4.parser(m_short_buf)
def test_json(self):
jsondict = self.ip.to_jsondict()
ip = ipv4.from_jsondict(jsondict['ipv4'])
eq_(str(self.ip), str(ip))

View File

@ -425,6 +425,31 @@ class Test_ipv6(unittest.TestCase):
eq_(res[5], addrconv.ipv6.text_to_bin('::'))
eq_(res[6], '\x3a\x00\x05\x02\x00\x00\x01\x00')
def test_json(self):
jsondict = self.ip.to_jsondict()
ip = ipv6.ipv6.from_jsondict(jsondict['ipv6'])
eq_(str(self.ip), str(ip))
def test_json_with_hop_opts(self):
self.setUp_with_hop_opts()
self.test_json()
def test_json_with_dst_opts(self):
self.setUp_with_dst_opts()
self.test_json()
def test_json_with_fragment(self):
self.setUp_with_fragment()
self.test_json()
def test_json_with_auth(self):
self.setUp_with_auth()
self.test_json()
def test_json_with_multi_headers(self):
self.setUp_with_multi_headers()
self.test_json()
class Test_hop_opts(unittest.TestCase):

View File

@ -0,0 +1,42 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import unittest
import logging
from nose.tools import eq_
from ryu.lib.packet import llc
LOG = logging.getLogger(__name__)
class Test_ControlFormatI(unittest.TestCase):
msg = llc.llc(llc.SAP_BPDU, llc.SAP_BPDU, llc.ControlFormatI())
def test_json(self):
jsondict = self.msg.to_jsondict()
msg = llc.llc.from_jsondict(jsondict['llc'])
eq_(str(self.msg), str(msg))
class Test_ControlFormatS(Test_ControlFormatI):
msg = llc.llc(llc.SAP_BPDU, llc.SAP_BPDU, llc.ControlFormatS())
class Test_ControlFormatU(Test_ControlFormatI):
msg = llc.llc(llc.SAP_BPDU, llc.SAP_BPDU, llc.ControlFormatU())

View File

@ -178,6 +178,19 @@ class TestLLDPMandatoryTLV(unittest.TestCase):
eq_(str(lldp_pkt), lldp_str)
eq_(repr(lldp_pkt), lldp_str)
def test_json(self):
chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
chassis_id='\x00\x04\x96\x1f\xa7\x26')
port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
port_id='1/3')
ttl = lldp.TTL(ttl=120)
end = lldp.End()
tlvs = (chassis_id, port_id, ttl, end)
lldp1 = lldp.lldp(tlvs)
jsondict = lldp1.to_jsondict()
lldp2 = lldp.lldp.from_jsondict(jsondict['lldp'])
eq_(str(lldp1), str(lldp2))
class TestLLDPOptionalTLV(unittest.TestCase):
def setUp(self):
@ -474,3 +487,33 @@ class TestLLDPOptionalTLV(unittest.TestCase):
eq_(str(lldp_pkt), lldp_str)
eq_(repr(lldp_pkt), lldp_str)
def test_json(self):
chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
chassis_id='\x00\x01\x30\xf9\xad\xa0')
port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
port_id='1/1')
ttl = lldp.TTL(ttl=120)
port_desc = lldp.PortDescription(
port_description='Summit300-48-Port 1001\x00')
sys_name = lldp.SystemName(system_name='Summit300-48\x00')
sys_desc = lldp.SystemDescription(
system_description='Summit300-48 - Version 7.4e.1 (Build 5) '
+ 'by Release_Master 05/27/05 04:53:11\x00')
sys_cap = lldp.SystemCapabilities(
subtype=lldp.ChassisID.SUB_CHASSIS_COMPONENT,
system_cap=0x14,
enabled_cap=0x14)
man_addr = lldp.ManagementAddress(
addr_subtype=0x06, addr='\x00\x01\x30\xf9\xad\xa0',
intf_subtype=0x02, intf_num=1001,
oid='')
org_spec = lldp.OrganizationallySpecific(
oui='\x00\x12\x0f', subtype=0x02, info='\x07\x01\x00')
end = lldp.End()
tlvs = (chassis_id, port_id, ttl, port_desc, sys_name,
sys_desc, sys_cap, man_addr, org_spec, end)
lldp1 = lldp.lldp(tlvs)
jsondict = lldp1.to_jsondict()
lldp2 = lldp.lldp.from_jsondict(jsondict['lldp'])
eq_(str(lldp1), str(lldp2))

View File

@ -52,3 +52,8 @@ class Test_mpls(unittest.TestCase):
eq_(str(self.mp), mpls_str)
eq_(repr(self.mp), mpls_str)
def test_json(self):
jsondict = self.mp.to_jsondict()
mp = mpls.mpls.from_jsondict(jsondict['mpls'])
eq_(str(self.mp), str(mp))

View File

@ -165,3 +165,8 @@ class Test_itag(unittest.TestCase):
def test_malformed_itag(self):
m_short_buf = self.buf[1:pbb.itag._MIN_LEN]
pbb.itag.parser(m_short_buf)
def test_json(self):
jsondict = self.it.to_jsondict()
it = pbb.itag.from_jsondict(jsondict['itag'])
eq_(str(self.it), str(it))

View File

@ -1382,3 +1382,72 @@ class Test_sctp(unittest.TestCase):
def test_to_string_with_multi_chunks(self):
self.setUp_with_multi_chunks()
self.test_to_string()
def test_json(self):
jsondict = self.sc.to_jsondict()
sc = sctp.sctp.from_jsondict(jsondict['sctp'])
eq_(str(self.sc), str(sc))
def test_json_with_data(self):
self.setUp_with_data()
self.test_json()
def test_json_with_init(self):
self.setUp_with_init()
self.test_json()
def test_json_with_init_ack(self):
self.setUp_with_init_ack()
self.test_json()
def test_json_with_sack(self):
self.setUp_with_sack()
self.test_json()
def test_json_with_heartbeat(self):
self.setUp_with_heartbeat()
self.test_json()
def test_json_with_heartbeat_ack(self):
self.setUp_with_heartbeat_ack()
self.test_json()
def test_json_with_abort(self):
self.setUp_with_abort()
self.test_json()
def test_json_with_shutdown(self):
self.setUp_with_shutdown()
self.test_json()
def test_json_with_shutdown_ack(self):
self.setUp_with_shutdown_ack()
self.test_json()
def test_json_with_error(self):
self.setUp_with_error()
self.test_json()
def test_json_with_cookie_echo(self):
self.setUp_with_cookie_echo()
self.test_json()
def test_json_with_cookie_ack(self):
self.setUp_with_cookie_ack()
self.test_json()
def test_json_with_ecn_echo(self):
self.setUp_with_ecn_echo()
self.test_json()
def test_json_with_cwr(self):
self.setUp_with_cwr()
self.test_json()
def test_json_with_shutdown_complete(self):
self.setUp_with_shutdown_complete()
self.test_json()
def test_json_with_multi_chunks(self):
self.setUp_with_multi_chunks()
self.test_json()

View File

@ -1095,3 +1095,8 @@ class Test_lacp(unittest.TestCase):
-1,
self.collector_max_delay)
l.serialize()
def test_json(self):
jsondict = self.l.to_jsondict()
l = lacp.from_jsondict(jsondict['lacp'])
eq_(str(self.l), str(l))

View File

@ -183,3 +183,8 @@ class Test_tcp(unittest.TestCase):
eq_(res[6], 0)
eq_(res[8], 0)
eq_(res[9], '\x01\x02\x03\x00\x00\x00\x00\x00')
def test_json(self):
jsondict = self.t.to_jsondict()
t = tcp.from_jsondict(jsondict['tcp'])
eq_(str(self.t), str(t))

View File

@ -104,3 +104,8 @@ class Test_udp(unittest.TestCase):
eq_(res[0], 0)
eq_(res[1], 0)
eq_(res[2], udp._MIN_LEN)
def test_json(self):
jsondict = self.u.to_jsondict()
u = udp.from_jsondict(jsondict['udp'])
eq_(str(self.u), str(u))

View File

@ -138,6 +138,11 @@ class Test_vlan(unittest.TestCase):
m_short_buf = self.buf[1:vlan._MIN_LEN]
vlan.parser(m_short_buf)
def test_json(self):
jsondict = self.v.to_jsondict()
v = vlan.from_jsondict(jsondict['vlan'])
eq_(str(self.v), str(v))
class Test_svlan(unittest.TestCase):
@ -254,3 +259,8 @@ class Test_svlan(unittest.TestCase):
def test_malformed_svlan(self):
m_short_buf = self.buf[1:svlan._MIN_LEN]
svlan.parser(m_short_buf)
def test_json(self):
jsondict = self.sv.to_jsondict()
sv = svlan.from_jsondict(jsondict['svlan'])
eq_(str(self.sv), str(sv))