diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py index f274b14e1a..42dcaaab93 100755 --- a/bin/cros_au_test_harness.py +++ b/bin/cros_au_test_harness.py @@ -16,7 +16,7 @@ import optparse import os import re import sys -import thread +import threading import time import unittest import urllib @@ -134,9 +134,10 @@ class AUTest(object): # Will raise ValueError if expected is not found. if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE): return + else: + Warning("Didn't find '%s' in:" % expected_msg) + Warning(err.stdout) - Warning("Didn't find '%s' in:" % expected_msg) - Warning(err.stdout) self.fail('We managed to update when failure was expected') def AttemptUpdateWithFilter(self, filter, proxy_port=8081): @@ -155,7 +156,7 @@ class AUTest(object): # This update is expected to fail... try: - self.PerformUpdate(self.target_image_path, self.base_image_path, + self.PerformUpdate(self.target_image_path, self.target_image_path, proxy_port=proxy_port) finally: proxy.shutdown() @@ -248,12 +249,10 @@ class AUTest(object): percent_passed = self.VerifyImage(10) # Update to - all tests should pass on new image. - Info('Updating from base image on vm to target image.') self.PerformUpdate(self.target_image_path, self.base_image_path) self.VerifyImage(100) # Update from - same percentage should pass that originally passed. - Info('Updating from updated image on vm back to base image.') self.PerformUpdate(self.base_image_path, self.target_image_path) self.VerifyImage(percent_passed) @@ -271,12 +270,10 @@ class AUTest(object): percent_passed = self.VerifyImage(10) # Update to - all tests should pass on new image. - Info('Updating from base image on vm to target image and wiping stateful.') self.PerformUpdate(self.target_image_path, self.base_image_path, 'clean') self.VerifyImage(100) # Update from - same percentage should pass that originally passed. - Info('Updating from updated image back to base image and wiping stateful.') self.PerformUpdate(self.base_image_path, self.target_image_path, 'clean') self.VerifyImage(percent_passed) @@ -508,13 +505,15 @@ class VirtualAUTest(unittest.TestCase, AUTest): def PrepareBase(self, image_path): """Creates an update-able VM based on base image.""" + # Needed for VM delta updates. We need to use the qemu image rather + # than the base image on a first update. By tracking the first_update + # we can set src_image to the qemu form of the base image when + # performing generating the delta payload. + self._first_update = True self.vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname( image_path) - - Info('Creating: %s' % self.vm_image_path) - if not os.path.exists(self.vm_image_path): - Info('Qemu image %s not found, creating one.' % self.vm_image_path) + Info('Creating %s' % vm_image_path) RunCommand(['%s/image_to_vm.sh' % self.crosutils, '--full', '--from=%s' % ReinterpretPathForChroot( @@ -523,18 +522,17 @@ class VirtualAUTest(unittest.TestCase, AUTest): '--statefulfs_size=%s' % self._FULL_STATEFULFS_SIZE, '--board=%s' % self.board, '--test_image'], enter_chroot=True) - else: - Info('Using existing VM image %s' % self.vm_image_path) - Info('Testing for %s' % self.vm_image_path) + Info('Using %s as base' % self.vm_image_path) self.assertTrue(os.path.exists(self.vm_image_path)) def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', proxy_port=None): """Updates VM image with image_path.""" stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) - if src_image_path == self.base_image_path: + if src_image_path and self._first_update: src_image_path = self.vm_image_path + self._first_update = False cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, '--update_image_path=%s' % image_path, @@ -608,6 +606,144 @@ class VirtualAUTest(unittest.TestCase, AUTest): return self.AssertEnoughTestsPassed(self, output, percent_required_to_pass) +class GenerateVirtualAUDeltasTest(VirtualAUTest): + """Class the overrides VirtualAUTest and stores deltas we will generate.""" + delta_list = {} + + def setUp(self): + AUTest.setUp(self) + + def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', + proxy_port=None): + if src_image_path and self._first_update: + src_image_path = self.vm_image_path + self._first_update = False + + image_path = ReinterpretPathForChroot(image_path) + if src_image_path: + src_image_path = ReinterpretPathForChroot(src_image_path) + if not self.delta_list.has_key(image_path): + self.delta_list[image_path] = set([src_image_path]) + else: + self.delta_list[image_path].add(src_image_path) + + def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg): + pass + + def VerifyImage(self, percent_required_to_pass): + pass + + +class ParallelJob(threading.Thread): + """Small wrapper for threading.Thread that releases a semaphore on exit.""" + def __init__(self, semaphore, target, args): + threading.Thread.__init__(self, target=target, args=args) + self._target = target + self._args = args + self._semaphore = semaphore + self._output = None + self._completed = False + + def run(self): + try: + threading.Thread.run(self) + finally: + self._Cleanup() + self._completed = True + + def GetOutput(self): + assert self._completed, 'GetOutput called before thread was run.' + return self._output + + def _Cleanup(self): + self._semaphore.release() + + def __str__(self): + return '%s(%s)' % (self._target, self._args) + + +def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): + """Runs set number of specified jobs in parallel. + + Args: + number_of_simultaneous_jobs: Max number of threads to be run in parallel. + jobs: Array of methods to run. + jobs_args: Array of args associated with method calls. + Returns: + Returns an array of results corresponding to each thread. + """ + def _TwoTupleize(x, y): + return (x, y) + + threads = [] + job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) + assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' + + # Create the parallel jobs. + for job, args in map(_TwoTupleize, jobs, jobs_args): + thread = ParallelJob(job_pool_semaphore, target=job, args=args) + threads.append(thread) + + # We use a semaphore to ensure we don't run more jobs that required. + # After each thread finishes, it releases (increments semaphore). + # Acquire blocks of num jobs reached and continues when a thread finishes. + for next_thread in threads: + job_pool_semaphore.acquire(blocking=True) + Info('Starting %s' % next_thread) + next_thread.start() + + # Wait on the rest of the threads to finish. + for thread in threads: + thread.join() + + return [thread.GetOutput() for thread in threads] + + +def _PregenerateUpdates(parser, options): + """Determines all deltas that will be generated and generates them. + + This method effectively pre-generates the dev server cache for all tests. + + Args: + parser: parser from main. + options: options from parsed parser. + Returns: + Array of output from generating updates. + """ + def _GenerateVMUpdate(target, src): + """Generates an update using the devserver.""" + RunCommand(['./start_devserver', + '--pregenerate_update', + '--exit', + '--image=%s' % target, + '--src_image=%s' % src, + '--for_vm' + ]) + + # Get the list of deltas by mocking out update method in test class. + GenerateVirtualAUDeltasTest.ProcessOptions(parser, options) + test_loader = unittest.TestLoader() + test_loader.testMethodPrefix = options.test_prefix + test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest) + test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) + + Info('The following delta updates are required.') + jobs = [] + args = [] + for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items(): + for src in srcs: + if src: + print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target) + else: + print >> sys.stderr, 'FULL AU %s' % target + + jobs.append(_GenerateVMUpdate) + args.append((target, src)) + + results = _RunParallelJobs(options.jobs, jobs, args) + return results + + def main(): parser = optparse.OptionParser() parser.add_option('-b', '--base_image', @@ -619,10 +755,12 @@ def main(): help='Disable using delta updates.') parser.add_option('--no_graphics', action='store_true', help='Disable graphics for the vm test.') - parser.add_option('-m', '--remote', - help='Remote address for real test.') + parser.add_option('-j', '--jobs', default=8, type=int, + help='Number of simultaneous jobs') parser.add_option('-q', '--quick_test', default=False, action='store_true', help='Use a basic test to verify image.') + parser.add_option('-m', '--remote', + help='Remote address for real test.') parser.add_option('-t', '--target_image', help='path to the target image.') parser.add_option('--test_prefix', default='test', @@ -642,15 +780,18 @@ def main(): elif options.type == 'real': test_class = RealAUTest else: parser.error('Could not parse harness type %s.' % options.type) - test_class.ProcessOptions(parser, options) + # TODO(sosa): Caching doesn't really make sense on non-vm images (yet). + if options.type == 'vm': + _PregenerateUpdates(parser, options) + # Run the test suite. + test_class.ProcessOptions(parser, options) test_loader = unittest.TestLoader() test_loader.testMethodPrefix = options.test_prefix test_suite = test_loader.loadTestsFromTestCase(test_class) test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) - if not test_result.wasSuccessful(): - Die('Test harness was not successful') + Die('Test harness was not successful.') if __name__ == '__main__':