From f2ee5f68716e17e6531c6decb2113e0639a012f6 Mon Sep 17 00:00:00 2001 From: Chris Sosa Date: Wed, 26 Jan 2011 11:38:14 -0800 Subject: [PATCH] Passes cache location to tests and runs the tests in parallel. As a warning, this is a pretty big change. At a high level, this changes the harness to move the managing of the devserver from image_to_live into the actual test harness. Paths of the cache locations (for archive_url) are taken when pre-generating the updates and stored in a dictionary (maps "path_to_base->path_to_target" (or path for full updates)-> cache paths). This change also has the tests run in parallel. Because we now start X number of VMs at once, each VM needs its own pid file and ssh_port. This logic was added as well as running the actual tests in parallel. Change-Id: I1275d79740c50c2a8028489b43dcbbcf5bbd56c4 BUG=chromium-os:10723 TEST=Ran it ... a lot with -q but without a test_prefix (so full test suite). Committed: http://chrome-svn/viewvc/chromeos?view=rev&revision=c418a8f Committed: http://chrome-svn/viewvc/chromeos?view=rev&revision=be787f3 Review URL: http://codereview.chromium.org/6277015 --- bin/cros_au_test_harness.py | 280 ++++++++++++++++++++++++++++-------- bin/cros_run_vm_update | 6 +- image_to_live.sh | 2 +- lib/cros_build_lib.py | 19 +++ lib/cros_vm_lib.sh | 2 - 5 files changed, 241 insertions(+), 68 deletions(-) diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py index ab71232b7e..b85fbb7ddb 100755 --- a/bin/cros_au_test_harness.py +++ b/bin/cros_au_test_harness.py @@ -15,6 +15,7 @@ import optparse import os import re +import subprocess import sys import threading import time @@ -23,6 +24,7 @@ import urllib sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) from cros_build_lib import Die +from cros_build_lib import GetIPAddress from cros_build_lib import Info from cros_build_lib import ReinterpretPathForChroot from cros_build_lib import RunCommand @@ -31,6 +33,8 @@ from cros_build_lib import 
Warning import cros_test_proxy +global dev_server_cache + class UpdateException(Exception): """Exception thrown when _UpdateImage or _UpdateUsingPayload fail""" @@ -469,12 +473,14 @@ class RealAUTest(unittest.TestCase, AUTest): class VirtualAUTest(unittest.TestCase, AUTest): """Test harness for updating virtual machines.""" - vm_image_path = None # VM Constants. _FULL_VDISK_SIZE = 6072 _FULL_STATEFULFS_SIZE = 3074 - _KVM_PID_FILE = '/tmp/harness_pid' + + # Class variables used to acquire individual VM variables per test. + _vm_lock = threading.Lock() + _next_port = 9222 def _KillExistingVM(self, pid_file): if os.path.exists(pid_file): @@ -485,10 +491,22 @@ class VirtualAUTest(unittest.TestCase, AUTest): assert not os.path.exists(pid_file) + def _AcquireUniquePortAndPidFile(self): + """Acquires unique ssh port and pid file for VM.""" + with VirtualAUTest._vm_lock: + self._ssh_port = VirtualAUTest._next_port + self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port + VirtualAUTest._next_port += 1 + def setUp(self): """Unit test overriden method. 
Is called before every test.""" AUTest.setUp(self) - self._KillExistingVM(self._KVM_PID_FILE) + self.vm_image_path = None + self._AcquireUniquePortAndPidFile() + self._KillExistingVM(self._kvm_pid_file) + + def tearDown(self): + self._KillExistingVM(self._kvm_pid_file) @classmethod def ProcessOptions(cls, parser, options): @@ -527,26 +545,42 @@ class VirtualAUTest(unittest.TestCase, AUTest): self.assertTrue(os.path.exists(self.vm_image_path)) def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', - proxy_port=None): + proxy_port=''): """Updates VM image with image_path.""" stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) if src_image_path and self._first_update: src_image_path = self.vm_image_path self._first_update = False - cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, - '--update_image_path=%s' % image_path, - '--vm_image_path=%s' % self.vm_image_path, - '--snapshot', - self.graphics_flag, - '--persist', - '--kvm_pid=%s' % self._KVM_PID_FILE, - stateful_change_flag, - '--src_image=%s' % src_image_path, - ] - - if proxy_port: - cmd.append('--proxy_port=%s' % proxy_port) + # Check image payload cache first. 
+ update_id = _GenerateUpdateId(target=image_path, src=src_image_path) + cache_path = dev_server_cache[update_id] + if cache_path: + Info('Using cache %s' % cache_path) + update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path) + cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, + '--vm_image_path=%s' % self.vm_image_path, + '--snapshot', + self.graphics_flag, + '--persist', + '--kvm_pid=%s' % self._kvm_pid_file, + '--ssh_port=%s' % self._ssh_port, + stateful_change_flag, + '--update_url=%s' % update_url, + ] + else: + cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, + '--update_image_path=%s' % image_path, + '--vm_image_path=%s' % self.vm_image_path, + '--snapshot', + self.graphics_flag, + '--persist', + '--kvm_pid=%s' % self._kvm_pid_file, + '--ssh_port=%s' % self._ssh_port, + stateful_change_flag, + '--src_image=%s' % src_image_path, + '--proxy_port=%s' % proxy_port + ] if self.verbose: try: @@ -568,7 +602,8 @@ class VirtualAUTest(unittest.TestCase, AUTest): '--snapshot', self.graphics_flag, '--persist', - '--kvm_pid=%s' % self._KVM_PID_FILE, + '--kvm_pid=%s' % self._kvm_pid_file, + '--ssh_port=%s' % self._ssh_port, stateful_change_flag, ] @@ -594,7 +629,8 @@ class VirtualAUTest(unittest.TestCase, AUTest): '--image_path=%s' % self.vm_image_path, '--snapshot', '--persist', - '--kvm_pid=%s' % self._KVM_PID_FILE, + '--kvm_pid=%s' % self._kvm_pid_file, + '--ssh_port=%s' % self._ssh_port, self.verify_suite, ] @@ -613,15 +649,15 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest): def setUp(self): AUTest.setUp(self) + def tearDown(self): + pass + def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', proxy_port=None): if src_image_path and self._first_update: src_image_path = self.vm_image_path self._first_update = False - image_path = ReinterpretPathForChroot(image_path) - if src_image_path: - src_image_path = ReinterpretPathForChroot(src_image_path) if not self.delta_list.has_key(image_path): self.delta_list[image_path] = 
set([src_image_path]) else: @@ -635,33 +671,98 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest): class ParallelJob(threading.Thread): - """Small wrapper for threading.Thread that releases a semaphore on exit.""" - def __init__(self, semaphore, target, args): + """Small wrapper for threading. Thread that releases a semaphores on exit.""" + + def __init__(self, starting_semaphore, ending_semaphore, target, args): + """Initializes an instance of a job. + + Args: + starting_semaphore: Semaphore used by caller to wait on such that + there isn't more than a certain number of threads running. Should + be initialized to a value for the number of threads wanting to be run + at a time. + ending_semaphore: Semaphore is released every time a job ends. Should be + initialized to 0 before starting first job. Should be acquired once for + each job. Threading.Thread.join() has a bug where if the run function + terminates too quickly join() will hang forever. + target: The func to run. + args: Args to pass to the fun. + """ threading.Thread.__init__(self, target=target, args=args) self._target = target self._args = args - self._semaphore = semaphore + self._starting_semaphore = starting_semaphore + self._ending_semaphore = ending_semaphore self._output = None self._completed = False def run(self): + """Thread override. Runs the method specified and sets output.""" try: - threading.Thread.run(self) + self._output = self._target(*self._args) finally: + # From threading.py to avoid a refcycle. + del self._target, self._args + # Our own clean up. self._Cleanup() self._completed = True def GetOutput(self): + """Returns the output of the method run.""" assert self._completed, 'GetOutput called before thread was run.' 
return self._output def _Cleanup(self): - self._semaphore.release() + """Releases semaphores for a waiting caller.""" + self._starting_semaphore.release() + self._ending_semaphore.release() def __str__(self): return '%s(%s)' % (self._target, self._args) +class DevServerWrapper(threading.Thread): + """A Simple wrapper around a devserver instance.""" + + def __init__(self): + self.proc = None + threading.Thread.__init__(self) + + def run(self): + # Kill previous running instance of devserver if it exists. + RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True, + print_cmd=False) + RunCommand(['sudo', + './start_devserver', + '--archive_dir=./static', + '--client_prefix=ChromeOSUpdateEngine', + '--production', + ], enter_chroot=True, print_cmd=False) + + def Stop(self): + """Kills the devserver instance.""" + RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True, + print_cmd=False) + + @classmethod + def GetDevServerURL(cls, port, sub_dir): + """Returns the dev server url for a given port and sub directory.""" + ip_addr = GetIPAddress() + if not port: port = 8080 + url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr, + 'port': str(port), + 'dir': sub_dir} + return url + + +def _GenerateUpdateId(target, src): + """Returns a simple representation id of target and src paths.""" + if src: + return '%s->%s' % (target, src) + else: + return target + + def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): """Runs set number of specified jobs in parallel. @@ -676,29 +777,39 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): return (x, y) threads = [] - job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) + job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) + join_semaphore = threading.Semaphore(0) assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' # Create the parallel jobs. 
for job, args in map(_TwoTupleize, jobs, jobs_args): - thread = ParallelJob(job_pool_semaphore, target=job, args=args) + thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, + args=args) threads.append(thread) # We use a semaphore to ensure we don't run more jobs that required. # After each thread finishes, it releases (increments semaphore). # Acquire blocks of num jobs reached and continues when a thread finishes. for next_thread in threads: - job_pool_semaphore.acquire(blocking=True) - Info('Starting %s' % next_thread) + job_start_semaphore.acquire(blocking=True) + Info('Starting job %s' % next_thread) next_thread.start() # Wait on the rest of the threads to finish. for thread in threads: - thread.join() + join_semaphore.acquire(blocking=True) return [thread.GetOutput() for thread in threads] +def _PrepareTestSuite(parser, options, test_class): + """Returns a prepared test suite given by the options and test class.""" + test_class.ProcessOptions(parser, options) + test_loader = unittest.TestLoader() + test_loader.testMethodPrefix = options.test_prefix + return test_loader.loadTestsFromTestCase(test_class) + + def _PregenerateUpdates(parser, options): """Determines all deltas that will be generated and generates them. @@ -708,41 +819,80 @@ def _PregenerateUpdates(parser, options): parser: parser from main. options: options from parsed parser. Returns: - Array of output from generating updates. + Dictionary of Update Identifiers->Relative cache locations. 
""" def _GenerateVMUpdate(target, src): """Generates an update using the devserver.""" - RunCommand(['sudo', - './start_devserver', - '--pregenerate_update', - '--exit', - '--image=%s' % target, - '--src_image=%s' % src, - '--for_vm' - ], enter_chroot=True) + target = ReinterpretPathForChroot(target) + if src: + src = ReinterpretPathForChroot(src) + + return RunCommand(['sudo', + './start_devserver', + '--pregenerate_update', + '--exit', + '--image=%s' % target, + '--src_image=%s' % src, + '--for_vm', + ], redirect_stdout=True, enter_chroot=True, + print_cmd=False) # Get the list of deltas by mocking out update method in test class. - GenerateVirtualAUDeltasTest.ProcessOptions(parser, options) - test_loader = unittest.TestLoader() - test_loader.testMethodPrefix = options.test_prefix - test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest) + test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest) test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) Info('The following delta updates are required.') + update_ids = [] jobs = [] args = [] for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items(): for src in srcs: - if src: - print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target) - else: - print >> sys.stderr, 'FULL AU %s' % target - + update_id = _GenerateUpdateId(target=target, src=src) + print >> sys.stderr, 'AU: %s' % update_id + update_ids.append(update_id) jobs.append(_GenerateVMUpdate) args.append((target, src)) - results = _RunParallelJobs(options.jobs, jobs, args) - return results + raw_results = _RunParallelJobs(options.jobs, jobs, args) + results = [] + + # Parse the output. + key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)') + for result in raw_results: + for line in result.splitlines(): + match = key_line_re.search(line) + if match: + # Convert blah/blah/update.gz -> update/blah/blah. 
+ path_to_update_gz = match.group(1).rstrip() + (path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz') + results.append('/'.join(['update', path_to_update_dir])) + break + + assert len(raw_results) == len(results), \ + 'Insufficient number cache directories returned.' + + # Build the dictionary from our id's and returned cache paths. + cache_dictionary = {} + for index, id in enumerate(update_ids): + cache_dictionary[id] = results[index] + + return cache_dictionary + + +def _RunTestsInParallel(parser, options, test_class): + """Runs the tests given by the options and test_class in parallel.""" + threads = [] + args = [] + test_suite = _PrepareTestSuite(parser, options, test_class) + for test in test_suite: + test_name = test.id() + test_case = unittest.TestLoader().loadTestsFromName(test_name) + threads.append(unittest.TextTestRunner().run) + args.append(test_case) + + results = _RunParallelJobs(options.jobs, threads, args) + if not (test_result.wasSuccessful() for test_result in results): + Die('Test harness was not successful') def main(): @@ -777,22 +927,28 @@ def main(): if leftover_args: parser.error('Found extra options we do not support: %s' % leftover_args) + # Figure out the test_class. if options.type == 'vm': test_class = VirtualAUTest elif options.type == 'real': test_class = RealAUTest else: parser.error('Could not parse harness type %s.' % options.type) # TODO(sosa): Caching doesn't really make sense on non-vm images (yet). - if options.type == 'vm': - _PregenerateUpdates(parser, options) + global dev_server_cache + if options.type == 'vm' and options.jobs > 1: + dev_server_cache = _PregenerateUpdates(parser, options) + my_server = DevServerWrapper() + my_server.start() + try: + _RunTestsInParallel(parser, options, test_class) + finally: + my_server.Stop() - # Run the test suite. 
- test_class.ProcessOptions(parser, options) - test_loader = unittest.TestLoader() - test_loader.testMethodPrefix = options.test_prefix - test_suite = test_loader.loadTestsFromTestCase(test_class) - test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) - if not test_result.wasSuccessful(): - Die('Test harness was not successful.') + else: + dev_server_cache = None + test_suite = _PrepareTestSuite(parser, options, test_class) + test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) + if not test_result.wasSuccessful(): + Die('Test harness was not successful.') if __name__ == '__main__': diff --git a/bin/cros_run_vm_update b/bin/cros_run_vm_update index 53617b6f89..05e035683a 100755 --- a/bin/cros_run_vm_update +++ b/bin/cros_run_vm_update @@ -16,6 +16,7 @@ DEFINE_string src_image "" \ "Create a delta update by passing in the image on the remote machine." DEFINE_string stateful_update_flag "" "Flags to pass to stateful update." s DEFINE_string update_image_path "" "Path of the image to update to." u +DEFINE_string update_url "" "Full url of an update image." DEFINE_string vm_image_path "" "Path of the VM image to update from." v set -e @@ -24,8 +25,6 @@ set -e FLAGS "$@" || exit 1 eval set -- "${FLAGS_ARGV}" -[ -n "${FLAGS_update_image_path}" ] || [ -n "${FLAGS_payload}" ] || \ - die "You must specify a path to an image to use as an update." [ -n "${FLAGS_vm_image_path}" ] || \ die "You must specify a path to a vm image." 
@@ -45,11 +44,12 @@ if [ -n "${FLAGS_proxy_port}" ]; then fi $(dirname $0)/../image_to_live.sh \ + --for_vm \ --remote=127.0.0.1 \ --ssh_port=${FLAGS_ssh_port} \ --stateful_update_flag=${FLAGS_stateful_update_flag} \ --src_image="${FLAGS_src_image}" \ + --update_url="${FLAGS_update_url}" \ --verify \ - --for_vm \ ${IMAGE_ARGS} diff --git a/image_to_live.sh b/image_to_live.sh index bbb5ec904d..a8ce40c6b6 100755 --- a/image_to_live.sh +++ b/image_to_live.sh @@ -25,6 +25,7 @@ DEFINE_boolean update_known_hosts ${FLAGS_FALSE} \ "Update your known_hosts with the new remote instance's key." DEFINE_string update_log "update_engine.log" \ "Path to log for the update_engine." +DEFINE_string update_url "" "Full url of an update image." DEFINE_boolean verify ${FLAGS_TRUE} "Verify image on device after update." # Flags for devserver. @@ -44,7 +45,6 @@ DEFINE_string src_image "" \ "Create a delta update by passing in the image on the remote machine." DEFINE_boolean update_stateful ${FLAGS_TRUE} \ "Perform update of stateful partition e.g. /var /usr/local." -DEFINE_string update_url "" "Full url of an update image." # Flags for stateful update. DEFINE_string stateful_update_flag "" \ diff --git a/lib/cros_build_lib.py b/lib/cros_build_lib.py index 22f7de946e..11fa40cac9 100644 --- a/lib/cros_build_lib.py +++ b/lib/cros_build_lib.py @@ -6,6 +6,7 @@ import inspect import os +import re import subprocess import sys @@ -245,3 +246,21 @@ def ReinterpretPathForChroot(path): new_path = os.path.join('/home', os.getenv('USER'), 'trunk', relative_path) return new_path + + +def GetIPAddress(device='eth0'): + """Returns the IP Address for a given device using ifconfig. + + socket.gethostname() is insufficient for machines where the host files are + not set up "correctly." Since some of our builders may have this issue, + this method gives you a generic way to get the address so you are reachable + either via a VM or remote machine on the same network. 
+ """ + ifconfig_output = RunCommand(['ifconfig', device], redirect_stdout=True, + print_cmd=False) + match = re.search('.*inet addr:(\d+\.\d+\.\d+\.\d+).*', ifconfig_output) + if match: + return match.group(1) + else: + Warning('Failed to find ip address in %s' % ifconfig_output) + return None diff --git a/lib/cros_vm_lib.sh b/lib/cros_vm_lib.sh index b0cd1ee6da..8b70ad2c99 100644 --- a/lib/cros_vm_lib.sh +++ b/lib/cros_vm_lib.sh @@ -38,8 +38,6 @@ function blocking_kill() { ! ps -p ${1} > /dev/null } -# TODO(rtc): These flags assume that we'll be using KVM on Lucid and won't work -# on Hardy. # $1: Path to the virtual image to start. function start_kvm() { # Override default pid file.