sw test tool: Add support for sending packets continuously

Signed-off-by: WATANABE Fumitaka <watanabe.fumitaka1@gmail.com>
Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Yuichi Ito 2014-03-28 13:19:07 +09:00 committed by FUJITA Tomonori
parent 3b7b40d9a6
commit 867062f586

View File

@ -81,6 +81,8 @@ TARGET_RECEIVE_PORT = 1
INTERVAL = 1 # sec
WAIT_TIMER = 3 # sec
CONTINUOUS_THREAD_INTVL = float(0.01) # sec
CONTINUOUS_PROGRESS_SPAN = 3 # sec
# Default settings for 'ingress: packets'
DEFAULT_DURATION_TIME = 30
@ -230,6 +232,9 @@ class OfTester(app_manager.RyuApp):
self.waiter = None
self.send_msg_xids = []
self.rcv_msgs = []
self.ingress_event = None
self.ingress_threads = []
self.thread_msg = None
self.test_thread = hub.spawn(
self._test_sequential_execute, test_dir)
@ -252,7 +257,9 @@ class OfTester(app_manager.RyuApp):
def close(self):
if self.test_thread is not None:
hub.kill(self.test_thread)
hub.joinall([self.test_thread])
if self.ingress_event:
self.ingress_event.set()
hub.joinall([self.test_thread])
self._test_end('--- Test terminated ---')
@set_ev_cls(ofp_event.EventOFPStateChange,
@ -336,6 +343,7 @@ class OfTester(app_manager.RyuApp):
if description:
self.logger.info('%s', description)
self.thread_msg = None
# Test execute.
try:
@ -381,6 +389,11 @@ class OfTester(app_manager.RyuApp):
except Exception:
result = [TEST_ERROR, RYU_INTERNAL_ERROR]
result_type = RYU_INTERNAL_ERROR
finally:
self.ingress_event = None
for tid in self.ingress_threads:
hub.kill(tid)
self.ingress_threads = []
# Output test result.
self.logger.info(' %-100s %s', test.description, result[0])
@ -635,6 +648,66 @@ class OfTester(app_manager.RyuApp):
if not lookup:
raise TestError(self.state)
def _continuous_packet_send(self, pkt):
assert self.ingress_event is None
pkt_data = pkt[KEY_PACKETS][KEY_DATA]
pktps = pkt[KEY_PACKETS][KEY_PKTPS]
duration_time = pkt[KEY_PACKETS][KEY_DURATION_TIME]
self.logger.debug("send_packet:[%s]", packet.Packet(pkt_data))
self.logger.debug("pktps:[%d]", pktps)
self.logger.debug("duration_time:[%d]", duration_time)
arg = {'pkt_data': pkt_data,
'thread_counter': 0,
'dot_span': int(CONTINUOUS_PROGRESS_SPAN /
CONTINUOUS_THREAD_INTVL),
'packet_counter': float(0),
'packet_counter_inc': pktps * CONTINUOUS_THREAD_INTVL}
try:
self.ingress_event = hub.Event()
tid = hub.spawn(self._send_packet_thread, arg)
self.ingress_threads.append(tid)
self.ingress_event.wait(duration_time)
if self.thread_msg is not None:
raise self.thread_msg # pylint: disable=E0702
finally:
sys.stdout.write("\r\n")
sys.stdout.flush()
def _send_packet_thread(self, arg):
""" Send several packets continuously. """
if self.ingress_event is None or self.ingress_event._cond:
return
# display dots to express progress of sending packets
if not arg['thread_counter'] % arg['dot_span']:
sys.stdout.write(".")
sys.stdout.flush()
arg['thread_counter'] += 1
# pile up float values and
# use integer portion as the number of packets this thread sends
arg['packet_counter'] += arg['packet_counter_inc']
count = int(arg['packet_counter'])
arg['packet_counter'] -= count
hub.sleep(CONTINUOUS_THREAD_INTVL)
tid = hub.spawn(self._send_packet_thread, arg)
self.ingress_threads.append(tid)
hub.sleep(0)
for _ in range(count):
try:
self.tester_sw.send_packet_out(arg['pkt_data'])
except Exception as err:
self.thread_msg = err
self.ingress_event.set()
break
def _compare_flow(self, stats1, stats2):
attr_list = ['cookie', 'priority', 'hard_timeout', 'idle_timeout',
'table_id', 'instructions', 'match']