packet lib to string: unit tests

Signed-off-by: WATANABE Fumitaka <watanabe.fumitaka@nttcom.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
watanabe.fumitaka 2013-08-05 13:07:47 +09:00 committed by FUJITA Tomonori
parent e0d82b9d37
commit 0b5291856b
8 changed files with 895 additions and 0 deletions

View File

@ -0,0 +1,111 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import logging
import socket
import unittest
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.lib import ip
from ryu.lib import mac
from ryu.lib.packet import dhcp
LOG = logging.getLogger(__name__)
class Test_dhcp_offer(unittest.TestCase):
op = dhcp.DHCP_BOOT_REPLY
chaddr = 'AA:AA:AA:AA:AA:AA'
htype = 1
hlen = 6
hops = 0
xid = 1
secs = 0
flags = 1
ciaddr = '192.168.10.10'
yiaddr = '192.168.20.20'
siaddr = '192.168.30.30'
giaddr = '192.168.40.40'
sname = 'abc'
boot_file = ''
option_list = [dhcp.option('35', '02'),
dhcp.option('01', 'ffffff00'),
dhcp.option('03', 'c0a80a09'),
dhcp.option('06', 'c0a80a09'),
dhcp.option('33', '0003f480'),
dhcp.option('3a', '0001fa40'),
dhcp.option('3b', '000375f0'),
dhcp.option('36', 'c0a80a09')]
magic_cookie = socket.inet_aton('99.130.83.99')
options = dhcp.options(option_list=option_list,
magic_cookie=magic_cookie)
dh = dhcp.dhcp(op, chaddr, options, htype=htype, hlen=hlen,
hops=hops, xid=xid, secs=secs, flags=flags,
ciaddr=ciaddr, yiaddr=yiaddr, siaddr=siaddr,
giaddr=giaddr, sname=sname, boot_file=boot_file)
def setUp(self):
pass
def tearDown(self):
pass
def test_to_string(self):
option_values = ['tag', 'length', 'value']
opt_str_list = []
for option in self.option_list:
_opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k)))
for k, v in inspect.getmembers(option)
if k in option_values])
opt_str = '%s(%s)' % (dhcp.option.__name__, _opt_str)
opt_str_list.append(opt_str)
option_str = '[%s]' % ', '.join(opt_str_list)
opts_vals = {'magic_cookie': repr(self.magic_cookie),
'option_list': option_str,
'options_len': repr(self.options.options_len)}
_options_str = ','.join(['%s=%s' % (k, opts_vals[k])
for k, v in inspect.getmembers(self.options)
if k in opts_vals])
options_str = '%s(%s)' % (dhcp.options.__name__, _options_str)
dhcp_values = {'op': repr(self.op),
'htype': repr(self.htype),
'hlen': repr(self.hlen),
'hops': repr(self.hops),
'xid': repr(self.xid),
'secs': repr(self.secs),
'flags': repr(self.flags),
'ciaddr': repr(self.ciaddr),
'yiaddr': repr(self.yiaddr),
'siaddr': repr(self.siaddr),
'giaddr': repr(self.giaddr),
'chaddr': repr(self.chaddr),
'sname': repr(self.sname),
'boot_file': repr(self.boot_file),
'options': options_str}
_dh_str = ','.join(['%s=%s' % (k, dhcp_values[k])
for k, v in inspect.getmembers(self.dh)
if k in dhcp_values])
dh_str = '%s(%s)' % (dhcp.dhcp.__name__, _dh_str)
eq_(str(self.dh), dh_str)
eq_(repr(self.dh), dh_str)

View File

@ -0,0 +1,67 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import inspect
import logging
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.lib.packet import icmp
LOG = logging.getLogger(__name__)
class Test_icmp_dest_unreach(unittest.TestCase):
type_ = icmp.ICMP_DEST_UNREACH
code = icmp.ICMP_HOST_UNREACH_CODE
csum = 0
mtu = 10
data = 'abc'
data_len = len(data)
dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data)
ic = icmp.icmp(type_, code, csum, data=dst_unreach)
def setUp(self):
pass
def tearDown(self):
pass
def test_to_string(self):
data_values = {'data': self.data,
'data_len': self.data_len,
'mtu': self.mtu}
_data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
for k, v in inspect.getmembers(self.dst_unreach)
if k in data_values])
data_str = '%s(%s)' % (icmp.dest_unreach.__name__, _data_str)
icmp_values = {'type': repr(self.type_),
'code': repr(self.code),
'csum': repr(self.csum),
'data': data_str}
_ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
for k, v in inspect.getmembers(self.ic)
if k in icmp_values])
ic_str = '%s(%s)' % (icmp.icmp.__name__, _ic_str)
eq_(str(self.ic), ic_str)
eq_(repr(self.ic), ic_str)

