diff --git a/parallel_emerge b/parallel_emerge index 35c089cdf5..af552543fe 100755 --- a/parallel_emerge +++ b/parallel_emerge @@ -38,14 +38,17 @@ Basic operation: of the same package for a runtime dep). """ +import codecs import copy import multiprocessing import os import Queue import shlex +import signal import sys import tempfile import time +import traceback import urllib2 # If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On @@ -214,7 +217,7 @@ class DepGraphGenerator(object): """ __slots__ = ["board", "emerge", "mandatory_source", "no_workon_deps", - "package_db", "rebuild"] + "package_db", "rebuild", "show_output"] def __init__(self): self.board = None @@ -223,6 +226,7 @@ class DepGraphGenerator(object): self.no_workon_deps = False self.package_db = {} self.rebuild = False + self.show_output = False def ParseParallelEmergeArgs(self, argv): """Read the parallel emerge arguments from the command-line. @@ -249,6 +253,8 @@ class DepGraphGenerator(object): self.no_workon_deps = True elif arg == "--rebuild": self.rebuild = True + elif arg == "--show-output": + self.show_output = True else: # Not one of our options, so pass through to emerge. emerge_args.append(arg) @@ -1067,20 +1073,72 @@ def PrintDepsMap(deps_map): print " no dependencies" -def EmergeWorker(task_queue, done_queue, emerge, package_db): +class EmergeJobState(object): + __slots__ = ["done", "filename", "last_output_seek", "last_output_timestamp", + "pkgname", "retcode", "start_timestamp", "target"] + + def __init__(self, target, pkgname, done, filename, start_timestamp, + retcode=None): + + # The full name of the target we're building (e.g. + # chromeos-base/chromeos-0.0.1-r60) + self.target = target + + # The short name of the target we're building (e.g. chromeos-0.0.1-r60) + self.pkgname = pkgname + + # Whether the job is done. (True if the job is done; false otherwise.) + self.done = done + + # The filename where output is currently stored. + self.filename = filename + + # The location (in bytes) of the end of the last complete line we printed. + # This starts off at zero. We use this to jump to the right place when we + # print output from the same ebuild multiple times. + self.last_output_seek = 0 + + # The timestamp of the last time we printed output. Since we haven't + # printed output yet, this starts at zero. + self.last_output_timestamp = 0 + + # The return code of our job, if the job is actually finished. + self.retcode = retcode + + # The timestamp when our job started. + self.start_timestamp = start_timestamp + + +def EmergeWorker(task_queue, job_queue, emerge, package_db): """This worker emerges any packages given to it on the task_queue. Args: task_queue: The queue of tasks for this worker to do. - done_queue: The queue of results from the worker. + job_queue: The queue of results from the worker. emerge: An EmergeData() object. package_db: A dict, mapping package ids to portage Package objects. It expects package identifiers to be passed to it via task_queue. When - the package is merged, it pushes (target, retval, outputstr) into the - done_queue. + a task is started, it pushes the (target, filename) to the started_queue. + The output is stored in filename. When a merge starts or finishes, we push + EmergeJobState objects to the job_queue. """ + def ExitHandler(signum, frame): + # Remove our signal handlers so we don't get called recursively. + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + # Try to exit cleanly + sys.exit(1) + + # Ensure that we exit quietly and cleanly, if possible, when we receive + # SIGTERM or SIGINT signals. By default, when the user hits CTRL-C, all + # of the child processes will print details about KeyboardInterrupt + # exceptions, which isn't very helpful. + signal.signal(signal.SIGINT, ExitHandler) + signal.signal(signal.SIGTERM, ExitHandler) + settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb opts, spinner = emerge.opts, emerge.spinner opts["--nodeps"] = True @@ -1088,14 +1146,16 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db): # Wait for a new item to show up on the queue. This is a blocking wait, # so if there's nothing to do, we just sit here. target = task_queue.get() - print "Emerging", target db_pkg = package_db[target] db_pkg.root_config = emerge.root_config install_list = [db_pkg] - output = tempfile.TemporaryFile() - outputstr = "" + pkgname = db_pkg.pf + output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False) + start_timestamp = time.time() + job = EmergeJobState(target, pkgname, False, output.name, start_timestamp) + job_queue.put(job) if "--pretend" in opts: - retval = 0 + retcode = 0 else: save_stdout = sys.stdout save_stderr = sys.stderr @@ -1104,30 +1164,34 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db): sys.stderr = output scheduler = Scheduler(settings, trees, mtimedb, opts, spinner, install_list, [], emerge.scheduler_graph) - retval = scheduler.merge() + retcode = scheduler.merge() + except Exception: + traceback.print_exc(file=output) + retcode = 1 finally: sys.stdout = save_stdout sys.stderr = save_stderr - if retval is None: - retval = 0 - if retval != 0: - output.seek(0) - outputstr = output.read() + output.close() + if retcode is None: + retcode = 0 - done_queue.put((target, retval, outputstr)) + job = EmergeJobState(target, pkgname, True, output.name, start_timestamp, + retcode) + job_queue.put(job) class EmergeQueue(object): """Class to schedule emerge jobs according to a dependency graph.""" - def __init__(self, deps_map, emerge, package_db): + def __init__(self, deps_map, emerge, package_db, show_output): # Store the dependency graph. self._deps_map = deps_map # Initialize the running queue to empty - self._jobs = set() + self._jobs = {} # List of total package installs represented in deps_map. install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"] self._total_jobs = len(install_jobs) + self._show_output = show_output if "--pretend" in emerge.opts: print "Skipping merge because of --pretend mode." @@ -1140,21 +1204,48 @@ class EmergeQueue(object): procs = min(self._total_jobs, emerge.opts.get("--jobs", multiprocessing.cpu_count())) self._emerge_queue = multiprocessing.Queue() - self._done_queue = multiprocessing.Queue() - args = (self._emerge_queue, self._done_queue, emerge, package_db) + self._job_queue = multiprocessing.Queue() + args = (self._emerge_queue, self._job_queue, emerge, package_db) self._pool = multiprocessing.Pool(procs, EmergeWorker, args) # Initialize the failed queue to empty. self._retry_queue = [] - self._failed = {} + self._failed = set() # Print an update before we launch the merges. self._Status() + # Setup an exit handler so that we print nice messages if we are + # terminated. + self._SetupExitHandler() + + # Schedule our jobs. for target, info in deps_map.items(): if not info["needs"]: self._Schedule(target) + def _SetupExitHandler(self): + + def ExitHandler(signum, frame): + + # Kill our signal handlers so we don't get called recursively + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + # Print our current job status + for target, job in self._jobs.iteritems(): + if job: + self._PrintJob(job) + os.unlink(job.filename) + + # Notify the user that we are exiting + print "Exiting on signal %s" % signum + sys.exit(1) + + # Print out job status when we are killed + signal.signal(signal.SIGINT, ExitHandler) + signal.signal(signal.SIGTERM, ExitHandler) + def _Schedule(self, target): # We maintain a tree of all deps, if this doesn't need # to be installed just free up it's children and continue. @@ -1165,16 +1256,56 @@ class EmergeQueue(object): self._Finish(target) else: # Kick off the build if it's marked to be built. - self._jobs.add(target) + self._jobs[target] = None self._emerge_queue.put(target) def _LoadAvg(self): loads = open("/proc/loadavg", "r").readline().split()[:3] return " ".join(loads) + def _PrintJob(self, job): + """Print output so far of specified job""" + + # Calculate how long the job has been running. + current_time = time.time() + seconds = current_time - job.start_timestamp + + # Note that we've printed out the job so far. + job.last_output_timestamp = current_time + + # Note that we're starting the job + info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60) + if job.last_output_seek: + print "=== Continue output for %s " % info + else: + print "=== Start output for %s ===" % info + + # Print actual output from job + f = codecs.open(job.filename, encoding='utf-8', errors='replace') + f.seek(job.last_output_seek) + prefix = job.pkgname + ":" + for line in f: + + # Save off our position in the file + if line and line[-1] == "\n": + job.last_output_seek = f.tell() + line = line[:-1] + + # Print our line + print prefix, line.encode('utf-8', 'replace') + f.close() + + # Note end of output section + if job.done: + print "=== Complete: %s ===" % info + else: + print "=== Still running: %s ===" % info + + def _Status(self): """Print status.""" - seconds = time.time() - GLOBAL_START + current_time = time.time() + seconds = current_time - GLOBAL_START line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " "[Time %dm%.1fs Load %s]") qsize = self._emerge_queue.qsize() @@ -1182,6 +1313,18 @@ class EmergeQueue(object): len(self._retry_queue), self._total_jobs, seconds / 60, seconds % 60, self._LoadAvg()) + # Print interim output every minute if --show-output is used. Otherwise, + # only print output if a job has been running for 60 minutes or more. + if self._show_output: + interval = 60 + else: + interval = 60 * 60 + for target, job in self._jobs.iteritems(): + if job: + last_timestamp = max(job.start_timestamp, job.last_output_timestamp) + if last_timestamp + interval < current_time: + self._PrintJob(job) + def _Finish(self, target): """Mark a target as completed and unblock dependecies.""" for dep in self._deps_map[target]["provides"]: @@ -1205,7 +1348,7 @@ class EmergeQueue(object): while self._deps_map: # Check here that we are actually waiting for something. if (self._emerge_queue.empty() and - self._done_queue.empty() and + self._job_queue.empty() and not self._jobs and self._deps_map): # If we have failed on a package, retry it now. @@ -1213,11 +1356,10 @@ class EmergeQueue(object): self._Retry() # If we have failed a package twice, just give up. elif self._failed: - for failure, output in self._failed.items(): + for failure in self._failed: print "Package failed: %s" % failure - print output PrintDepsMap(self._deps_map) - print "Packages failed: %s" % " ,".join(self._failed.keys()) + print "Packages failed: %s" % " ,".join(self._failed) sys.exit(1) # If we have dependency cycles. else: @@ -1226,27 +1368,39 @@ class EmergeQueue(object): sys.exit(1) try: - target, retcode, output = self._done_queue.get(timeout=5) + job = self._job_queue.get(timeout=5) except Queue.Empty: # Print an update. self._Status() continue - self._jobs.discard(target) + target = job.target - # Print if necessary. - if retcode != 0: - print output - if retcode != 0: + if not job.done: + self._jobs[target] = job + print "Started %s (logged in %s)" % (target, job.filename) + continue + + # Print output of job + if self._show_output or job.retcode != 0: + self._PrintJob(job) + os.unlink(job.filename) + del self._jobs[target] + + seconds = time.time() - job.start_timestamp + details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60) + + # Complain if necessary. + if job.retcode != 0: # Handle job failure. if target in self._failed: # If this job has failed previously, give up. - print "Failed %s. Your build has failed." % target + print "Failed %s. Your build has failed." % details else: # Queue up this build to try again after a long while. self._retry_queue.append(target) - self._failed[target] = 1 - print "Failed %s, retrying later." % target + self._failed.add(target) + print "Failed %s, retrying later." % details else: if target in self._failed and self._retry_queue: # If we have successfully retried a failed package, and there @@ -1254,7 +1408,7 @@ class EmergeQueue(object): # one retrying package actively running at a time. self._Retry() - print "Completed %s" % target + print "Completed %s" % details # Mark as completed and unblock waiting ebuilds. self._Finish(target) @@ -1305,7 +1459,7 @@ def main(): PrintDepsMap(deps_graph) # Run the queued emerges. - scheduler = EmergeQueue(deps_graph, emerge, deps.package_db) + scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output) scheduler.Run() # Update world.