mirror of
https://github.com/flatcar/scripts.git
synced 2025-08-09 05:56:58 +02:00
Move printing of output to a separate thread.
This should cut several minutes off the time for build_packages --nousepkg --showoutput. TEST=Ran build_packages --showoutput, verified output. BUG=chromium-os:5647 Review URL: http://codereview.chromium.org/3106008
This commit is contained in:
parent
bc69d7bae4
commit
f4170f83c3
207
parallel_emerge
207
parallel_emerge
@ -1119,21 +1119,7 @@ class EmergeJobState(object):
|
|||||||
self.start_timestamp = start_timestamp
|
self.start_timestamp = start_timestamp
|
||||||
|
|
||||||
|
|
||||||
def EmergeWorker(task_queue, job_queue, emerge, package_db):
|
def SetupWorkerSignals():
|
||||||
"""This worker emerges any packages given to it on the task_queue.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
task_queue: The queue of tasks for this worker to do.
|
|
||||||
job_queue: The queue of results from the worker.
|
|
||||||
emerge: An EmergeData() object.
|
|
||||||
package_db: A dict, mapping package ids to portage Package objects.
|
|
||||||
|
|
||||||
It expects package identifiers to be passed to it via task_queue. When
|
|
||||||
a task is started, it pushes the (target, filename) to the started_queue.
|
|
||||||
The output is stored in filename. When a merge starts or finishes, we push
|
|
||||||
EmergeJobState objects to the job_queue.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def ExitHandler(signum, frame):
|
def ExitHandler(signum, frame):
|
||||||
# Remove our signal handlers so we don't get called recursively.
|
# Remove our signal handlers so we don't get called recursively.
|
||||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||||
@ -1149,6 +1135,23 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db):
|
|||||||
signal.signal(signal.SIGINT, ExitHandler)
|
signal.signal(signal.SIGINT, ExitHandler)
|
||||||
signal.signal(signal.SIGTERM, ExitHandler)
|
signal.signal(signal.SIGTERM, ExitHandler)
|
||||||
|
|
||||||
|
|
||||||
|
def EmergeWorker(task_queue, job_queue, emerge, package_db):
|
||||||
|
"""This worker emerges any packages given to it on the task_queue.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_queue: The queue of tasks for this worker to do.
|
||||||
|
job_queue: The queue of results from the worker.
|
||||||
|
emerge: An EmergeData() object.
|
||||||
|
package_db: A dict, mapping package ids to portage Package objects.
|
||||||
|
|
||||||
|
It expects package identifiers to be passed to it via task_queue. When
|
||||||
|
a task is started, it pushes the (target, filename) to the started_queue.
|
||||||
|
The output is stored in filename. When a merge starts or finishes, we push
|
||||||
|
EmergeJobState objects to the job_queue.
|
||||||
|
"""
|
||||||
|
|
||||||
|
SetupWorkerSignals()
|
||||||
settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb
|
settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb
|
||||||
opts, spinner = emerge.opts, emerge.spinner
|
opts, spinner = emerge.opts, emerge.spinner
|
||||||
opts["--nodeps"] = True
|
opts["--nodeps"] = True
|
||||||
@ -1190,6 +1193,86 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db):
|
|||||||
job_queue.put(job)
|
job_queue.put(job)
|
||||||
|
|
||||||
|
|
||||||
|
class LinePrinter(object):
|
||||||
|
"""Helper object to print a single line."""
|
||||||
|
|
||||||
|
def __init__(self, line):
|
||||||
|
self.line = line
|
||||||
|
|
||||||
|
def Print(self, seek_locations):
|
||||||
|
print self.line
|
||||||
|
|
||||||
|
|
||||||
|
class JobPrinter(object):
|
||||||
|
"""Helper object to print output of a job."""
|
||||||
|
|
||||||
|
def __init__(self, job, unlink=False):
|
||||||
|
"""Print output of job.
|
||||||
|
|
||||||
|
If unlink is True, unlink the job output file when done."""
|
||||||
|
self.current_time = time.time()
|
||||||
|
self.job = job
|
||||||
|
self.unlink = unlink
|
||||||
|
|
||||||
|
def Print(self, seek_locations):
|
||||||
|
|
||||||
|
job = self.job
|
||||||
|
|
||||||
|
# Calculate how long the job has been running.
|
||||||
|
seconds = self.current_time - job.start_timestamp
|
||||||
|
|
||||||
|
# Note that we've printed out the job so far.
|
||||||
|
job.last_output_timestamp = self.current_time
|
||||||
|
|
||||||
|
# Note that we're starting the job
|
||||||
|
info = "job %s (%dm%.1fs)" % (job.pkgname, seconds / 60, seconds % 60)
|
||||||
|
last_output_seek = seek_locations.get(job.filename, 0)
|
||||||
|
if last_output_seek:
|
||||||
|
print "=== Continue output for %s ===" % info
|
||||||
|
else:
|
||||||
|
print "=== Start output for %s ===" % info
|
||||||
|
|
||||||
|
# Print actual output from job
|
||||||
|
f = codecs.open(job.filename, encoding='utf-8', errors='replace')
|
||||||
|
f.seek(last_output_seek)
|
||||||
|
prefix = job.pkgname + ":"
|
||||||
|
for line in f:
|
||||||
|
|
||||||
|
# Save off our position in the file
|
||||||
|
if line and line[-1] == "\n":
|
||||||
|
last_output_seek = f.tell()
|
||||||
|
line = line[:-1]
|
||||||
|
|
||||||
|
# Print our line
|
||||||
|
print prefix, line.encode('utf-8', 'replace')
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
# Save our last spot in the file so that we don't print out the same
|
||||||
|
# location twice.
|
||||||
|
seek_locations[job.filename] = last_output_seek
|
||||||
|
|
||||||
|
# Note end of output section
|
||||||
|
if job.done:
|
||||||
|
print "=== Complete: %s ===" % info
|
||||||
|
else:
|
||||||
|
print "=== Still running: %s ===" % info
|
||||||
|
|
||||||
|
if self.unlink:
|
||||||
|
os.unlink(job.filename)
|
||||||
|
|
||||||
|
|
||||||
|
def PrintWorker(queue):
|
||||||
|
"""A worker that prints stuff to the screen as requested."""
|
||||||
|
SetupWorkerSignals()
|
||||||
|
seek_locations = {}
|
||||||
|
while True:
|
||||||
|
job = queue.get()
|
||||||
|
if job:
|
||||||
|
job.Print(seek_locations)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
class EmergeQueue(object):
|
class EmergeQueue(object):
|
||||||
"""Class to schedule emerge jobs according to a dependency graph."""
|
"""Class to schedule emerge jobs according to a dependency graph."""
|
||||||
|
|
||||||
@ -1215,8 +1298,12 @@ class EmergeQueue(object):
|
|||||||
emerge.opts.get("--jobs", multiprocessing.cpu_count()))
|
emerge.opts.get("--jobs", multiprocessing.cpu_count()))
|
||||||
self._emerge_queue = multiprocessing.Queue()
|
self._emerge_queue = multiprocessing.Queue()
|
||||||
self._job_queue = multiprocessing.Queue()
|
self._job_queue = multiprocessing.Queue()
|
||||||
|
self._print_queue = multiprocessing.Queue()
|
||||||
args = (self._emerge_queue, self._job_queue, emerge, package_db)
|
args = (self._emerge_queue, self._job_queue, emerge, package_db)
|
||||||
self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
|
self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
|
||||||
|
self._print_worker = multiprocessing.Process(target=PrintWorker,
|
||||||
|
args=[self._print_queue])
|
||||||
|
self._print_worker.start()
|
||||||
|
|
||||||
# Initialize the failed queue to empty.
|
# Initialize the failed queue to empty.
|
||||||
self._retry_queue = []
|
self._retry_queue = []
|
||||||
@ -1245,11 +1332,14 @@ class EmergeQueue(object):
|
|||||||
# Print our current job status
|
# Print our current job status
|
||||||
for target, job in self._jobs.iteritems():
|
for target, job in self._jobs.iteritems():
|
||||||
if job:
|
if job:
|
||||||
self._PrintJob(job)
|
self._print_queue.put(JobPrinter(job, unlink=True))
|
||||||
os.unlink(job.filename)
|
|
||||||
|
|
||||||
# Notify the user that we are exiting
|
# Notify the user that we are exiting
|
||||||
print "Exiting on signal %s" % signum
|
self._Print("Exiting on signal %s" % signum)
|
||||||
|
|
||||||
|
# Exit when print worker is done.
|
||||||
|
self._print_queue.put(None)
|
||||||
|
self._print_worker.join()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# Print out job status when we are killed
|
# Print out job status when we are killed
|
||||||
@ -1273,44 +1363,9 @@ class EmergeQueue(object):
|
|||||||
loads = open("/proc/loadavg", "r").readline().split()[:3]
|
loads = open("/proc/loadavg", "r").readline().split()[:3]
|
||||||
return " ".join(loads)
|
return " ".join(loads)
|
||||||
|
|
||||||
def _PrintJob(self, job):
|
def _Print(self, line):
|
||||||
"""Print output so far of specified job"""
|
"""Print a single line."""
|
||||||
|
self._print_queue.put(LinePrinter(line))
|
||||||
# Calculate how long the job has been running.
|
|
||||||
current_time = time.time()
|
|
||||||
seconds = current_time - job.start_timestamp
|
|
||||||
|
|
||||||
# Note that we've printed out the job so far.
|
|
||||||
job.last_output_timestamp = current_time
|
|
||||||
|
|
||||||
# Note that we're starting the job
|
|
||||||
info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60)
|
|
||||||
if job.last_output_seek:
|
|
||||||
print "=== Continue output for %s " % info
|
|
||||||
else:
|
|
||||||
print "=== Start output for %s ===" % info
|
|
||||||
|
|
||||||
# Print actual output from job
|
|
||||||
f = codecs.open(job.filename, encoding='utf-8', errors='replace')
|
|
||||||
f.seek(job.last_output_seek)
|
|
||||||
prefix = job.pkgname + ":"
|
|
||||||
for line in f:
|
|
||||||
|
|
||||||
# Save off our position in the file
|
|
||||||
if line and line[-1] == "\n":
|
|
||||||
job.last_output_seek = f.tell()
|
|
||||||
line = line[:-1]
|
|
||||||
|
|
||||||
# Print our line
|
|
||||||
print prefix, line.encode('utf-8', 'replace')
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
# Note end of output section
|
|
||||||
if job.done:
|
|
||||||
print "=== Complete: %s ===" % info
|
|
||||||
else:
|
|
||||||
print "=== Still running: %s ===" % info
|
|
||||||
|
|
||||||
|
|
||||||
def _Status(self):
|
def _Status(self):
|
||||||
"""Print status."""
|
"""Print status."""
|
||||||
@ -1319,9 +1374,9 @@ class EmergeQueue(object):
|
|||||||
line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
|
line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
|
||||||
"[Time %dm%.1fs Load %s]")
|
"[Time %dm%.1fs Load %s]")
|
||||||
qsize = self._emerge_queue.qsize()
|
qsize = self._emerge_queue.qsize()
|
||||||
print line % (len(self._deps_map), qsize, len(self._jobs) - qsize,
|
self._Print(line % (len(self._deps_map), qsize, len(self._jobs) - qsize,
|
||||||
len(self._retry_queue), self._total_jobs,
|
len(self._retry_queue), self._total_jobs,
|
||||||
seconds / 60, seconds % 60, self._LoadAvg())
|
seconds / 60, seconds % 60, self._LoadAvg()))
|
||||||
|
|
||||||
# Print interim output every minute if --show-output is used. Otherwise,
|
# Print interim output every minute if --show-output is used. Otherwise,
|
||||||
# only print output if a job has been running for 60 minutes or more.
|
# only print output if a job has been running for 60 minutes or more.
|
||||||
@ -1333,7 +1388,8 @@ class EmergeQueue(object):
|
|||||||
if job:
|
if job:
|
||||||
last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
|
last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
|
||||||
if last_timestamp + interval < current_time:
|
if last_timestamp + interval < current_time:
|
||||||
self._PrintJob(job)
|
self._print_queue.put(JobPrinter(job))
|
||||||
|
job.last_output_timestamp = current_time
|
||||||
|
|
||||||
def _Finish(self, target):
|
def _Finish(self, target):
|
||||||
"""Mark a target as completed and unblock dependecies."""
|
"""Mark a target as completed and unblock dependecies."""
|
||||||
@ -1347,7 +1403,7 @@ class EmergeQueue(object):
|
|||||||
if self._retry_queue:
|
if self._retry_queue:
|
||||||
target = self._retry_queue.pop(0)
|
target = self._retry_queue.pop(0)
|
||||||
self._Schedule(target)
|
self._Schedule(target)
|
||||||
print "Retrying emerge of %s." % target
|
self._Print("Retrying emerge of %s." % target)
|
||||||
|
|
||||||
def Run(self):
|
def Run(self):
|
||||||
"""Run through the scheduled ebuilds.
|
"""Run through the scheduled ebuilds.
|
||||||
@ -1364,17 +1420,19 @@ class EmergeQueue(object):
|
|||||||
# If we have failed on a package, retry it now.
|
# If we have failed on a package, retry it now.
|
||||||
if self._retry_queue:
|
if self._retry_queue:
|
||||||
self._Retry()
|
self._Retry()
|
||||||
# If we have failed a package twice, just give up.
|
else:
|
||||||
elif self._failed:
|
# Tell the print worker we're done, and wait for it to exit.
|
||||||
for failure in self._failed:
|
self._print_queue.put(None)
|
||||||
print "Package failed: %s" % failure
|
self._print_worker.join()
|
||||||
|
|
||||||
|
# The dependency map is helpful for debugging failures.
|
||||||
PrintDepsMap(self._deps_map)
|
PrintDepsMap(self._deps_map)
|
||||||
|
|
||||||
|
# Tell the user why we're exiting.
|
||||||
|
if self._failed:
|
||||||
print "Packages failed: %s" % " ,".join(self._failed)
|
print "Packages failed: %s" % " ,".join(self._failed)
|
||||||
sys.exit(1)
|
|
||||||
# If we have dependency cycles.
|
|
||||||
else:
|
else:
|
||||||
print "Deadlock! Circular dependencies!"
|
print "Deadlock! Circular dependencies!"
|
||||||
PrintDepsMap(self._deps_map)
|
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -1388,12 +1446,13 @@ class EmergeQueue(object):
|
|||||||
|
|
||||||
if not job.done:
|
if not job.done:
|
||||||
self._jobs[target] = job
|
self._jobs[target] = job
|
||||||
print "Started %s (logged in %s)" % (target, job.filename)
|
self._Print("Started %s (logged in %s)" % (target, job.filename))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Print output of job
|
# Print output of job
|
||||||
if self._show_output or job.retcode != 0:
|
if self._show_output or job.retcode != 0:
|
||||||
self._PrintJob(job)
|
self._print_queue.put(JobPrinter(job, unlink=True))
|
||||||
|
else:
|
||||||
os.unlink(job.filename)
|
os.unlink(job.filename)
|
||||||
del self._jobs[target]
|
del self._jobs[target]
|
||||||
|
|
||||||
@ -1405,12 +1464,12 @@ class EmergeQueue(object):
|
|||||||
# Handle job failure.
|
# Handle job failure.
|
||||||
if target in self._failed:
|
if target in self._failed:
|
||||||
# If this job has failed previously, give up.
|
# If this job has failed previously, give up.
|
||||||
print "Failed %s. Your build has failed." % details
|
self._Print("Failed %s. Your build has failed." % details)
|
||||||
else:
|
else:
|
||||||
# Queue up this build to try again after a long while.
|
# Queue up this build to try again after a long while.
|
||||||
self._retry_queue.append(target)
|
self._retry_queue.append(target)
|
||||||
self._failed.add(target)
|
self._failed.add(target)
|
||||||
print "Failed %s, retrying later." % details
|
self._Print("Failed %s, retrying later." % details)
|
||||||
else:
|
else:
|
||||||
if target in self._failed and self._retry_queue:
|
if target in self._failed and self._retry_queue:
|
||||||
# If we have successfully retried a failed package, and there
|
# If we have successfully retried a failed package, and there
|
||||||
@ -1418,13 +1477,17 @@ class EmergeQueue(object):
|
|||||||
# one retrying package actively running at a time.
|
# one retrying package actively running at a time.
|
||||||
self._Retry()
|
self._Retry()
|
||||||
|
|
||||||
print "Completed %s" % details
|
self._Print("Completed %s" % details)
|
||||||
# Mark as completed and unblock waiting ebuilds.
|
# Mark as completed and unblock waiting ebuilds.
|
||||||
self._Finish(target)
|
self._Finish(target)
|
||||||
|
|
||||||
# Print an update.
|
# Print an update.
|
||||||
self._Status()
|
self._Status()
|
||||||
|
|
||||||
|
# Tell the print worker we're done, and wait for it to exit.
|
||||||
|
self._print_queue.put(None)
|
||||||
|
self._print_worker.join()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user