View File

@ -18,6 +18,7 @@
import unittest
import logging
import struct
import inspect
from nose.tools import ok_, eq_, nottest, raises
from nose.plugins.skip import Skip, SkipTest
@ -157,6 +158,30 @@ class Test_icmpv6_echo_request(unittest.TestCase):
def test_serialize_with_data(self):
self._test_serialize(self.data)
def test_to_string(self):
ec = icmpv6.echo(self.id_, self.seq, self.data)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ec)
echo_values = {'id': self.id_,
'seq': self.seq,
'data': self.data}
_echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
for k, v in inspect.getmembers(ec)
if k in echo_values])
echo_str = '%s(%s)' % (icmpv6.echo.__name__, _echo_str)
icmp_values = {'type_': repr(self.type_),
'code': repr(self.code),
'csum': repr(self.csum),
'data': echo_str}
_ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
for k, v in inspect.getmembers(ic)
if k in icmp_values])
ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
eq_(str(ic), ic_str)
eq_(repr(ic), ic_str)
class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
def setUp(self):
@ -263,6 +288,41 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(nd_length, self.nd_length)
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
nd = icmpv6.nd_neighbor(
self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
nd_opt_values = {'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
if k in nd_opt_values])
nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str)
nd_values = {'res': repr(nd.res),
'dst': repr(self.dst),
'type_': repr(self.nd_type),
'length': repr(self.nd_length),
'data': nd_opt_str}
_nd_str = ','.join(['%s=%s' % (k, nd_values[k])
for k, v in inspect.getmembers(nd)
if k in nd_values])
nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str)
icmp_values = {'type_': repr(self.type_),
'code': repr(self.code),
'csum': repr(self.csum),
'data': nd_str}
_ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
for k, v in inspect.getmembers(ic)
if k in icmp_values])
ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
eq_(str(ic), ic_str)
eq_(repr(ic), ic_str)
class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict):
def setUp(self):
@ -339,3 +399,18 @@ class Test_icmpv6_router_solict(unittest.TestCase):
def test_serialize_with_data(self):
self._test_serialize(self.data)
def test_to_string(self):
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.data)
icmp_values = {'type_': self.type_,
'code': self.code,
'csum': self.csum,
'data': self.data}
_ic_str = ','.join(['%s=%s' % (k, repr(icmp_values[k]))
for k, v in inspect.getmembers(ic)
if k in icmp_values])
ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
eq_(str(ic), ic_str)
eq_(repr(ic), ic_str)

View File

@ -0,0 +1,65 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import logging
import inspect
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.lib import ip
from ryu.lib.packet import ipv6
LOG = logging.getLogger(__name__)
class Test_ipv6(unittest.TestCase):
version = 6
traffic_class = 0
flow_label = 0
payload_length = 817
nxt = 6
hop_limit = 128
src = ip.ipv6_to_bin('2002:4637:d5d3::4637:d5d3')
dst = ip.ipv6_to_bin('2001:4860:0:2001::68')
ip = ipv6.ipv6(version, traffic_class, flow_label, payload_length,
nxt, hop_limit, src, dst)
def setUp(self):
pass
def tearDown(self):
pass
def test_to_string(self):
ipv6_values = {'version': self.version,
'traffic_class': self.traffic_class,
'flow_label': self.flow_label,
'payload_length': self.payload_length,
'nxt': self.nxt,
'hop_limit': self.hop_limit,
'src': self.src,
'dst': self.dst}
_ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
for k, v in inspect.getmembers(self.ip)
if k in ipv6_values])
ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
eq_(str(self.ip), ipv6_str)
eq_(repr(self.ip), ipv6_str)

View File

