Pre-generate updates for test harness in parallel.

First CL of many. Thought I'd commit this before the others since it sets
things up for the other CLs (i.e. the thread pool). With this CL, all updates
are pre-generated before the test run. This should improve speed, since the
two updates are produced at the same time rather than during the sequential
tests.

Change-Id: Iaf0a06f0d99c31d3d749e478fd2fb6efa0763a28

BUG=chromium-os:10723
TEST=Ran it with -q

Review URL: http://codereview.chromium.org/6264005
parent 2c11e0d7bc
commit a4cc3cd3ab
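Before the diff: a minimal, self-contained sketch of the semaphore-bounded threading pattern this change introduces for the pre-generation step, where at most N jobs run at once and each worker releases the semaphore when it finishes. The names here (_FakeGenerateUpdate, _RunBounded, the .bin paths) are illustrative placeholders, not the harness's real API; the real harness wraps the same acquire/start/release cycle in the ParallelJob class and _RunParallelJobs helper shown below.

import threading
import time


def _FakeGenerateUpdate(target, src):
  # Hypothetical stand-in for the devserver pre-generation call.
  time.sleep(0.1)
  return 'generated %s from %s' % (target, src or 'scratch')


def _RunBounded(max_jobs, jobs, jobs_args):
  """Runs each job in its own thread, with at most max_jobs in flight."""
  semaphore = threading.Semaphore(max_jobs)
  results = [None] * len(jobs)

  def _Worker(index, job, args):
    try:
      results[index] = job(*args)
    finally:
      # Releasing lets the launch loop start the next queued job.
      semaphore.release()

  threads = []
  for index, (job, args) in enumerate(zip(jobs, jobs_args)):
    semaphore.acquire()  # Blocks once max_jobs workers are running.
    worker = threading.Thread(target=_Worker, args=(index, job, args))
    threads.append(worker)
    worker.start()

  for worker in threads:
    worker.join()
  return results


if __name__ == '__main__':
  pairs = [('target.bin', ''), ('target.bin', 'base.bin')]
  print(_RunBounded(2, [_FakeGenerateUpdate] * len(pairs), pairs))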
@@ -16,7 +16,7 @@ import optparse
import os
import re
import sys
import thread
import threading
import time
import unittest
import urllib
@@ -134,9 +134,10 @@ class AUTest(object):
    # Will raise ValueError if expected is not found.
    if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE):
      return
    else:
      Warning("Didn't find '%s' in:" % expected_msg)
      Warning(err.stdout)

    Warning("Didn't find '%s' in:" % expected_msg)
    Warning(err.stdout)
    self.fail('We managed to update when failure was expected')

  def AttemptUpdateWithFilter(self, filter, proxy_port=8081):
@@ -155,7 +156,7 @@ class AUTest(object):

    # This update is expected to fail...
    try:
      self.PerformUpdate(self.target_image_path, self.base_image_path,
      self.PerformUpdate(self.target_image_path, self.target_image_path,
                         proxy_port=proxy_port)
    finally:
      proxy.shutdown()
@@ -248,12 +249,10 @@ class AUTest(object):
    percent_passed = self.VerifyImage(10)

    # Update to - all tests should pass on new image.
    Info('Updating from base image on vm to target image.')
    self.PerformUpdate(self.target_image_path, self.base_image_path)
    self.VerifyImage(100)

    # Update from - same percentage should pass that originally passed.
    Info('Updating from updated image on vm back to base image.')
    self.PerformUpdate(self.base_image_path, self.target_image_path)
    self.VerifyImage(percent_passed)

@@ -271,12 +270,10 @@ class AUTest(object):
    percent_passed = self.VerifyImage(10)

    # Update to - all tests should pass on new image.
    Info('Updating from base image on vm to target image and wiping stateful.')
    self.PerformUpdate(self.target_image_path, self.base_image_path, 'clean')
    self.VerifyImage(100)

    # Update from - same percentage should pass that originally passed.
    Info('Updating from updated image back to base image and wiping stateful.')
    self.PerformUpdate(self.base_image_path, self.target_image_path, 'clean')
    self.VerifyImage(percent_passed)

@@ -508,13 +505,15 @@ class VirtualAUTest(unittest.TestCase, AUTest):

  def PrepareBase(self, image_path):
    """Creates an update-able VM based on base image."""
    # Needed for VM delta updates. We need to use the qemu image rather
    # than the base image on a first update. By tracking the first_update
    # we can set src_image to the qemu form of the base image when
    # generating the delta payload.
    self._first_update = True
    self.vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
        image_path)

    Info('Creating: %s' % self.vm_image_path)

    if not os.path.exists(self.vm_image_path):
      Info('Qemu image %s not found, creating one.' % self.vm_image_path)
      Info('Creating %s' % vm_image_path)
      RunCommand(['%s/image_to_vm.sh' % self.crosutils,
                  '--full',
                  '--from=%s' % ReinterpretPathForChroot(
@@ -523,18 +522,17 @@ class VirtualAUTest(unittest.TestCase, AUTest):
                  '--statefulfs_size=%s' % self._FULL_STATEFULFS_SIZE,
                  '--board=%s' % self.board,
                  '--test_image'], enter_chroot=True)
    else:
      Info('Using existing VM image %s' % self.vm_image_path)

    Info('Testing for %s' % self.vm_image_path)
    Info('Using %s as base' % self.vm_image_path)
    self.assertTrue(os.path.exists(self.vm_image_path))

  def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
                   proxy_port=None):
    """Updates VM image with image_path."""
    stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
    if src_image_path == self.base_image_path:
    if src_image_path and self._first_update:
      src_image_path = self.vm_image_path
      self._first_update = False

    cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
           '--update_image_path=%s' % image_path,
@@ -608,6 +606,144 @@ class VirtualAUTest(unittest.TestCase, AUTest):
    return self.AssertEnoughTestsPassed(self, output, percent_required_to_pass)


class GenerateVirtualAUDeltasTest(VirtualAUTest):
  """Class that overrides VirtualAUTest and stores deltas we will generate."""
  delta_list = {}

  def setUp(self):
    AUTest.setUp(self)

  def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
                   proxy_port=None):
    if src_image_path and self._first_update:
      src_image_path = self.vm_image_path
      self._first_update = False

    image_path = ReinterpretPathForChroot(image_path)
    if src_image_path:
      src_image_path = ReinterpretPathForChroot(src_image_path)
    if not self.delta_list.has_key(image_path):
      self.delta_list[image_path] = set([src_image_path])
    else:
      self.delta_list[image_path].add(src_image_path)

  def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg):
    pass

  def VerifyImage(self, percent_required_to_pass):
    pass


class ParallelJob(threading.Thread):
  """Small wrapper for threading.Thread that releases a semaphore on exit."""
  def __init__(self, semaphore, target, args):
    threading.Thread.__init__(self, target=target, args=args)
    self._target = target
    self._args = args
    self._semaphore = semaphore
    self._output = None
    self._completed = False

  def run(self):
    try:
      threading.Thread.run(self)
    finally:
      self._Cleanup()
      self._completed = True

  def GetOutput(self):
    assert self._completed, 'GetOutput called before thread was run.'
    return self._output

  def _Cleanup(self):
    self._semaphore.release()

  def __str__(self):
    return '%s(%s)' % (self._target, self._args)


def _RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args):
  """Runs a set number of specified jobs in parallel.

  Args:
    number_of_simultaneous_jobs: Max number of threads to be run in parallel.
    jobs: Array of methods to run.
    jobs_args: Array of args associated with method calls.
  Returns:
    An array of results corresponding to each thread.
  """
  def _TwoTupleize(x, y):
    return (x, y)

  threads = []
  job_pool_semaphore = threading.Semaphore(number_of_simultaneous_jobs)
  assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'

  # Create the parallel jobs.
  for job, args in map(_TwoTupleize, jobs, jobs_args):
    thread = ParallelJob(job_pool_semaphore, target=job, args=args)
    threads.append(thread)

  # We use a semaphore to ensure we don't run more jobs than required.
  # After each thread finishes, it releases (increments) the semaphore.
  # Acquire blocks once the max number of jobs is reached and continues
  # when a running thread finishes and releases.
  for next_thread in threads:
    job_pool_semaphore.acquire(blocking=True)
    Info('Starting %s' % next_thread)
    next_thread.start()

  # Wait on the rest of the threads to finish.
  for thread in threads:
    thread.join()

  return [thread.GetOutput() for thread in threads]


def _PregenerateUpdates(parser, options):
  """Determines all deltas that will be generated and generates them.

  This method effectively pre-generates the dev server cache for all tests.

  Args:
    parser: parser from main.
    options: options from parsed parser.
  Returns:
    Array of output from generating updates.
  """
  def _GenerateVMUpdate(target, src):
    """Generates an update using the devserver."""
    RunCommand(['./start_devserver',
                '--pregenerate_update',
                '--exit',
                '--image=%s' % target,
                '--src_image=%s' % src,
                '--for_vm'
               ])

  # Get the list of deltas by mocking out update method in test class.
  GenerateVirtualAUDeltasTest.ProcessOptions(parser, options)
  test_loader = unittest.TestLoader()
  test_loader.testMethodPrefix = options.test_prefix
  test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest)
  test_result = unittest.TextTestRunner(verbosity=0).run(test_suite)

  Info('The following delta updates are required.')
  jobs = []
  args = []
  for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items():
    for src in srcs:
      if src:
        print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target)
      else:
        print >> sys.stderr, 'FULL AU %s' % target

      jobs.append(_GenerateVMUpdate)
      args.append((target, src))

  results = _RunParallelJobs(options.jobs, jobs, args)
  return results


def main():
  parser = optparse.OptionParser()
  parser.add_option('-b', '--base_image',
@@ -619,10 +755,12 @@ def main():
                    help='Disable using delta updates.')
  parser.add_option('--no_graphics', action='store_true',
                    help='Disable graphics for the vm test.')
  parser.add_option('-m', '--remote',
                    help='Remote address for real test.')
  parser.add_option('-j', '--jobs', default=8, type=int,
                    help='Number of simultaneous jobs')
  parser.add_option('-q', '--quick_test', default=False, action='store_true',
                    help='Use a basic test to verify image.')
  parser.add_option('-m', '--remote',
                    help='Remote address for real test.')
  parser.add_option('-t', '--target_image',
                    help='path to the target image.')
  parser.add_option('--test_prefix', default='test',
@@ -642,15 +780,18 @@ def main():
  elif options.type == 'real': test_class = RealAUTest
  else: parser.error('Could not parse harness type %s.' % options.type)

  test_class.ProcessOptions(parser, options)
  # TODO(sosa): Caching doesn't really make sense on non-vm images (yet).
  if options.type == 'vm':
    _PregenerateUpdates(parser, options)

  # Run the test suite.
  test_class.ProcessOptions(parser, options)
  test_loader = unittest.TestLoader()
  test_loader.testMethodPrefix = options.test_prefix
  test_suite = test_loader.loadTestsFromTestCase(test_class)
  test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)

  if not test_result.wasSuccessful():
    Die('Test harness was not successful')
    Die('Test harness was not successful.')


if __name__ == '__main__':
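For reference, a condensed sketch (not part of the commit) of the enumeration trick GenerateVirtualAUDeltasTest relies on above: run the suite once with the update step stubbed out so that it only records which (target, source) payloads the real run will need, then generate exactly those. RecordingAUTest, testFullThenDelta, and the .bin names are hypothetical placeholders:

import unittest


class RecordingAUTest(unittest.TestCase):
  # Maps target image -> set of source images; '' marks a full update.
  delta_list = {}

  def _UpdateImage(self, image_path, src_image_path=''):
    # Record the requested payload instead of performing the update.
    self.delta_list.setdefault(image_path, set()).add(src_image_path)

  def testFullThenDelta(self):
    # Stand-in for a test that would normally drive two real updates.
    self._UpdateImage('target.bin')
    self._UpdateImage('target.bin', 'base.bin')


if __name__ == '__main__':
  suite = unittest.TestLoader().loadTestsFromTestCase(RecordingAUTest)
  unittest.TextTestRunner(verbosity=0).run(suite)
  for target, srcs in RecordingAUTest.delta_list.items():
    for src in srcs:
      if src:
        print('DELTA AU %s -> %s' % (src, target))
      else:
        print('FULL AU %s' % target)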