Add --show-output option to parallel_emerge which prints output from jobs.

Also update parallel_emerge to print job output whenever you hit
CTRL-C, or when a job has been running for more than an hour. Also
clean up exit handling so that emerge exits a bit more cleanly, and so
that we don't hang if emerge throws an exception.

BUG=none
TEST=Ran ./parallel_emerge --show-output -uDNve chromeos

Review URL: http://codereview.chromium.org/3010056
This commit is contained in:
David James 2010-08-06 17:18:57 -07:00
parent 638976a796
commit 733fc865db

View File

@ -38,14 +38,17 @@ Basic operation:
of the same package for a runtime dep). of the same package for a runtime dep).
""" """
import codecs
import copy import copy
import multiprocessing import multiprocessing
import os import os
import Queue import Queue
import shlex import shlex
import signal
import sys import sys
import tempfile import tempfile
import time import time
import traceback
import urllib2 import urllib2
# If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On # If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On
@ -214,7 +217,7 @@ class DepGraphGenerator(object):
""" """
__slots__ = ["board", "emerge", "mandatory_source", "no_workon_deps", __slots__ = ["board", "emerge", "mandatory_source", "no_workon_deps",
"package_db", "rebuild"] "package_db", "rebuild", "show_output"]
def __init__(self): def __init__(self):
self.board = None self.board = None
@ -223,6 +226,7 @@ class DepGraphGenerator(object):
self.no_workon_deps = False self.no_workon_deps = False
self.package_db = {} self.package_db = {}
self.rebuild = False self.rebuild = False
self.show_output = False
def ParseParallelEmergeArgs(self, argv): def ParseParallelEmergeArgs(self, argv):
"""Read the parallel emerge arguments from the command-line. """Read the parallel emerge arguments from the command-line.
@ -249,6 +253,8 @@ class DepGraphGenerator(object):
self.no_workon_deps = True self.no_workon_deps = True
elif arg == "--rebuild": elif arg == "--rebuild":
self.rebuild = True self.rebuild = True
elif arg == "--show-output":
self.show_output = True
else: else:
# Not one of our options, so pass through to emerge. # Not one of our options, so pass through to emerge.
emerge_args.append(arg) emerge_args.append(arg)
@ -1067,20 +1073,72 @@ def PrintDepsMap(deps_map):
print " no dependencies" print " no dependencies"
class EmergeJobState(object):
  """Bookkeeping record describing one emerge job handled by a worker."""

  __slots__ = ["done", "filename", "last_output_seek", "last_output_timestamp",
               "pkgname", "retcode", "start_timestamp", "target"]

  def __init__(self, target, pkgname, done, filename, start_timestamp,
               retcode=None):
    """Initialize the job record.

    Args:
      target: Full name of the target being built (e.g.
        chromeos-base/chromeos-0.0.1-r60).
      pkgname: Short name of the target being built (e.g.
        chromeos-0.0.1-r60).
      done: Whether the job is done (True if done; False otherwise).
      filename: Name of the file where the job's output is stored.
      start_timestamp: Timestamp when the job started.
      retcode: Return code of the job if it is finished, else None.
    """
    self.target = target
    self.pkgname = pkgname
    self.done = done
    self.filename = filename
    self.retcode = retcode
    self.start_timestamp = start_timestamp
    # Byte offset of the end of the last complete output line we printed.
    # Starts at zero; used to jump to the right place when we print output
    # from the same ebuild multiple times.
    self.last_output_seek = 0
    # When we last printed output for this job. Zero means "never printed".
    self.last_output_timestamp = 0
