test_pcaplib: Add unit tests for pcaplib

Signed-off-by: IWASE Yusuke <iwase.yusuke0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
IWASE Yusuke 2016-05-30 15:24:43 +09:00 committed by FUJITA Tomonori
parent 6643bae823
commit ad481c7de8
3 changed files with 230 additions and 0 deletions

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,230 @@
# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import logging
import os
import struct
import sys
import unittest
try:
import mock # Python 2
except ImportError:
from unittest import mock # Python 3
from nose.tools import eq_
from nose.tools import raises
from ryu.utils import binary_str
from ryu.lib import pcaplib
LOG = logging.getLogger(__name__)
PCAP_PACKET_DATA_DIR = os.path.join(
os.path.dirname(sys.modules[__name__].__file__),
'../../packet_data/pcap/')
class Test_PcapFileHdr(unittest.TestCase):
"""
Test case for pcaplib.PcapFileHdr class
"""
hdr = pcaplib.PcapFileHdr(
magic=None, # temporary default
version_major=2,
version_minor=4,
thiszone=0x11223344,
sigfigs=0x22334455,
snaplen=0x33445566,
network=0x44556677,
)
buf_big = (
b'\xa1\xb2\xc3\xd4' # magic (Big Endian)
b'\x00\x02\x00\x04' # version_major, version_minor
b'\x11\x22\x33\x44' # thiszone
b'\x22\x33\x44\x55' # sigfigs
b'\x33\x44\x55\x66' # snaplen
b'\x44\x55\x66\x77' # network
)
buf_little = (
b'\xd4\xc3\xb2\xa1' # magic (Little Endian)
b'\x02\x00\x04\x00' # version_major, version_minor
b'\x44\x33\x22\x11' # thiszone
b'\x55\x44\x33\x22' # sigfigs
b'\x66\x55\x44\x33' # snaplen
b'\x77\x66\x55\x44' # network
)
buf_invalid = (
b'\xff\xff\xff\xff' # magic (Invalid)
b'\x02\x00\x04\x00' # version_major, version_minor
b'\x44\x33\x22\x11' # thiszone
b'\x55\x44\x33\x22' # sigfigs
b'\x66\x55\x44\x33' # snaplen
b'\x77\x66\x55\x44' # network
)
def _assert(self, magic, ret):
self.hdr.magic = magic
eq_(self.hdr.__dict__, ret.__dict__)
def test_parser_with_big_endian(self):
ret, byteorder = pcaplib.PcapFileHdr.parser(self.buf_big)
self._assert(pcaplib.PcapFileHdr.MAGIC_NUMBER_IDENTICAL, ret)
eq_('big', byteorder)
def test_parser_with_little_endian(self):
ret, byteorder = pcaplib.PcapFileHdr.parser(self.buf_little)
self._assert(pcaplib.PcapFileHdr.MAGIC_NUMBER_SWAPPED, ret)
eq_('little', byteorder)
@mock.patch('sys.byteorder', 'big')
def test_serialize_with_big_endian(self):
buf = self.hdr.serialize()
eq_(binary_str(self.buf_big), binary_str(buf))
@mock.patch('sys.byteorder', 'little')
def test_serialize_with_little_endian(self):
buf = self.hdr.serialize()
eq_(binary_str(self.buf_little), binary_str(buf))
@raises(struct.error)
def test_parser_with_invalid_magic_number(self):
pcaplib.PcapFileHdr.parser(self.buf_invalid)
class Test_PcapPktHdr(unittest.TestCase):
"""
Test case for pcaplib.PcapPktHdr class
"""
expected_buf = b'test_data'
hdr = pcaplib.PcapPktHdr(
ts_sec=0x11223344,
ts_usec=0x22334455,
incl_len=len(expected_buf),
orig_len=0x44556677,
)
buf_big = (
b'\x11\x22\x33\x44' # ts_sec
b'\x22\x33\x44\x55' # ts_usec
b'\x00\x00\x00\x09' # incl_len = len(expected_buf)
b'\x44\x55\x66\x77' # orig_len
)
buf_little = (
b'\x44\x33\x22\x11' # ts_sec
b'\x55\x44\x33\x22' # ts_usec
b'\x09\x00\x00\x00' # incl_len = len(expected_buf)
b'\x77\x66\x55\x44' # orig_len
)
def test_parser_with_big_endian(self):
ret, buf = pcaplib.PcapPktHdr.parser(
self.buf_big + self.expected_buf, 'big')
eq_(self.hdr.__dict__, ret.__dict__)
eq_(self.expected_buf, buf)
def test_parser_with_little_endian(self):
ret, buf = pcaplib.PcapPktHdr.parser(
self.buf_little + self.expected_buf, 'little')
eq_(self.hdr.__dict__, ret.__dict__)
eq_(self.expected_buf, buf)
@mock.patch('sys.byteorder', 'big')
def test_serialize_with_big_endian(self):
buf = self.hdr.serialize()
eq_(binary_str(self.buf_big), binary_str(buf))
@mock.patch('sys.byteorder', 'little')
def test_serialize_with_little_endian(self):
buf = self.hdr.serialize()
eq_(binary_str(self.buf_little), binary_str(buf))
class Test_pcaplib_Reader(unittest.TestCase):
"""
Test case for pcaplib.Reader class
"""
expected_outputs = [
(0x1234 + (0x5678 / 1e6), b'test_data_1'), # sec=0x1234, usec=0x5678
(0x2345 + (0x6789 / 1e6), b'test_data_2'), # sec=0x2345, usec=0x6789
]
def _test(self, file_name):
outputs = []
for ts, buf in pcaplib.Reader(open(file_name, 'rb')):
outputs.append((ts, buf))
eq_(self.expected_outputs, outputs)
def test_with_big_endian(self):
self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'big_endian.pcap'))
def test_with_little_endian(self):
self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'little_endian.pcap'))
class DummyFile(object):
def __init__(self):
self.buf = b''
def write(self, buf):
self.buf += buf
def close(self):
pass
class Test_pcaplib_Writer(unittest.TestCase):
"""
Test case for pcaplib.Writer class
"""
@staticmethod
def _test(file_name):
expected_buf = open(file_name, 'rb').read()
f = DummyFile()
w = pcaplib.Writer(f)
w.write_pkt(b'test_data_1', ts=(0x1234 + (0x5678 / 1e6)))
w.write_pkt(b'test_data_2', ts=(0x2345 + (0x6789 / 1e6)))
eq_(expected_buf, f.buf)
@mock.patch('sys.byteorder', 'big')
def test_with_big_endian(self):
self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'big_endian.pcap'))
@mock.patch('sys.byteorder', 'little')
def test_with_little_endian(self):
self._test(os.path.join(PCAP_PACKET_DATA_DIR, 'little_endian.pcap'))
@staticmethod
@mock.patch.object(pcaplib.Writer, '_write_pcap_file_hdr', mock.MagicMock)
@mock.patch.object(pcaplib.Writer, '_write_pkt_hdr', mock.MagicMock)
def test_with_longer_buf():
f = DummyFile()
snaplen = 4
w = pcaplib.Writer(f, snaplen=snaplen)
w.write_pkt(b'hogehoge', ts=0)
expected_buf = b'hoge' # b'hogehoge'[:snaplen]
eq_(expected_buf, f.buf)
eq_(snaplen, len(f.buf))