From 145b4e04125587a31e8b38f0229628061245ba9b Mon Sep 17 00:00:00 2001 From: David James Date: Wed, 6 Oct 2010 11:05:29 -0700 Subject: [PATCH] Cleanup parallel_emerge exit conditions to fix hangs. I've cleaned up parallel_emerge to send explicit signals to children to tell them when they need to exit. I also cleaned up the CTRL-C handling to correctly print data when interrupted by CTRL-C (previously, the print thread exited first, so the data we wanted on the failure wasn't actually printed.) BUG=chromium-os:5976 TEST=Test several board emerges, including exiting early with CTRL-C. Change-Id: Iab6efc8e1bf868106244a6210bd02e9e8283af37 Review URL: http://codereview.chromium.org/3534006 --- parallel_emerge | 65 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 50 insertions(+), 15 deletions(-) diff --git a/parallel_emerge b/parallel_emerge index 9a77ef45c4..37214241e1 100755 --- a/parallel_emerge +++ b/parallel_emerge @@ -40,6 +40,7 @@ Basic operation: import codecs import copy +import errno import multiprocessing import os import Queue @@ -1308,6 +1309,12 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db): # Wait for a new item to show up on the queue. This is a blocking wait, # so if there's nothing to do, we just sit here. target = task_queue.get() + if not target: + # If target is None, this means that the main thread wants us to quit. + # The other workers need to exit too, so we'll push the message back on + # to the queue so they'll get it too. + task_queue.put(target) + return db_pkg = package_db[target] db_pkg.root_config = emerge.root_config install_list = [db_pkg] @@ -1412,14 +1419,33 @@ class JobPrinter(object): def PrintWorker(queue): """A worker that prints stuff to the screen as requested.""" - SetupWorkerSignals() + + def ExitHandler(signum, frame): + # Switch to default signal handlers so that we'll die after two signals. + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + # Don't exit on the first SIGINT / SIGTERM, because the parent worker will + # handle it and tell us when we need to exit. + signal.signal(signal.SIGINT, ExitHandler) + signal.signal(signal.SIGTERM, ExitHandler) + + # seek_locations is a map indicating the position we are at in each file. + # It starts off empty, but is set by the various Print jobs as we go along + # to indicate where we left off in each file. seek_locations = {} while True: - job = queue.get() - if job: - job.Print(seek_locations) - else: - break + try: + job = queue.get() + if job: + job.Print(seek_locations) + else: + break + except IOError as ex: + if ex.errno == errno.EINTR: + # Looks like we received a signal. Keep printing. + continue + raise class EmergeQueue(object): @@ -1490,9 +1516,8 @@ class EmergeQueue(object): # Notify the user that we are exiting self._Print("Exiting on signal %s" % signum) - # Exit when print worker is done. - self._print_queue.put(None) - self._print_worker.join() + # Kill child threads, then exit. + self._Exit() sys.exit(1) # Print out job status when we are killed @@ -1558,6 +1583,17 @@ class EmergeQueue(object): self._Schedule(target) self._Print("Retrying emerge of %s." % target) + def _Exit(self): + # Tell emerge workers to exit. They all exit when 'None' is pushed + # to the queue. + self._emerge_queue.put(None) + self._pool.close() + self._pool.join() + + # Now that our workers are finished, we can kill the print queue. + self._print_queue.put(None) + self._print_worker.join() + def Run(self): """Run through the scheduled ebuilds. @@ -1574,9 +1610,8 @@ class EmergeQueue(object): if self._retry_queue: self._Retry() else: - # Tell the print worker we're done, and wait for it to exit. - self._print_queue.put(None) - self._print_worker.join() + # Tell child threads to exit. + self._Exit() # The dependency map is helpful for debugging failures. PrintDepsMap(self._deps_map) @@ -1637,9 +1672,9 @@ class EmergeQueue(object): # Print an update. self._Status() - # Tell the print worker we're done, and wait for it to exit. - self._print_queue.put(None) - self._print_worker.join() + # Tell child threads to exit. + self._Print("Merge complete") + self._Exit() def main():