@ -18,6 +18,7 @@
import unittest
import logging
import struct
import inspect
from nose.tools import ok_, eq_, nottest
from ryu.ofproto import ether
@ -120,6 +121,62 @@ class TestLLDPMandatoryTLV(unittest.TestCase):
pkt.serialize()
eq_(pkt.data, self.data)
def test_to_string(self):
chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
chassis_id='\x00\x04\x96\x1f\xa7\x26')
port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
port_id='1/3')
ttl = lldp.TTL(ttl=120)
end = lldp.End()
tlvs = (chassis_id, port_id, ttl, end)
lldp_pkt = lldp.lldp(tlvs)
chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS,
'chassis_id': '\x00\x04\x96\x1f\xa7\x26',
'len': chassis_id.len,
'typelen': chassis_id.typelen}
_ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k]))
for k, v in inspect.getmembers(chassis_id)
if k in chassis_id_values])
tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str)
port_id_values = {'subtype': port_id.subtype,
'port_id': port_id.port_id,
'len': port_id.len,
'typelen': port_id.typelen}
_port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k]))
for k, v in inspect.getmembers(port_id)
if k in port_id_values])
tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str)
ttl_values = {'ttl': ttl.ttl,
'len': ttl.len,
'typelen': ttl.typelen}
_ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k]))
for k, v in inspect.getmembers(ttl)
if k in ttl_values])
tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str)
end_values = {'len': end.len,
'typelen': end.typelen}
_end_str = ','.join(['%s=%s' % (k, repr(end_values[k]))
for k, v in inspect.getmembers(end)
if k in end_values])
tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str)
_tlvs_str = '(%s, %s, %s, %s)'
tlvs_str = _tlvs_str % (tlv_chassis_id_str,
tlv_port_id_str,
tlv_ttl_str,
tlv_end_str)
_lldp_str = '%s(tlvs=%s)'
lldp_str = _lldp_str % (lldp.lldp.__name__,
tlvs_str)
eq_(str(lldp_pkt), lldp_str)
eq_(repr(lldp_pkt), lldp_str)
class TestLLDPOptionalTLV(unittest.TestCase):
def setUp(self):
@ -260,3 +317,158 @@ class TestLLDPOptionalTLV(unittest.TestCase):
# self.data has many organizationally specific TLVs
data = str(pkt.data[:-2])
eq_(data, self.data[:len(data)])
def test_to_string(self):
chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
chassis_id='\x00\x01\x30\xf9\xad\xa0')
port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
port_id='1/1')
ttl = lldp.TTL(ttl=120)
port_desc = lldp.PortDescription(
port_description='Summit300-48-Port 1001\x00')
sys_name = lldp.SystemName(system_name='Summit300-48\x00')
sys_desc = lldp.SystemDescription(
system_description='Summit300-48 - Version 7.4e.1 (Build 5) '
+ 'by Release_Master 05/27/05 04:53:11\x00')
sys_cap = lldp.SystemCapabilities(
subtype=lldp.ChassisID.SUB_CHASSIS_COMPONENT,
system_cap=0x14,
enabled_cap=0x14)
man_addr = lldp.ManagementAddress(
addr_subtype=0x06, addr='\x00\x01\x30\xf9\xad\xa0',
intf_subtype=0x02, intf_num=1001,
oid='')
org_spec = lldp.OrganizationallySpecific(
oui='\x00\x12\x0f', subtype=0x02, info='\x07\x01\x00')
end = lldp.End()
tlvs = (chassis_id, port_id, ttl, port_desc, sys_name,
sys_desc, sys_cap, man_addr, org_spec, end)
lldp_pkt = lldp.lldp(tlvs)
# ChassisID string
chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS,
'chassis_id': '\x00\x01\x30\xf9\xad\xa0',
'len': chassis_id.len,
'typelen': chassis_id.typelen}
_ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k]))
for k, v in inspect.getmembers(chassis_id)
if k in chassis_id_values])
tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str)
# PortID string
port_id_values = {'subtype': port_id.subtype,
'port_id': port_id.port_id,
'len': port_id.len,
'typelen': port_id.typelen}
_port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k]))
for k, v in inspect.getmembers(port_id)
if k in port_id_values])
tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str)
# TTL string
ttl_values = {'ttl': ttl.ttl,
'len': ttl.len,
'typelen': ttl.typelen}
_ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k]))
for k, v in inspect.getmembers(ttl)
if k in ttl_values])
tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str)
# PortDescription string
port_desc_values = {'tlv_info': port_desc.tlv_info,
'len': port_desc.len,
'typelen': port_desc.typelen}
_port_desc_str = ','.join(['%s=%s' % (k, repr(port_desc_values[k]))
for k, v in inspect.getmembers(port_desc)
if k in port_desc_values])
tlv_port_desc_str = '%s(%s)' % (lldp.PortDescription.__name__,
_port_desc_str)
# SystemName string
sys_name_values = {'tlv_info': sys_name.tlv_info,
'len': sys_name.len,
'typelen': sys_name.typelen}
_system_name_str = ','.join(['%s=%s' % (k, repr(sys_name_values[k]))
for k, v in inspect.getmembers(sys_name)
if k in sys_name_values])
tlv_system_name_str = '%s(%s)' % (lldp.SystemName.__name__,
_system_name_str)
# SystemDescription string
sys_desc_values = {'tlv_info': sys_desc.tlv_info,
'len': sys_desc.len,
'typelen': sys_desc.typelen}
_sys_desc_str = ','.join(['%s=%s' % (k, repr(sys_desc_values[k]))
for k, v in inspect.getmembers(sys_desc)
if k in sys_desc_values])
tlv_sys_desc_str = '%s(%s)' % (lldp.SystemDescription.__name__,
_sys_desc_str)
# SystemCapabilities string
sys_cap_values = {'subtype': lldp.ChassisID.SUB_CHASSIS_COMPONENT,
'system_cap': 0x14,
'enabled_cap': 0x14,
'len': sys_cap.len,
'typelen': sys_cap.typelen}
_sys_cap_str = ','.join(['%s=%s' % (k, repr(sys_cap_values[k]))
for k, v in inspect.getmembers(sys_cap)
if k in sys_cap_values])
tlv_sys_cap_str = '%s(%s)' % (lldp.SystemCapabilities.__name__,
_sys_cap_str)
# ManagementAddress string
man_addr_values = {'addr_subtype': 0x06,
'addr': '\x00\x01\x30\xf9\xad\xa0',
'addr_len': man_addr.addr_len,
'intf_subtype': 0x02,
'intf_num': 1001,
'oid': '',
'oid_len': man_addr.oid_len,
'len': man_addr.len,
'typelen': man_addr.typelen}
_man_addr_str = ','.join(['%s=%s' % (k, repr(man_addr_values[k]))
for k, v in inspect.getmembers(man_addr)
if k in man_addr_values])
tlv_man_addr_str = '%s(%s)' % (lldp.ManagementAddress.__name__,
_man_addr_str)
# OrganizationallySpecific string
org_spec_values = {'oui': '\x00\x12\x0f',
'subtype': 0x02,
'info': '\x07\x01\x00',
'len': org_spec.len,
'typelen': org_spec.typelen}
_org_spec_str = ','.join(['%s=%s' % (k, repr(org_spec_values[k]))
for k, v in inspect.getmembers(org_spec)
if k in org_spec_values])
tlv_org_spec_str = '%s(%s)' % (lldp.OrganizationallySpecific.__name__,
_org_spec_str)
# End string
end_values = {'len': end.len,
'typelen': end.typelen}
_end_str = ','.join(['%s=%s' % (k, repr(end_values[k]))
for k, v in inspect.getmembers(end)
if k in end_values])
tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str)
# tlvs string
_tlvs_str = '(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
tlvs_str = _tlvs_str % (tlv_chassis_id_str,
tlv_port_id_str,
tlv_ttl_str,
tlv_port_desc_str,
tlv_system_name_str,
tlv_sys_desc_str,
tlv_sys_cap_str,
tlv_man_addr_str,
tlv_org_spec_str,
tlv_end_str)
# lldp string
_lldp_str = '%s(tlvs=%s)'
lldp_str = _lldp_str % (lldp.lldp.__name__,
tlvs_str)
eq_(str(lldp_pkt), lldp_str)
eq_(repr(lldp_pkt), lldp_str)

