mirror of
https://github.com/faucetsdn/ryu.git
synced 2026-05-09 14:26:10 +02:00
test_ofctl: improving readability
Signed-off-by: Minoru TAKAHASHI <takahashi.minoru7@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
parent
dc2d5a90b7
commit
969b4b6837
@ -116,220 +116,304 @@ class Test_ofctl(unittest.TestCase):
|
||||
pass
|
||||
|
||||
def _test_actions(self, act, test):
|
||||
act_type = act["type"]
|
||||
to_actions = test.to_actions
|
||||
actions_to_str = test.actions_to_str
|
||||
dp = ofproto_protocol.ProtocolDesc(version=test.ver)
|
||||
act_list = []
|
||||
act_list.append(act)
|
||||
# str -> action
|
||||
result = to_actions(dp, act_list)
|
||||
insts = result[0]
|
||||
act_type = act["type"]
|
||||
|
||||
def equal_act(insts, act, act_type, test):
|
||||
if act_type in test.supported_action:
|
||||
cls = test.supported_action[act_type]
|
||||
else:
|
||||
cls = None
|
||||
if act_type == 'GOTO_TABLE':
|
||||
ok_(isinstance(insts, cls))
|
||||
eq_(insts.table_id, act["table_id"])
|
||||
elif act_type == 'WRITE_METADATA':
|
||||
ok_(isinstance(insts, cls))
|
||||
eq_(insts.metadata, act["metadata"])
|
||||
eq_(insts.metadata_mask, act["metadata_mask"])
|
||||
elif act_type == 'METER':
|
||||
ok_(isinstance(insts, cls))
|
||||
eq_(insts.meter_id, act["meter_id"])
|
||||
else:
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
action = insts
|
||||
else:
|
||||
action = insts.actions[0]
|
||||
ok_(isinstance(action, cls))
|
||||
if act_type == 'OUTPUT':
|
||||
eq_(action.port, act["port"])
|
||||
elif act_type == 'SET_VLAN_VID':
|
||||
eq_(action.vlan_vid, act["vlan_vid"])
|
||||
elif act_type == 'SET_VLAN_PCP':
|
||||
eq_(action.vlan_pcp, act["vlan_pcp"])
|
||||
elif act_type == 'SET_DL_SRC':
|
||||
eq_(addrconv.mac.bin_to_text(action.dl_addr),
|
||||
act["dl_src"])
|
||||
elif act_type == 'SET_DL_DST':
|
||||
eq_(addrconv.mac.bin_to_text(action.dl_addr),
|
||||
act["dl_dst"])
|
||||
elif act_type == 'SET_NW_SRC':
|
||||
ip = netaddr.ip.IPAddress(action.nw_addr)
|
||||
eq_(str(ip), act["nw_src"])
|
||||
elif act_type == 'SET_NW_DST':
|
||||
ip = netaddr.ip.IPAddress(action.nw_addr)
|
||||
eq_(str(ip), act["nw_dst"])
|
||||
elif act_type == 'SET_NW_TOS':
|
||||
eq_(action.tos, act["nw_tos"])
|
||||
elif act_type == 'SET_TP_SRC':
|
||||
eq_(action.tp, act["tp_src"])
|
||||
elif act_type == 'SET_TP_DST':
|
||||
eq_(action.tp, act["tp_dst"])
|
||||
elif act_type == 'ENQUEUE':
|
||||
eq_(action.queue_id, act["queue_id"])
|
||||
eq_(action.port, act["port"])
|
||||
elif act_type == 'SET_MPLS_TTL':
|
||||
eq_(action.mpls_ttl, act["mpls_ttl"])
|
||||
elif act_type in ['PUSH_VLAN', 'PUSH_MPLS',
|
||||
'POP_MPLS', 'PUSH_PBB']:
|
||||
eq_(action.ethertype, act["ethertype"])
|
||||
elif act_type == 'SET_QUEUE':
|
||||
eq_(action.queue_id, act["queue_id"])
|
||||
elif act_type == 'GROUP':
|
||||
eq_(action.group_id, act["group_id"])
|
||||
elif act_type == 'SET_NW_TTL':
|
||||
eq_(action.nw_ttl, act["nw_ttl"])
|
||||
elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
|
||||
'COPY_TTL_IN', 'DEC_MPLS_TTL',
|
||||
'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
# str -> action
|
||||
insts = test.to_actions(dp, [act])
|
||||
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
equal_act(insts, act, act_type, test)
|
||||
action = insts[0]
|
||||
self._equal_str_to_act(action, act, act_type, test)
|
||||
else:
|
||||
if act_type == 'WRITE_ACTIONS':
|
||||
ok_(isinstance(insts, test._parser.OFPInstructionActions))
|
||||
eq_(insts.type, test._ofproto.OFPIT_WRITE_ACTIONS)
|
||||
equal_act(insts, act["actions"][0], act["actions"][0]["type"], test)
|
||||
elif act_type == 'CLEAR_ACTIONS':
|
||||
ok_(isinstance(insts, test._parser.OFPInstructionActions))
|
||||
eq_(insts.type, test._ofproto.OFPIT_CLEAR_ACTIONS)
|
||||
else:
|
||||
if act_type not in ['GOTO_TABLE', 'WRITE_METADATA', 'METER']:
|
||||
ok_(isinstance(insts, test._parser.OFPInstructionActions))
|
||||
eq_(insts.type, test._ofproto.OFPIT_APPLY_ACTIONS)
|
||||
equal_act(insts, act, act_type, test)
|
||||
else:
|
||||
equal_act(insts, act, act_type, test)
|
||||
inst = insts[0]
|
||||
self._equal_str_to_inst(inst, act, act_type, test)
|
||||
|
||||
# action -> str
|
||||
action_str = actions_to_str(result)
|
||||
inst_str = test.actions_to_str(insts)
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
act_str = inst_str
|
||||
self._equal_act_to_str(act_str, act, act_type, test)
|
||||
else:
|
||||
self._equal_inst_to_str(inst_str, act, act_type, test)
|
||||
|
||||
def equal_str(action_str, act, act_type, test):
|
||||
action_str_list = action_str[0].split(':', 1)
|
||||
eq_(action_str_list[0], act_type)
|
||||
def _test_match(self, attrs, test):
|
||||
dp = ofproto_protocol.ProtocolDesc(version=test.ver)
|
||||
|
||||
# str -> match
|
||||
match = test.to_match(dp, attrs)
|
||||
|
||||
for key, value in attrs.items():
|
||||
key = self._conv_key(test, key, attrs)
|
||||
self._equal_str_to_match(key, value, match, test)
|
||||
|
||||
# match -> str
|
||||
match_str = test.match_to_str(match)
|
||||
|
||||
for key, value in attrs.items():
|
||||
if key in conv_of12_to_of10_dict:
|
||||
key_old = conv_of12_to_of10_dict[key]
|
||||
else:
|
||||
key_old = key
|
||||
self._equal_match_to_str(key_old, value, match_str, test)
|
||||
|
||||
def _equal_str_to_inst(self, inst, act, act_type, test):
|
||||
if act_type in test.supported_action:
|
||||
cls = test.supported_action[act_type]
|
||||
else:
|
||||
cls = None
|
||||
if act_type == 'GOTO_TABLE':
|
||||
ok_(isinstance(inst, cls))
|
||||
eq_(inst.table_id, act["table_id"])
|
||||
elif act_type == 'WRITE_METADATA':
|
||||
ok_(isinstance(inst, cls))
|
||||
eq_(inst.metadata, act["metadata"])
|
||||
eq_(inst.metadata_mask, act["metadata_mask"])
|
||||
elif act_type == 'METER':
|
||||
ok_(isinstance(inst, cls))
|
||||
eq_(inst.meter_id, act["meter_id"])
|
||||
elif act_type == 'WRITE_ACTIONS':
|
||||
ok_(isinstance(inst, cls))
|
||||
eq_(inst.type, test._ofproto.OFPIT_WRITE_ACTIONS)
|
||||
self._equal_str_to_act(inst.actions[0],
|
||||
act["actions"][0],
|
||||
act["actions"][0]["type"],
|
||||
test)
|
||||
elif act_type == 'CLEAR_ACTIONS':
|
||||
ok_(isinstance(inst, cls))
|
||||
eq_(inst.type, test._ofproto.OFPIT_CLEAR_ACTIONS)
|
||||
else:
|
||||
# APPLY_ACTIONS or Uknown Action Type
|
||||
ok_(isinstance(inst, test._parser.OFPInstructionActions))
|
||||
eq_(inst.type, test._ofproto.OFPIT_APPLY_ACTIONS)
|
||||
self._equal_str_to_act(inst.actions[0], act,
|
||||
act_type, test)
|
||||
|
||||
def _equal_str_to_act(self, action, act, act_type, test):
|
||||
if act_type in test.supported_action:
|
||||
cls = test.supported_action[act_type]
|
||||
else:
|
||||
cls = None
|
||||
ok_(isinstance(action, cls))
|
||||
if act_type == 'OUTPUT':
|
||||
eq_(action.port, act["port"])
|
||||
elif act_type == 'SET_VLAN_VID':
|
||||
eq_(action.vlan_vid, act["vlan_vid"])
|
||||
elif act_type == 'SET_VLAN_PCP':
|
||||
eq_(action.vlan_pcp, act["vlan_pcp"])
|
||||
elif act_type == 'SET_DL_SRC':
|
||||
eq_(addrconv.mac.bin_to_text(action.dl_addr),
|
||||
act["dl_src"])
|
||||
elif act_type == 'SET_DL_DST':
|
||||
eq_(addrconv.mac.bin_to_text(action.dl_addr),
|
||||
act["dl_dst"])
|
||||
elif act_type == 'SET_NW_SRC':
|
||||
ip = netaddr.ip.IPAddress(action.nw_addr)
|
||||
eq_(str(ip), act["nw_src"])
|
||||
elif act_type == 'SET_NW_DST':
|
||||
ip = netaddr.ip.IPAddress(action.nw_addr)
|
||||
eq_(str(ip), act["nw_dst"])
|
||||
elif act_type == 'SET_NW_TOS':
|
||||
eq_(action.tos, act["nw_tos"])
|
||||
elif act_type == 'SET_TP_SRC':
|
||||
eq_(action.tp, act["tp_src"])
|
||||
elif act_type == 'SET_TP_DST':
|
||||
eq_(action.tp, act["tp_dst"])
|
||||
elif act_type == 'ENQUEUE':
|
||||
eq_(action.queue_id, act["queue_id"])
|
||||
eq_(action.port, act["port"])
|
||||
elif act_type == 'SET_MPLS_TTL':
|
||||
eq_(action.mpls_ttl, act["mpls_ttl"])
|
||||
elif act_type in ['PUSH_VLAN', 'PUSH_MPLS',
|
||||
'POP_MPLS', 'PUSH_PBB']:
|
||||
eq_(action.ethertype, act["ethertype"])
|
||||
elif act_type == 'SET_QUEUE':
|
||||
eq_(action.queue_id, act["queue_id"])
|
||||
elif act_type == 'GROUP':
|
||||
eq_(action.group_id, act["group_id"])
|
||||
elif act_type == 'SET_NW_TTL':
|
||||
eq_(action.nw_ttl, act["nw_ttl"])
|
||||
elif act_type == 'SET_FIELD':
|
||||
eq_(action.key, act['field'])
|
||||
eq_(action.value, act['value'])
|
||||
elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
|
||||
'COPY_TTL_IN', 'DEC_MPLS_TTL',
|
||||
'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
|
||||
pass
|
||||
else: # Uknown Action Type
|
||||
assert False
|
||||
|
||||
def _equal_inst_to_str(self, inst_str, act, act_type, test):
|
||||
if act_type == 'WRITE_ACTIONS':
|
||||
act_str = inst_str[0]["WRITE_ACTIONS"]
|
||||
act = act["actions"][0]
|
||||
act_type = act["type"]
|
||||
self._equal_act_to_str(act_str, act, act_type, test)
|
||||
else:
|
||||
inst_str_list = inst_str[0].split(':', 1)
|
||||
eq_(inst_str_list[0], act_type)
|
||||
if act_type == 'GOTO_TABLE':
|
||||
eq_(int(action_str_list[1]), act["table_id"])
|
||||
eq_(int(inst_str_list[1]), act["table_id"])
|
||||
elif act_type == 'WRITE_METADATA':
|
||||
met = action_str_list[1].split('/')
|
||||
met = inst_str_list[1].split('/')
|
||||
eq_(int(met[0], 16), act["metadata"])
|
||||
eq_(int(met[1], 16), act["metadata_mask"])
|
||||
elif act_type == 'METER':
|
||||
eq_(int(action_str_list[1]), act["meter_id"])
|
||||
eq_(int(inst_str_list[1]), act["meter_id"])
|
||||
elif act_type == 'CLEAR_ACTIONS':
|
||||
pass
|
||||
else:
|
||||
if act_type == 'OUTPUT':
|
||||
eq_(int(action_str_list[1]), act["port"])
|
||||
elif act_type == 'SET_VLAN_VID':
|
||||
eq_(int(action_str_list[1]), act["vlan_vid"])
|
||||
elif act_type == 'SET_VLAN_PCP':
|
||||
eq_(int(action_str_list[1]), act["vlan_pcp"])
|
||||
elif act_type == 'SET_DL_SRC':
|
||||
eq_(action_str_list[1], act["dl_src"])
|
||||
elif act_type == 'SET_DL_DST':
|
||||
eq_(action_str_list[1], act["dl_dst"])
|
||||
elif act_type == 'SET_NW_SRC':
|
||||
eq_(action_str_list[1], act["nw_src"])
|
||||
elif act_type == 'SET_NW_DST':
|
||||
eq_(action_str_list[1], act["nw_dst"])
|
||||
elif act_type == 'SET_NW_TOS':
|
||||
eq_(int(action_str_list[1]), act["nw_tos"])
|
||||
elif act_type == 'SET_TP_SRC':
|
||||
eq_(int(action_str_list[1]), act["tp_src"])
|
||||
elif act_type == 'SET_TP_DST':
|
||||
eq_(int(action_str_list[1]), act["tp_dst"])
|
||||
elif act_type == 'ENQUEUE':
|
||||
enq = action_str_list[1].split(':')
|
||||
eq_(int(enq[0], 10), act["port"])
|
||||
eq_(int(enq[1], 10), act["queue_id"])
|
||||
elif act_type == 'SET_MPLS_TTL':
|
||||
eq_(int(action_str_list[1]), act["mpls_ttl"])
|
||||
elif act_type == 'PUSH_VLAN':
|
||||
eq_(int(action_str_list[1]), act["ethertype"])
|
||||
elif act_type == 'PUSH_MPLS':
|
||||
eq_(int(action_str_list[1]), act["ethertype"])
|
||||
elif act_type == 'POP_MPLS':
|
||||
eq_(int(action_str_list[1]), act["ethertype"])
|
||||
elif act_type == 'SET_QUEUE':
|
||||
eq_(int(action_str_list[1]), act["queue_id"])
|
||||
elif act_type == 'GROUP':
|
||||
eq_(int(action_str_list[1]), act["group_id"])
|
||||
elif act_type == 'SET_NW_TTL':
|
||||
eq_(int(action_str_list[1]), act["nw_ttl"])
|
||||
elif act_type == 'SET_FIELD':
|
||||
eq_(action_str_list[1].strip(' {'), act["field"])
|
||||
eq_(action_str_list[2].strip('} '), act["value"])
|
||||
elif act_type == 'PUSH_PBB':
|
||||
eq_(int(action_str_list[1]), act["ethertype"])
|
||||
elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
|
||||
'COPY_TTL_IN', 'DEC_MPLS_TTL',
|
||||
'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
# APPLY_ACTIONS
|
||||
act_str = inst_str
|
||||
self._equal_act_to_str(act_str, act, act_type, test)
|
||||
|
||||
if act_type == 'WRITE_ACTIONS':
|
||||
action_str = action_str[0]["WRITE_ACTIONS"]
|
||||
act = act["actions"][0]
|
||||
act_type = act["type"]
|
||||
equal_str(action_str, act, act_type, test)
|
||||
def _equal_act_to_str(self, act_str, act, act_type, test):
|
||||
act_str_list = act_str[0].split(':', 1)
|
||||
eq_(act_str_list[0], act_type)
|
||||
if act_type == 'OUTPUT':
|
||||
eq_(int(act_str_list[1]), act["port"])
|
||||
elif act_type == 'SET_VLAN_VID':
|
||||
eq_(int(act_str_list[1]), act["vlan_vid"])
|
||||
elif act_type == 'SET_VLAN_PCP':
|
||||
eq_(int(act_str_list[1]), act["vlan_pcp"])
|
||||
elif act_type == 'SET_DL_SRC':
|
||||
eq_(act_str_list[1], act["dl_src"])
|
||||
elif act_type == 'SET_DL_DST':
|
||||
eq_(act_str_list[1], act["dl_dst"])
|
||||
elif act_type == 'SET_NW_SRC':
|
||||
eq_(act_str_list[1], act["nw_src"])
|
||||
elif act_type == 'SET_NW_DST':
|
||||
eq_(act_str_list[1], act["nw_dst"])
|
||||
elif act_type == 'SET_NW_TOS':
|
||||
eq_(int(act_str_list[1]), act["nw_tos"])
|
||||
elif act_type == 'SET_TP_SRC':
|
||||
eq_(int(act_str_list[1]), act["tp_src"])
|
||||
elif act_type == 'SET_TP_DST':
|
||||
eq_(int(act_str_list[1]), act["tp_dst"])
|
||||
elif act_type == 'ENQUEUE':
|
||||
enq = act_str_list[1].split(':')
|
||||
eq_(int(enq[0], 10), act["port"])
|
||||
eq_(int(enq[1], 10), act["queue_id"])
|
||||
elif act_type == 'SET_MPLS_TTL':
|
||||
eq_(int(act_str_list[1]), act["mpls_ttl"])
|
||||
elif act_type == 'PUSH_VLAN':
|
||||
eq_(int(act_str_list[1]), act["ethertype"])
|
||||
elif act_type == 'PUSH_MPLS':
|
||||
eq_(int(act_str_list[1]), act["ethertype"])
|
||||
elif act_type == 'POP_MPLS':
|
||||
eq_(int(act_str_list[1]), act["ethertype"])
|
||||
elif act_type == 'SET_QUEUE':
|
||||
eq_(int(act_str_list[1]), act["queue_id"])
|
||||
elif act_type == 'GROUP':
|
||||
eq_(int(act_str_list[1]), act["group_id"])
|
||||
elif act_type == 'SET_NW_TTL':
|
||||
eq_(int(act_str_list[1]), act["nw_ttl"])
|
||||
elif act_type == 'SET_FIELD':
|
||||
field, value = act_str_list[1].split(':')
|
||||
eq_(field.strip(' {'), act["field"])
|
||||
eq_(int(value.strip('} ')), act["value"])
|
||||
elif act_type == 'PUSH_PBB':
|
||||
eq_(int(act_str_list[1]), act["ethertype"])
|
||||
elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
|
||||
'COPY_TTL_IN', 'DEC_MPLS_TTL',
|
||||
'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
|
||||
pass
|
||||
else:
|
||||
equal_str(action_str, act, act_type, test)
|
||||
assert False
|
||||
|
||||
def _test_to_match(self, attrs, test):
|
||||
to_match = test.to_match
|
||||
match_to_str = test.match_to_str
|
||||
dp = ofproto_protocol.ProtocolDesc(version=test.ver)
|
||||
ofproto = dp.ofproto
|
||||
def _equal_str_to_match(self, key, value, match, test):
|
||||
field_value = self._get_field_value(test, key, match)
|
||||
|
||||
# str -> match
|
||||
match = to_match(dp, attrs)
|
||||
|
||||
def equal_match(key, value, match):
|
||||
key = self._conv_key(test, key, attrs)
|
||||
field_value = self._get_field_value(test, key, match)
|
||||
|
||||
if key in ['eth_src', 'eth_dst', 'arp_sha', 'arp_tha']:
|
||||
# MAC address
|
||||
eth, mask = _to_match_eth(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
for i in range(0, len(mask)):
|
||||
if mask[i] == 'f':
|
||||
eq_(eth[i], field_value[0][i])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(eth, field_value)
|
||||
return
|
||||
elif key in ['dl_src', 'dl_dst']:
|
||||
eth, mask = _to_match_eth(value)
|
||||
field_value = addrconv.mac.bin_to_text(field_value)
|
||||
if key in ['eth_src', 'eth_dst', 'arp_sha', 'arp_tha']:
|
||||
# MAC address
|
||||
eth, mask = _to_match_eth(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
for i in range(0, len(mask)):
|
||||
if mask[i] == 'f':
|
||||
eq_(eth[i], field_value[0][i])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(eth, field_value)
|
||||
return
|
||||
elif key in ['ipv4_src', 'ipv4_dst', 'arp_spa', 'arp_tpa']:
|
||||
# IPv4 address
|
||||
ipv4, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
eq_(ipv4, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv4, field_value)
|
||||
return
|
||||
elif key in ['nw_src', 'nw_dst']:
|
||||
# IPv4 address
|
||||
return
|
||||
elif key in ['dl_src', 'dl_dst']:
|
||||
eth, mask = _to_match_eth(value)
|
||||
field_value = addrconv.mac.bin_to_text(field_value)
|
||||
eq_(eth, field_value)
|
||||
return
|
||||
elif key in ['ipv4_src', 'ipv4_dst', 'arp_spa', 'arp_tpa']:
|
||||
# IPv4 address
|
||||
ipv4, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
eq_(ipv4, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv4, field_value)
|
||||
return
|
||||
elif key in ['nw_src', 'nw_dst']:
|
||||
# IPv4 address
|
||||
ipv4, mask = _to_match_ip(value)
|
||||
field_value = _to_match_ip(field_value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
eq_(ipv4, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv4, field_value[0])
|
||||
return
|
||||
elif key in ['ipv6_src', 'ipv6_dst']:
|
||||
# IPv6 address
|
||||
ipv6, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
eq_(ipv6, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv6, field_value)
|
||||
return
|
||||
elif key == 'vlan_vid':
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
eq_(value, field_value)
|
||||
else:
|
||||
eq_(test.expected_value['vlan_vid'][
|
||||
value]['to_match'], field_value)
|
||||
return
|
||||
else:
|
||||
if isinstance(value, str) and '/' in value:
|
||||
# with mask
|
||||
value, mask = _to_match_masked_int(value)
|
||||
value &= mask
|
||||
eq_(value, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(_str_to_int(value), field_value)
|
||||
return
|
||||
|
||||
def _equal_match_to_str(self, key, value, match_str, test):
|
||||
field_value = match_str[key]
|
||||
if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha']:
|
||||
# MAC address
|
||||
eth, mask = _to_match_eth(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
field_value = field_value.split('/')
|
||||
for i in range(0, len(mask)):
|
||||
if mask[i] == 'f':
|
||||
eq_(eth[i], field_value[0][i])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(eth, field_value)
|
||||
return
|
||||
elif key in['nw_src', 'nw_dst', 'arp_spa', 'arp_tpa']:
|
||||
# IPv4 address
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
ipv4, mask = _to_match_ip(value)
|
||||
field_value = _to_match_ip(field_value)
|
||||
if mask is not None:
|
||||
@ -339,117 +423,45 @@ class Test_ofctl(unittest.TestCase):
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv4, field_value[0])
|
||||
return
|
||||
elif key in ['ipv6_src', 'ipv6_dst']:
|
||||
# IPv6 address
|
||||
ipv6, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
eq_(ipv6, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv6, field_value)
|
||||
return
|
||||
elif key == 'vlan_vid':
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
eq_(value, field_value)
|
||||
else:
|
||||
eq_(test.expected_value['vlan_vid'][
|
||||
value]['to_match'], field_value)
|
||||
return
|
||||
else:
|
||||
if isinstance(value, str) and '/' in value:
|
||||
# with mask
|
||||
value, mask = _to_match_masked_int(value)
|
||||
value &= mask
|
||||
eq_(value, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(_str_to_int(value), field_value)
|
||||
return
|
||||
|
||||
for key, value in attrs.items():
|
||||
equal_match(key, value, match)
|
||||
|
||||
# match -> str
|
||||
match_str = match_to_str(match)
|
||||
|
||||
def equal_str(key, value, match_str):
|
||||
field_value = match_str[key]
|
||||
if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha']:
|
||||
# MAC address
|
||||
eth, mask = _to_match_eth(value)
|
||||
ipv4, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
field_value = field_value.split('/')
|
||||
for i in range(0, len(mask)):
|
||||
if mask[i] == 'f':
|
||||
eq_(eth[i], field_value[0][i])
|
||||
eq_(ipv4, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(eth, field_value)
|
||||
return
|
||||
elif key in['nw_src', 'nw_dst', 'arp_spa', 'arp_tpa']:
|
||||
# IPv4 address
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
ipv4, mask = _to_match_ip(value)
|
||||
field_value = _to_match_ip(field_value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
eq_(ipv4, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv4, field_value[0])
|
||||
else:
|
||||
ipv4, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
field_value = field_value.split('/')
|
||||
eq_(ipv4, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv4, field_value)
|
||||
return
|
||||
elif key in ['ipv6_src', 'ipv6_dst']:
|
||||
# IPv6 address
|
||||
ipv6, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
field_value = field_value.split('/')
|
||||
eq_(ipv6, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
# without mask
|
||||
eq_(ipv6, field_value)
|
||||
return
|
||||
elif key == 'dl_vlan':
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
eq_(value, field_value)
|
||||
else:
|
||||
eq_(test.expected_value['vlan_vid'][
|
||||
value]['to_str'], field_value)
|
||||
return
|
||||
eq_(ipv4, field_value)
|
||||
return
|
||||
elif key in ['ipv6_src', 'ipv6_dst']:
|
||||
# IPv6 address
|
||||
ipv6, mask = _to_match_ip(value)
|
||||
if mask is not None:
|
||||
# with mask
|
||||
field_value = field_value.split('/')
|
||||
eq_(ipv6, field_value[0])
|
||||
eq_(mask, field_value[1])
|
||||
else:
|
||||
if isinstance(value, str) and '/' in value:
|
||||
# with mask
|
||||
value = _to_masked_int_str(value)
|
||||
eq_(value, field_value)
|
||||
else:
|
||||
# without mask
|
||||
eq_(_str_to_int(value), field_value)
|
||||
return
|
||||
|
||||
for key, value in attrs.items():
|
||||
if key in conv_of12_to_of10_dict:
|
||||
key_old = conv_of12_to_of10_dict[key]
|
||||
# without mask
|
||||
eq_(ipv6, field_value)
|
||||
return
|
||||
elif key == 'dl_vlan':
|
||||
if test.ver == ofproto_v1_0.OFP_VERSION:
|
||||
eq_(value, field_value)
|
||||
else:
|
||||
key_old = key
|
||||
equal_str(key_old, value, match_str)
|
||||
eq_(test.expected_value['vlan_vid'][
|
||||
value]['to_str'], field_value)
|
||||
return
|
||||
else:
|
||||
if isinstance(value, str) and '/' in value:
|
||||
# with mask
|
||||
value = _to_masked_int_str(value)
|
||||
eq_(value, field_value)
|
||||
else:
|
||||
# without mask
|
||||
eq_(_str_to_int(value), field_value)
|
||||
return
|
||||
|
||||
def _conv_key(self, test, key, attrs):
|
||||
if test.ver != ofproto_v1_0.OFP_VERSION:
|
||||
@ -723,6 +735,8 @@ class test_data_v1_2(test_data_base):
|
||||
'SET_FIELD': self._parser.OFPActionSetField,
|
||||
'GOTO_TABLE': self._parser.OFPInstructionGotoTable,
|
||||
'WRITE_METADATA': self._parser.OFPInstructionWriteMetadata,
|
||||
'WRITE_ACTIONS': self._parser.OFPInstructionActions,
|
||||
'CLEAR_ACTIONS': self._parser.OFPInstructionActions,
|
||||
})
|
||||
self.set_expected_value()
|
||||
|
||||
@ -827,7 +841,7 @@ def _add_tests_match(cls):
|
||||
def _run(self, name, attr, cls):
|
||||
print('processing %s ...' % name)
|
||||
cls_ = Test_ofctl(name)
|
||||
cls_._test_to_match(attr, cls)
|
||||
cls_._test_match(attr, cls)
|
||||
print('adding %s ...' % method_name)
|
||||
func = functools.partial(
|
||||
_run, name=method_name, attr=attr, cls=cls)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user