packet.bgp: parse/serialize capability options

Signed-off-by: YAMAMOTO Takashi <yamamoto@valinux.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
YAMAMOTO Takashi 2013-10-08 15:29:36 +09:00 committed by FUJITA Tomonori
parent c2f5f18605
commit 496d803df3
2 changed files with 132 additions and 65 deletions

View File

@ -121,57 +121,38 @@ class _IPAddrPrefix(StringifyMixin):
return buf + bytes(bin_ip_addr)
class BGPOptParam(StringifyMixin):
_PACK_STR = '!BB' # type, length
def __init__(self, type_, value, length=None):
self.type = type_
self.length = length
self.value = value
class _Value(object):
_VALUE_PACK_STR = None
@classmethod
def parser(cls, buf):
(type_, length) = struct.unpack_from(cls._PACK_STR, buffer(buf))
rest = buf[struct.calcsize(cls._PACK_STR):]
value = bytes(rest[:length])
rest = rest[length:]
return cls(type_=type_, length=length, value=value), rest
def serialize(self):
# fixup
self.length = len(self.value)
def parse_value(cls, buf):
(value,) = struct.unpack_from(cls._VALUE_PACK_STR, buffer(buf))
return {
'value': value
}
def serialize_value(self):
buf = bytearray()
msg_pack_into(self._PACK_STR, buf, 0, self.type, self.length)
return buf + bytes(self.value)
msg_pack_into(self._VALUE_PACK_STR, buf, 0, self.value)
return buf
class BGPWithdrawnRoute(_IPAddrPrefix):
pass
class _PathAttribute(StringifyMixin):
_PACK_STR = '!BB' # flags, type
_PACK_STR_LEN = '!B' # length
_PACK_STR_EXT_LEN = '!H' # length w/ BGP_ATTR_FLAG_EXTENDED_LENGTH
class _TypeDisp(object):
_TYPES = {}
_REV_TYPES = None
_ATTR_FLAGS = None
_UNKNOWN_TYPE = None
def __init__(self, value=None, flags=0, type_=None, length=None):
if type_ is None:
if self._REV_TYPES is None:
self._REV_TYPES = dict((v, k) for k, v in
self._TYPES.iteritems())
type_ = self._REV_TYPES[self.__class__]
self.flags = flags
self.type = type_
self.length = length
if not value is None:
self.value = value
@classmethod
def register_unknown_type(cls):
def _register_type(subcls):
cls._UNKNOWN_TYPE = subcls
return subcls
return _register_type
@classmethod
def register_type(cls, type_):
cls._TYPES = cls._TYPES.copy()
def _register_type(subcls):
cls._TYPES[type_] = subcls
cls._REV_TYPES = None
@ -183,7 +164,111 @@ class _PathAttribute(StringifyMixin):
try:
return cls._TYPES[type_]
except KeyError:
return BGPPathAttributeUnknown
return cls._UNKNOWN_TYPE
@classmethod
def _rev_lookup_type(cls, targ_cls):
if cls._REV_TYPES is None:
rev = dict((v, k) for k, v in cls._TYPES.iteritems())
cls._REV_TYPES = rev
return cls._REV_TYPES[targ_cls]
class _OptParam(StringifyMixin, _TypeDisp, _Value):
_PACK_STR = '!BB' # type, length
def __init__(self, type_, value=None, length=None):
if type_ is None:
type_ = self._rev_lookup_type(self.__class__)
self.type = type_
self.length = length
if not value is None:
self.value = value
@classmethod
def parser(cls, buf):
(type_, length) = struct.unpack_from(cls._PACK_STR, buffer(buf))
rest = buf[struct.calcsize(cls._PACK_STR):]
value = bytes(rest[:length])
rest = rest[length:]
subcls = cls._lookup_type(type_)
kwargs = subcls.parse_value(value)
return subcls(type_=type_, length=length, **kwargs), rest
def serialize(self):
# fixup
value = self.serialize_value()
self.length = len(value)
buf = bytearray()
msg_pack_into(self._PACK_STR, buf, 0, self.type, self.length)
return buf + value
@_OptParam.register_unknown_type()
class BGPOptParamUnknown(_OptParam):
@classmethod
def parse_value(cls, buf):
return {
'value': buf
}
def serialize_value(self):
return self.value
@_OptParam.register_type(BGP_OPT_CAPABILITY)
class BGPOptParamCapability(_OptParam):
_CAP_HDR_PACK_STR = '!BB'
def __init__(self, cap_code, cap_value, cap_length=None,
type_=None, length=None):
super(BGPOptParamCapability, self).__init__(type_=type_, length=length)
self.cap_code = cap_code
self.cap_length = cap_length
self.cap_value = cap_value
@classmethod
def parse_value(cls, buf):
(code, length) = struct.unpack_from(cls._CAP_HDR_PACK_STR, buffer(buf))
value = buf[struct.calcsize(cls._CAP_HDR_PACK_STR):]
assert len(value) == length
kwargs = {
'cap_code': code,
'cap_length': length,
'cap_value': value,
}
return kwargs
def serialize_value(self):
# fixup
cap_value = self.cap_value
self.cap_length = len(cap_value)
buf = bytearray()
msg_pack_into(self._CAP_HDR_PACK_STR, buf, 0, self.cap_code,
self.cap_length)
return buf + cap_value
class BGPWithdrawnRoute(_IPAddrPrefix):
pass
class _PathAttribute(StringifyMixin, _TypeDisp, _Value):
_PACK_STR = '!BB' # flags, type
_PACK_STR_LEN = '!B' # length
_PACK_STR_EXT_LEN = '!H' # length w/ BGP_ATTR_FLAG_EXTENDED_LENGTH
_ATTR_FLAGS = None
def __init__(self, value=None, flags=0, type_=None, length=None):
if type_ is None:
type_ = self._rev_lookup_type(self.__class__)
self.flags = flags
self.type = type_
self.length = length
if not value is None:
self.value = value
@classmethod
def parser(cls, buf):
@ -201,13 +286,6 @@ class _PathAttribute(StringifyMixin):
return subcls(flags=flags, type_=type_, length=length,
**subcls.parse_value(value)), rest
@classmethod
def parse_value(cls, buf):
(value,) = struct.unpack_from(cls._VALUE_PACK_STR, buffer(buf))
return {
'value': value
}
def serialize(self):
# fixup
if not self._ATTR_FLAGS is None:
@ -228,12 +306,8 @@ class _PathAttribute(StringifyMixin):
msg_pack_into(len_pack_str, buf, len(buf), self.length)
return buf + value
def serialize_value(self):
buf = bytearray()
msg_pack_into(self._VALUE_PACK_STR, buf, 0, self.value)
return buf
@_PathAttribute.register_unknown_type()
class BGPPathAttributeUnknown(_PathAttribute):
@classmethod
def parse_value(cls, buf):
@ -404,7 +478,7 @@ class BGPNLRI(_IPAddrPrefix):
pass
class BGPMessage(packet_base.PacketBase):
class BGPMessage(packet_base.PacketBase, _TypeDisp):
"""Base class for BGP-4 messages.
An instance has the following attributes at least.
@ -423,14 +497,6 @@ class BGPMessage(packet_base.PacketBase):
_HDR_PACK_STR = '!16sHB' # marker, len, type
_HDR_LEN = struct.calcsize(_HDR_PACK_STR)
_TYPES = {}
@classmethod
def register_type(cls, type_):
def _register_type(subcls):
cls._TYPES[type_] = subcls
return subcls
return _register_type
def __init__(self, type_, len_=None, marker=None):
if marker is None:
@ -453,7 +519,7 @@ class BGPMessage(packet_base.PacketBase):
'%d < %d' % (len(buf), msglen))
binmsg = buf[cls._HDR_LEN:msglen]
rest = buf[msglen:]
subcls = cls._TYPES[type_]
subcls = cls._lookup_type(type_)
kwargs = subcls.parser(binmsg)
return subcls(marker=marker, len_=len_, type_=type_, **kwargs), rest
@ -525,7 +591,7 @@ class BGPOpen(BGPMessage):
binopts = rest[:opt_param_len]
opt_param = []
while binopts:
opt, binopts = BGPOptParam.parser(binopts)
opt, binopts = _OptParam.parser(binopts)
opt_param.append(opt)
return {
"version": version,

View File

@ -40,9 +40,10 @@ class Test_bgp(unittest.TestCase):
eq_(rest, '')
def test_open2(self):
opt_param = [bgp.BGPOptParamCapability(cap_code=200, cap_value='hoge'),
bgp.BGPOptParamUnknown(type_=99, value='fuga')]
msg = bgp.BGPOpen(my_as=30000, bgp_identifier='192.0.2.2',
opt_param=[bgp.BGPOptParam(type_=1, value='hooge'),
bgp.BGPOptParam(type_=2, value='fuga')])
opt_param=opt_param)
binmsg = msg.serialize()
msg2, rest = bgp.BGPMessage.parser(binmsg)
eq_(str(msg), str(msg2))