View File

@ -0,0 +1,54 @@
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import logging
import inspect
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.lib.packet import mpls
LOG = logging.getLogger(__name__)
class Test_mpls(unittest.TestCase):
label = 29
exp = 6
bsb = 1
ttl = 64
mp = mpls.mpls(label, exp, bsb, ttl)
def setUp(self):
pass
def tearDown(self):
pass
def test_to_string(self):
mpls_values = {'label': self.label,
'exp': self.exp,
'bsb': self.bsb,
'ttl': self.ttl}
_mpls_str = ','.join(['%s=%s' % (k, repr(mpls_values[k]))
for k, v in inspect.getmembers(self.mp)
if k in mpls_values])
mpls_str = '%s(%s)' % (mpls.mpls.__name__, _mpls_str)
eq_(str(self.mp), mpls_str)
eq_(repr(self.mp), mpls_str)

View File

@ -19,6 +19,7 @@ import unittest
import logging
import struct
import array
import inspect
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
@ -116,6 +117,40 @@ class TestPacket(unittest.TestCase):
eq_(self.dst_mac, p_arp.dst_mac)
eq_(self.dst_ip, p_arp.dst_ip)
# to string
eth_values = {'dst': self.dst_mac,
'src': self.src_mac,
'ethertype': ether.ETH_TYPE_ARP}
_eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
for k, v in inspect.getmembers(p_eth)
if k in eth_values])
eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
arp_values = {'hwtype': 1,
'proto': ether.ETH_TYPE_IP,
'hlen': 6,
'plen': 4,
'opcode': 2,
'src_mac': self.src_mac,
'dst_mac': self.dst_mac,
'src_ip': self.src_ip,
'dst_ip': self.dst_ip}
_arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
for k, v in inspect.getmembers(p_arp)
if k in arp_values])
arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
pkt_str = '%s, %s' % (eth_str, arp_str)
eq_(eth_str, str(p_eth))
eq_(eth_str, repr(p_eth))
eq_(arp_str, str(p_arp))
eq_(arp_str, repr(p_arp))
eq_(pkt_str, str(pkt))
eq_(pkt_str, repr(pkt))
def test_vlan_arp(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@ -185,6 +220,52 @@ class TestPacket(unittest.TestCase):
eq_(self.dst_mac, p_arp.dst_mac)
eq_(self.dst_ip, p_arp.dst_ip)
# to string
eth_values = {'dst': self.dst_mac,
'src': self.src_mac,
'ethertype': ether.ETH_TYPE_8021Q}
_eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
for k, v in inspect.getmembers(p_eth)
if k in eth_values])
eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
vlan_values = {'pcp': 0b111,
'cfi': 0b1,
'vid': 3,
'ethertype': ether.ETH_TYPE_ARP}
_vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k]))
for k, v in inspect.getmembers(p_vlan)
if k in vlan_values])
vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str)
arp_values = {'hwtype': 1,
'proto': ether.ETH_TYPE_IP,
'hlen': 6,
'plen': 4,
'opcode': 2,
'src_mac': self.src_mac,
'dst_mac': self.dst_mac,
'src_ip': self.src_ip,
'dst_ip': self.dst_ip}
_arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
for k, v in inspect.getmembers(p_arp)
if k in arp_values])
arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str)
eq_(eth_str, str(p_eth))
eq_(eth_str, repr(p_eth))
eq_(vlan_str, str(p_vlan))
eq_(vlan_str, repr(p_vlan))
eq_(arp_str, str(p_arp))
eq_(arp_str, repr(p_arp))
eq_(pkt_str, str(pkt))
eq_(pkt_str, repr(pkt))
def test_ipv4_udp(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@ -272,6 +353,57 @@ class TestPacket(unittest.TestCase):
ok_('payload' in protocols)
eq_(self.payload, protocols['payload'].tostring())
# to string
eth_values = {'dst': self.dst_mac,
'src': self.src_mac,
'ethertype': ether.ETH_TYPE_IP}
_eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
for k, v in inspect.getmembers(p_eth)
if k in eth_values])
eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
ipv4_values = {'version': 4,
'header_length': 5,
'tos': 1,
'total_length': l,
'identification': 3,
'flags': 1,
'offset': p_ipv4.offset,
'ttl': 64,
'proto': inet.IPPROTO_UDP,
'csum': p_ipv4.csum,
'src': self.src_ip,
'dst': self.dst_ip,
'option': None}
_ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
for k, v in inspect.getmembers(p_ipv4)
if k in ipv4_values])
ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
udp_values = {'src_port': 0x190f,
'dst_port': 0x1F90,
'total_length': len(u_buf) + len(self.payload),
'csum': 0x77b2}
_udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
for k, v in inspect.getmembers(p_udp)
if k in udp_values])
udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str,
repr(protocols['payload']))
eq_(eth_str, str(p_eth))
eq_(eth_str, repr(p_eth))
eq_(ipv4_str, str(p_ipv4))
eq_(ipv4_str, repr(p_ipv4))
eq_(udp_str, str(p_udp))
eq_(udp_str, repr(p_udp))
eq_(pkt_str, str(pkt))
eq_(pkt_str, repr(pkt))
def test_ipv4_tcp(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@ -371,6 +503,63 @@ class TestPacket(unittest.TestCase):
ok_('payload' in protocols)
eq_(self.payload, protocols['payload'].tostring())
# to string
eth_values = {'dst': self.dst_mac,
'src': self.src_mac,
'ethertype': ether.ETH_TYPE_IP}
_eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
for k, v in inspect.getmembers(p_eth)
if k in eth_values])
eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
ipv4_values = {'version': 4,
'header_length': 5,
'tos': 0,
'total_length': l,
'identification': 0,
'flags': 0,
'offset': p_ipv4.offset,
'ttl': 64,
'proto': inet.IPPROTO_TCP,
'csum': p_ipv4.csum,
'src': self.src_ip,
'dst': self.dst_ip,
'option': None}
_ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
for k, v in inspect.getmembers(p_ipv4)
if k in ipv4_values])
ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
tcp_values = {'src_port': 0x190f,
'dst_port': 0x1F90,
'seq': 0x123,
'ack': 1,
'offset': 6,
'bits': 0b101010,
'window_size': 2048,
'csum': p_tcp.csum,
'urgent': 0x6f,
'option': p_tcp.option}
_tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
for k, v in inspect.getmembers(p_tcp)
if k in tcp_values])
tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str,
repr(protocols['payload']))
eq_(eth_str, str(p_eth))
eq_(eth_str, repr(p_eth))
eq_(ipv4_str, str(p_ipv4))
eq_(ipv4_str, repr(p_ipv4))
eq_(tcp_str, str(p_tcp))
eq_(tcp_str, repr(p_tcp))
eq_(pkt_str, str(pkt))
eq_(pkt_str, repr(pkt))
def test_llc_bpdu(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@ -462,3 +651,64 @@ class TestPacket(unittest.TestCase):
eq_(20, p_bpdu.max_age)
eq_(2, p_bpdu.hello_time)
eq_(15, p_bpdu.forward_delay)
# to string
eth_values = {'dst': self.dst_mac,
'src': self.src_mac,
'ethertype': ether.ETH_TYPE_IEEE802_3}
_eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
for k, v in inspect.getmembers(p_eth)
if k in eth_values])
eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
ctrl_values = {'modifier_function1': 0,
'pf_bit': 0,
'modifier_function2': 0}
_ctrl_str = ','.join(['%s=%s' % (k, repr(ctrl_values[k]))
for k, v in inspect.getmembers(p_llc.control)
if k in ctrl_values])
ctrl_str = '%s(%s)' % (llc.ControlFormatU.__name__, _ctrl_str)
llc_values = {'dsap_addr': repr(llc.SAP_BDPU),
'ssap_addr': repr(llc.SAP_BDPU),
'control': ctrl_str}
_llc_str = ','.join(['%s=%s' % (k, llc_values[k])
for k, v in inspect.getmembers(p_llc)
if k in llc_values])
llc_str = '%s(%s)' % (llc.llc.__name__, _llc_str)
bpdu_values = {'protocol_id': bpdu.PROTOCOL_IDENTIFIER,
'version_id': bpdu.PROTOCOLVERSION_ID_BPDU,
'bpdu_type': bpdu.TYPE_CONFIG_BPDU,
'flags': 0,
'root_priority': long(32768),
'root_system_id_extension': long(0),
'root_mac_address': self.src_mac,
'root_path_cost': 0,
'bridge_priority': long(32768),
'bridge_system_id_extension': long(0),
'bridge_mac_address': self.dst_mac,
'port_priority': 128,
'port_number': 4,
'message_age': float(1),
'max_age': float(20),
'hello_time': float(2),
'forward_delay': float(15)}
_bpdu_str = ','.join(['%s=%s' % (k, repr(bpdu_values[k]))
for k, v in inspect.getmembers(p_bpdu)
if k in bpdu_values])
bpdu_str = '%s(%s)' % (bpdu.ConfigurationBPDUs.__name__, _bpdu_str)
pkt_str = '%s, %s, %s' % (eth_str, llc_str, bpdu_str)
eq_(eth_str, str(p_eth))
eq_(eth_str, repr(p_eth))
eq_(llc_str, str(p_llc))
eq_(llc_str, repr(p_llc))
eq_(bpdu_str, str(p_bpdu))
eq_(bpdu_str, repr(p_bpdu))
eq_(pkt_str, str(pkt))
eq_(pkt_str, repr(pkt))

