diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py index b85fbb7ddb..b9578a28eb 100755 --- a/bin/cros_au_test_harness.py +++ b/bin/cros_au_test_harness.py @@ -701,11 +701,11 @@ class ParallelJob(threading.Thread): try: self._output = self._target(*self._args) finally: - # From threading.py to avoid a refcycle. - del self._target, self._args # Our own clean up. self._Cleanup() self._completed = True + # From threading.py to avoid a refcycle. + del self._target, self._args def GetOutput(self): """Returns the output of the method run.""" @@ -714,6 +714,7 @@ class ParallelJob(threading.Thread): def _Cleanup(self): """Releases semaphores for a waiting caller.""" + Info('Completed job %s' % self) self._starting_semaphore.release() self._ending_semaphore.release() @@ -763,13 +764,15 @@ def _GenerateUpdateId(target, src): return target -def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): +def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args, print_status): + """Runs set number of specified jobs in parallel. Args: number_of_simultaneous_jobs: Max number of threads to be run in parallel. jobs: Array of methods to run. jobs_args: Array of args associated with method calls. + print_status: True if you'd like this to print out .'s as it runs jobs. Returns: Returns an array of results corresponding to each thread. """ @@ -787,6 +790,10 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): args=args) threads.append(thread) + # Cache sudo access. + RunCommand(['sudo', 'echo', 'Starting test harness'], + print_cmd=False, redirect_stdout=True, redirect_stderr=True) + # We use a semaphore to ensure we don't run more jobs that required. # After each thread finishes, it releases (increments semaphore). # Acquire blocks of num jobs reached and continues when a thread finishes. @@ -796,8 +803,12 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): next_thread.start() # Wait on the rest of the threads to finish. + Info('Waiting for threads to complete.') for thread in threads: - join_semaphore.acquire(blocking=True) + while not join_semaphore.acquire(blocking=False): + time.sleep(5) + if print_status: + print >> sys.stderr, '.', return [thread.GetOutput() for thread in threads] @@ -820,6 +831,8 @@ def _PregenerateUpdates(parser, options): options: options from parsed parser. Returns: Dictionary of Update Identifiers->Relative cache locations. + Raises: + UpdateException if we fail to generate an update. """ def _GenerateVMUpdate(target, src): """Generates an update using the devserver.""" @@ -827,15 +840,16 @@ def _PregenerateUpdates(parser, options): if src: src = ReinterpretPathForChroot(src) - return RunCommand(['sudo', - './start_devserver', - '--pregenerate_update', - '--exit', - '--image=%s' % target, - '--src_image=%s' % src, - '--for_vm', - ], redirect_stdout=True, enter_chroot=True, - print_cmd=False) + return RunCommandCaptureOutput(['sudo', + './start_devserver', + '--pregenerate_update', + '--exit', + '--image=%s' % target, + '--src_image=%s' % src, + '--for_vm', + ], combine_stdout_stderr=True, + enter_chroot=True, + print_cmd=False) # Get the list of deltas by mocking out update method in test class. test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest) @@ -853,23 +867,30 @@ def _PregenerateUpdates(parser, options): jobs.append(_GenerateVMUpdate) args.append((target, src)) - raw_results = _RunParallelJobs(options.jobs, jobs, args) + raw_results = _RunParallelJobs(options.jobs, jobs, args, print_status=True) results = [] - # Parse the output. + # Looking for this line in the output. key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)') for result in raw_results: - for line in result.splitlines(): - match = key_line_re.search(line) - if match: - # Convert blah/blah/update.gz -> update/blah/blah. - path_to_update_gz = match.group(1).rstrip() - (path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz') - results.append('/'.join(['update', path_to_update_dir])) - break + (return_code, output, _) = result + if return_code != 0: + Warning(output) + raise UpdateException(return_code, 'Failed to generate all updates.') + else: + for line in output.splitlines(): + match = key_line_re.search(line) + if match: + # Convert blah/blah/update.gz -> update/blah/blah. + path_to_update_gz = match.group(1).rstrip() + (path_to_update_dir, _, _) = path_to_update_gz.rpartition( + '/update.gz') + results.append('/'.join(['update', path_to_update_dir])) + break - assert len(raw_results) == len(results), \ - 'Insufficient number cache directories returned.' + # Make sure all generation of updates returned cached locations. + if len(raw_results) != len(results): + raise UpdateException(1, 'Insufficient number cache directories returned.') # Build the dictionary from our id's and returned cache paths. cache_dictionary = {} @@ -890,7 +911,7 @@ def _RunTestsInParallel(parser, options, test_class): threads.append(unittest.TextTestRunner().run) args.append(test_case) - results = _RunParallelJobs(options.jobs, threads, args) + results = _RunParallelJobs(options.jobs, threads, args, print_status=False) if not (test_result.wasSuccessful() for test_result in results): Die('Test harness was not successful')