def EmergeWorker(task_queue, job_queue, emerge, package_db):
"""This worker emerges any packages given to it on the task_queue. """This worker emerges any packages given to it on the task_queue.
Args: Args:
task_queue: The queue of tasks for this worker to do. task_queue: The queue of tasks for this worker to do.
done_queue: The queue of results from the worker. job_queue: The queue of results from the worker.
emerge: An EmergeData() object. emerge: An EmergeData() object.
package_db: A dict, mapping package ids to portage Package objects. package_db: A dict, mapping package ids to portage Package objects.
It expects package identifiers to be passed to it via task_queue. When It expects package identifiers to be passed to it via task_queue. When
the package is merged, it pushes (target, retval, outputstr) into the a task is started, it pushes the (target, filename) to the started_queue.
done_queue. The output is stored in filename. When a merge starts or finishes, we push
EmergeJobState objects to the job_queue.
""" """
def ExitHandler(signum, frame):
# Remove our signal handlers so we don't get called recursively.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# Try to exit cleanly
sys.exit(1)
# Ensure that we exit quietly and cleanly, if possible, when we receive
# SIGTERM or SIGINT signals. By default, when the user hits CTRL-C, all
# of the child processes will print details about KeyboardInterrupt
# exceptions, which isn't very helpful.
signal.signal(signal.SIGINT, ExitHandler)
signal.signal(signal.SIGTERM, ExitHandler)
settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb
opts, spinner = emerge.opts, emerge.spinner opts, spinner = emerge.opts, emerge.spinner
opts["--nodeps"] = True opts["--nodeps"] = True
@ -1088,14 +1146,16 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db):
# Wait for a new item to show up on the queue. This is a blocking wait, # Wait for a new item to show up on the queue. This is a blocking wait,
# so if there's nothing to do, we just sit here. # so if there's nothing to do, we just sit here.
target = task_queue.get() target = task_queue.get()
print "Emerging", target
db_pkg = package_db[target] db_pkg = package_db[target]
db_pkg.root_config = emerge.root_config db_pkg.root_config = emerge.root_config
install_list = [db_pkg] install_list = [db_pkg]
output = tempfile.TemporaryFile() pkgname = db_pkg.pf
outputstr = "" output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False)
start_timestamp = time.time()
job = EmergeJobState(target, pkgname, False, output.name, start_timestamp)
job_queue.put(job)
if "--pretend" in opts: if "--pretend" in opts:
retval = 0 retcode = 0
else: else:
save_stdout = sys.stdout save_stdout = sys.stdout
save_stderr = sys.stderr save_stderr = sys.stderr
@ -1104,30 +1164,34 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db):
sys.stderr = output sys.stderr = output
scheduler = Scheduler(settings, trees, mtimedb, opts, spinner, scheduler = Scheduler(settings, trees, mtimedb, opts, spinner,
install_list, [], emerge.scheduler_graph) install_list, [], emerge.scheduler_graph)
retval = scheduler.merge() retcode = scheduler.merge()
except Exception:
traceback.print_exc(file=output)
retcode = 1
finally: finally:
sys.stdout = save_stdout sys.stdout = save_stdout
sys.stderr = save_stderr sys.stderr = save_stderr
if retval is None: output.close()
retval = 0 if retcode is None:
if retval != 0: retcode = 0
output.seek(0)
outputstr = output.read()
done_queue.put((target, retval, outputstr)) job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
retcode)
job_queue.put(job)
class EmergeQueue(object): class EmergeQueue(object):
"""Class to schedule emerge jobs according to a dependency graph.""" """Class to schedule emerge jobs according to a dependency graph."""
def __init__(self, deps_map, emerge, package_db): def __init__(self, deps_map, emerge, package_db, show_output):
# Store the dependency graph. # Store the dependency graph.
self._deps_map = deps_map self._deps_map = deps_map
# Initialize the running queue to empty # Initialize the running queue to empty
self._jobs = set() self._jobs = {}
# List of total package installs represented in deps_map. # List of total package installs represented in deps_map.
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"] install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
self._total_jobs = len(install_jobs) self._total_jobs = len(install_jobs)
self._show_output = show_output
if "--pretend" in emerge.opts: if "--pretend" in emerge.opts:
print "Skipping merge because of --pretend mode." print "Skipping merge because of --pretend mode."
@ -1140,21 +1204,48 @@ class EmergeQueue(object):
procs = min(self._total_jobs, procs = min(self._total_jobs,
emerge.opts.get("--jobs", multiprocessing.cpu_count())) emerge.opts.get("--jobs", multiprocessing.cpu_count()))
self._emerge_queue = multiprocessing.Queue() self._emerge_queue = multiprocessing.Queue()
self._done_queue = multiprocessing.Queue() self._job_queue = multiprocessing.Queue()
args = (self._emerge_queue, self._done_queue, emerge, package_db) args = (self._emerge_queue, self._job_queue, emerge, package_db)
self._pool = multiprocessing.Pool(procs, EmergeWorker, args) self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
# Initialize the failed queue to empty. # Initialize the failed queue to empty.
self._retry_queue = [] self._retry_queue = []
self._failed = {} self._failed = set()
# Print an update before we launch the merges. # Print an update before we launch the merges.
self._Status() self._Status()
# Setup an exit handler so that we print nice messages if we are
# terminated.
self._SetupExitHandler()
# Schedule our jobs.
for target, info in deps_map.items(): for target, info in deps_map.items():
if not info["needs"]: if not info["needs"]:
self._Schedule(target) self._Schedule(target)
def _SetupExitHandler(self):
def ExitHandler(signum, frame):
# Kill our signal handlers so we don't get called recursively
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# Print our current job status
for target, job in self._jobs.iteritems():
if job:
self._PrintJob(job)
os.unlink(job.filename)
# Notify the user that we are exiting
print "Exiting on signal %s" % signum
sys.exit(1)
# Print out job status when we are killed
signal.signal(signal.SIGINT, ExitHandler)
signal.signal(signal.SIGTERM, ExitHandler)
def _Schedule(self, target): def _Schedule(self, target):
# We maintain a tree of all deps, if this doesn't need # We maintain a tree of all deps, if this doesn't need
# to be installed just free up it's children and continue. # to be installed just free up it's children and continue.
@ -1165,16 +1256,56 @@ class EmergeQueue(object):
self._Finish(target) self._Finish(target)
else: else:
# Kick off the build if it's marked to be built. # Kick off the build if it's marked to be built.
self._jobs.add(target) self._jobs[target] = None
self._emerge_queue.put(target) self._emerge_queue.put(target)
def _LoadAvg(self): def _LoadAvg(self):
loads = open("/proc/loadavg", "r").readline().split()[:3] loads = open("/proc/loadavg", "r").readline().split()[:3]
return " ".join(loads) return " ".join(loads)
def _PrintJob(self, job):
"""Print output so far of specified job"""
# Calculate how long the job has been running.
current_time = time.time()
seconds = current_time - job.start_timestamp
# Note that we've printed out the job so far.
job.last_output_timestamp = current_time
# Note that we're starting the job
info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60)
if job.last_output_seek:
print "=== Continue output for %s " % info
else:
print "=== Start output for %s ===" % info
# Print actual output from job
f = codecs.open(job.filename, encoding='utf-8', errors='replace')
f.seek(job.last_output_seek)
prefix = job.pkgname + ":"
for line in f:
# Save off our position in the file
if line and line[-1] == "\n":
job.last_output_seek = f.tell()
line = line[:-1]
# Print our line
print prefix, line.encode('utf-8', 'replace')
f.close()
# Note end of output section
if job.done:
print "=== Complete: %s ===" % info
else:
print "=== Still running: %s ===" % info
def _Status(self): def _Status(self):
"""Print status.""" """Print status."""
seconds = time.time() - GLOBAL_START current_time = time.time()
seconds = current_time - GLOBAL_START
line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
"[Time %dm%.1fs Load %s]") "[Time %dm%.1fs Load %s]")
qsize = self._emerge_queue.qsize() qsize = self._emerge_queue.qsize()
@ -1182,6 +1313,18 @@ class EmergeQueue(object):
len(self._retry_queue), self._total_jobs, len(self._retry_queue), self._total_jobs,
seconds / 60, seconds % 60, self._LoadAvg()) seconds / 60, seconds % 60, self._LoadAvg())
# Print interim output every minute if --show-output is used. Otherwise,
# only print output if a job has been running for 60 minutes or more.
if self._show_output:
interval = 60
else:
interval = 60 * 60
for target, job in self._jobs.iteritems():
if job:
last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
if last_timestamp + interval < current_time:
self._PrintJob(job)
def _Finish(self, target): def _Finish(self, target):
"""Mark a target as completed and unblock dependecies.""" """Mark a target as completed and unblock dependecies."""
for dep in self._deps_map[target]["provides"]: for dep in self._deps_map[target]["provides"]:
@ -1205,7 +1348,7 @@ class EmergeQueue(object):
while self._deps_map: while self._deps_map:
# Check here that we are actually waiting for something. # Check here that we are actually waiting for something.
if (self._emerge_queue.empty() and if (self._emerge_queue.empty() and
self._done_queue.empty() and self._job_queue.empty() and
not self._jobs and not self._jobs and
self._deps_map): self._deps_map):
# If we have failed on a package, retry it now. # If we have failed on a package, retry it now.
@ -1213,11 +1356,10 @@ class EmergeQueue(object):
self._Retry() self._Retry()
# If we have failed a package twice, just give up. # If we have failed a package twice, just give up.
elif self._failed: elif self._failed:
for failure, output in self._failed.items(): for failure in self._failed:
print "Package failed: %s" % failure print "Package failed: %s" % failure
print output
PrintDepsMap(self._deps_map) PrintDepsMap(self._deps_map)
print "Packages failed: %s" % " ,".join(self._failed.keys()) print "Packages failed: %s" % " ,".join(self._failed)
sys.exit(1) sys.exit(1)
# If we have dependency cycles. # If we have dependency cycles.
else: else:
@ -1226,27 +1368,39 @@ class EmergeQueue(object):
sys.exit(1) sys.exit(1)
try: try:
target, retcode, output = self._done_queue.get(timeout=5) job = self._job_queue.get(timeout=5)
except Queue.Empty: except Queue.Empty:
# Print an update. # Print an update.
self._Status() self._Status()
continue continue
self._jobs.discard(target) target = job.target
# Print if necessary. if not job.done:
if retcode != 0: self._jobs[target] = job
print output print "Started %s (logged in %s)" % (target, job.filename)
if retcode != 0: continue
# Print output of job
if self._show_output or job.retcode != 0:
self._PrintJob(job)
os.unlink(job.filename)
del self._jobs[target]
seconds = time.time() - job.start_timestamp
details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60)
# Complain if necessary.
if job.retcode != 0:
# Handle job failure. # Handle job failure.
if target in self._failed: if target in self._failed:
# If this job has failed previously, give up. # If this job has failed previously, give up.
print "Failed %s. Your build has failed." % target print "Failed %s. Your build has failed." % details
else: else:
# Queue up this build to try again after a long while. # Queue up this build to try again after a long while.
self._retry_queue.append(target) self._retry_queue.append(target)
self._failed[target] = 1 self._failed.add(target)
print "Failed %s, retrying later." % target print "Failed %s, retrying later." % details
else: else:
if target in self._failed and self._retry_queue: if target in self._failed and self._retry_queue:
# If we have successfully retried a failed package, and there # If we have successfully retried a failed package, and there
@ -1254,7 +1408,7 @@ class EmergeQueue(object):
# one retrying package actively running at a time. # one retrying package actively running at a time.
self._Retry() self._Retry()
print "Completed %s" % target print "Completed %s" % details
# Mark as completed and unblock waiting ebuilds. # Mark as completed and unblock waiting ebuilds.
self._Finish(target) self._Finish(target)
@ -1305,7 +1459,7 @@ def main():
PrintDepsMap(deps_graph) PrintDepsMap(deps_graph)
# Run the queued emerges. # Run the queued emerges.
scheduler = EmergeQueue(deps_graph, emerge, deps.package_db) scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
scheduler.Run() scheduler.Run()
# Update world. # Update world.