View File

@ -19,6 +19,7 @@
import unittest
import logging
import struct
import inspect
from nose.tools import eq_, ok_
from nose.tools import raises
@ -183,6 +184,26 @@ class Test_vrrpv2(unittest.TestCase):
max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1
ok_(not self._test_is_valid(max_adver_int=max_adver_int))
def test_to_string(self):
vrrpv2_values = {'version': self.version,
'type': self.type_,
'vrid': self.vrid,
'priority': self.priority,
'count_ip': self.count_ip,
'max_adver_int': self.max_adver_int,
'checksum': self.vrrpv2.checksum,
'ip_addresses': [self.ip_address],
'auth_type': self.auth_type,
'auth_data': self.auth_data,
'identification': self.vrrpv2.identification}
_vrrpv2_str = ','.join(['%s=%s' % (k, repr(vrrpv2_values[k]))
for k, v in inspect.getmembers(self.vrrpv2)
if k in vrrpv2_values])
vrrpv2_str = '%s(%s)' % (vrrp.vrrpv2.__name__, _vrrpv2_str)
eq_(str(self.vrrpv2), vrrpv2_str)
eq_(repr(self.vrrpv2), vrrpv2_str)
class Test_vrrpv3_ipv4(unittest.TestCase):
""" Test case for vrrp v3 IPv4
@ -328,6 +349,26 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1
ok_(not self._test_is_valid(max_adver_int=max_adver_int))
def test_to_string(self):
vrrpv3_values = {'version': self.version,
'type': self.type_,
'vrid': self.vrid,
'priority': self.priority,
'count_ip': self.count_ip,
'max_adver_int': self.max_adver_int,
'checksum': self.vrrpv3.checksum,
'ip_addresses': [self.ip_address],
'auth_type': None,
'auth_data': None,
'identification': self.vrrpv3.identification}
_vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k]))
for k, v in inspect.getmembers(self.vrrpv3)
if k in vrrpv3_values])
vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str)
eq_(str(self.vrrpv3), vrrpv3_str)
eq_(repr(self.vrrpv3), vrrpv3_str)
class Test_vrrpv3_ipv6(unittest.TestCase):
""" Test case for vrrp v3 IPv6
@ -430,3 +471,23 @@ class Test_vrrpv3_ipv6(unittest.TestCase):
print(len(p0.data), p0.data)
print(len(p1.data), p1.data)
eq_(p0.data, p1.data)
def test_to_string(self):
vrrpv3_values = {'version': self.version,
'type': self.type_,
'vrid': self.vrid,
'priority': self.priority,
'count_ip': self.count_ip,
'max_adver_int': self.max_adver_int,
'checksum': self.vrrpv3.checksum,
'ip_addresses': [self.ip_address],
'auth_type': None,
'auth_data': None,
'identification': self.vrrpv3.identification}
_vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k]))
for k, v in inspect.getmembers(self.vrrpv3)
if k in vrrpv3_values])
vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str)
eq_(str(self.vrrpv3), vrrpv3_str)
eq_(repr(self.vrrpv3), vrrpv3_str)