This change squashes output while pregenerationg updates.

It also caches sudo credentials before running jobs (for devs) and
exits if any updater returns an error code != 0 with the output
of pregenerate update

BUG=chromium-os:10723
TEST=Ran it ... killed one pregeneration to make sure it did print + exit
correctly

Review URL: http://codereview.chromium.org/6381013

Change-Id: I04842211e469b3a0022cb53b78efd17fbebe7010
This commit is contained in:
Chris Sosa 2011-01-26 18:44:59 -08:00
parent 30acb0b926
commit fb58ea5d96

View File

@ -701,11 +701,11 @@ class ParallelJob(threading.Thread):
try:
self._output = self._target(*self._args)
finally:
# From threading.py to avoid a refcycle.
del self._target, self._args
# Our own clean up.
self._Cleanup()
self._completed = True
# From threading.py to avoid a refcycle.
del self._target, self._args
def GetOutput(self):
"""Returns the output of the method run."""
@ -714,6 +714,7 @@ class ParallelJob(threading.Thread):
def _Cleanup(self):
"""Releases semaphores for a waiting caller."""
Info('Completed job %s' % self)
self._starting_semaphore.release()
self._ending_semaphore.release()
@ -763,13 +764,15 @@ def _GenerateUpdateId(target, src):
return target
def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args, print_status):
"""Runs set number of specified jobs in parallel.
Args:
number_of_simultaneous_jobs: Max number of threads to be run in parallel.
jobs: Array of methods to run.
jobs_args: Array of args associated with method calls.
print_status: True if you'd like this to print out .'s as it runs jobs.
Returns:
Returns an array of results corresponding to each thread.
"""
@ -787,6 +790,10 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
args=args)
threads.append(thread)
# Cache sudo access.
RunCommand(['sudo', 'echo', 'Starting test harness'],
print_cmd=False, redirect_stdout=True, redirect_stderr=True)
# We use a semaphore to ensure we don't run more jobs that required.
# After each thread finishes, it releases (increments semaphore).
# Acquire blocks of num jobs reached and continues when a thread finishes.
@ -796,8 +803,12 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
next_thread.start()
# Wait on the rest of the threads to finish.
Info('Waiting for threads to complete.')
for thread in threads:
join_semaphore.acquire(blocking=True)
while not join_semaphore.acquire(blocking=False):
time.sleep(5)
if print_status:
print >> sys.stderr, '.',
return [thread.GetOutput() for thread in threads]
@ -820,6 +831,8 @@ def _PregenerateUpdates(parser, options):
options: options from parsed parser.
Returns:
Dictionary of Update Identifiers->Relative cache locations.
Raises:
UpdateException if we fail to generate an update.
"""
def _GenerateVMUpdate(target, src):
"""Generates an update using the devserver."""
@ -827,15 +840,16 @@ def _PregenerateUpdates(parser, options):
if src:
src = ReinterpretPathForChroot(src)
return RunCommand(['sudo',
'./start_devserver',
'--pregenerate_update',
'--exit',
'--image=%s' % target,
'--src_image=%s' % src,
'--for_vm',
], redirect_stdout=True, enter_chroot=True,
print_cmd=False)
return RunCommandCaptureOutput(['sudo',
'./start_devserver',
'--pregenerate_update',
'--exit',
'--image=%s' % target,
'--src_image=%s' % src,
'--for_vm',
], combine_stdout_stderr=True,
enter_chroot=True,
print_cmd=False)
# Get the list of deltas by mocking out update method in test class.
test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest)
@ -853,23 +867,30 @@ def _PregenerateUpdates(parser, options):
jobs.append(_GenerateVMUpdate)
args.append((target, src))
raw_results = _RunParallelJobs(options.jobs, jobs, args)
raw_results = _RunParallelJobs(options.jobs, jobs, args, print_status=True)
results = []
# Parse the output.
# Looking for this line in the output.
key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
for result in raw_results:
for line in result.splitlines():
match = key_line_re.search(line)
if match:
# Convert blah/blah/update.gz -> update/blah/blah.
path_to_update_gz = match.group(1).rstrip()
(path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz')
results.append('/'.join(['update', path_to_update_dir]))
break
(return_code, output, _) = result
if return_code != 0:
Warning(output)
raise UpdateException(return_code, 'Failed to generate all updates.')
else:
for line in output.splitlines():
match = key_line_re.search(line)
if match:
# Convert blah/blah/update.gz -> update/blah/blah.
path_to_update_gz = match.group(1).rstrip()
(path_to_update_dir, _, _) = path_to_update_gz.rpartition(
'/update.gz')
results.append('/'.join(['update', path_to_update_dir]))
break
assert len(raw_results) == len(results), \
'Insufficient number cache directories returned.'
# Make sure all generation of updates returned cached locations.
if len(raw_results) != len(results):
raise UpdateException(1, 'Insufficient number cache directories returned.')
# Build the dictionary from our id's and returned cache paths.
cache_dictionary = {}
@ -890,7 +911,7 @@ def _RunTestsInParallel(parser, options, test_class):
threads.append(unittest.TextTestRunner().run)
args.append(test_case)
results = _RunParallelJobs(options.jobs, threads, args)
results = _RunParallelJobs(options.jobs, threads, args, print_status=False)
if not (test_result.wasSuccessful() for test_result in results):
Die('Test harness was not successful')