diff --git a/parallel_emerge b/parallel_emerge index fbdcc6557d..3d5ad74205 100755 --- a/parallel_emerge +++ b/parallel_emerge @@ -1119,21 +1119,7 @@ class EmergeJobState(object): self.start_timestamp = start_timestamp -def EmergeWorker(task_queue, job_queue, emerge, package_db): - """This worker emerges any packages given to it on the task_queue. - - Args: - task_queue: The queue of tasks for this worker to do. - job_queue: The queue of results from the worker. - emerge: An EmergeData() object. - package_db: A dict, mapping package ids to portage Package objects. - - It expects package identifiers to be passed to it via task_queue. When - a task is started, it pushes the (target, filename) to the started_queue. - The output is stored in filename. When a merge starts or finishes, we push - EmergeJobState objects to the job_queue. - """ - +def SetupWorkerSignals(): def ExitHandler(signum, frame): # Remove our signal handlers so we don't get called recursively. signal.signal(signal.SIGINT, signal.SIG_DFL) @@ -1149,6 +1135,23 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db): signal.signal(signal.SIGINT, ExitHandler) signal.signal(signal.SIGTERM, ExitHandler) + +def EmergeWorker(task_queue, job_queue, emerge, package_db): + """This worker emerges any packages given to it on the task_queue. + + Args: + task_queue: The queue of tasks for this worker to do. + job_queue: The queue of results from the worker. + emerge: An EmergeData() object. + package_db: A dict, mapping package ids to portage Package objects. + + It expects package identifiers to be passed to it via task_queue. When + a task is started, it pushes the (target, filename) to the started_queue. + The output is stored in filename. When a merge starts or finishes, we push + EmergeJobState objects to the job_queue. + """ + + SetupWorkerSignals() settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb opts, spinner = emerge.opts, emerge.spinner opts["--nodeps"] = True @@ -1190,6 +1193,86 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db): job_queue.put(job) +class LinePrinter(object): + """Helper object to print a single line.""" + + def __init__(self, line): + self.line = line + + def Print(self, seek_locations): + print self.line + + +class JobPrinter(object): + """Helper object to print output of a job.""" + + def __init__(self, job, unlink=False): + """Print output of job. + + If unlink is True, unlink the job output file when done.""" + self.current_time = time.time() + self.job = job + self.unlink = unlink + + def Print(self, seek_locations): + + job = self.job + + # Calculate how long the job has been running. + seconds = self.current_time - job.start_timestamp + + # Note that we've printed out the job so far. + job.last_output_timestamp = self.current_time + + # Note that we're starting the job + info = "job %s (%dm%.1fs)" % (job.pkgname, seconds / 60, seconds % 60) + last_output_seek = seek_locations.get(job.filename, 0) + if last_output_seek: + print "=== Continue output for %s ===" % info + else: + print "=== Start output for %s ===" % info + + # Print actual output from job + f = codecs.open(job.filename, encoding='utf-8', errors='replace') + f.seek(last_output_seek) + prefix = job.pkgname + ":" + for line in f: + + # Save off our position in the file + if line and line[-1] == "\n": + last_output_seek = f.tell() + line = line[:-1] + + # Print our line + print prefix, line.encode('utf-8', 'replace') + f.close() + + # Save our last spot in the file so that we don't print out the same + # location twice. + seek_locations[job.filename] = last_output_seek + + # Note end of output section + if job.done: + print "=== Complete: %s ===" % info + else: + print "=== Still running: %s ===" % info + + if self.unlink: + os.unlink(job.filename) + + +def PrintWorker(queue): + """A worker that prints stuff to the screen as requested.""" + SetupWorkerSignals() + seek_locations = {} + while True: + job = queue.get() + if job: + job.Print(seek_locations) + else: + break + + class EmergeQueue(object): """Class to schedule emerge jobs according to a dependency graph.""" @@ -1215,8 +1298,12 @@ class EmergeQueue(object): emerge.opts.get("--jobs", multiprocessing.cpu_count())) self._emerge_queue = multiprocessing.Queue() self._job_queue = multiprocessing.Queue() + self._print_queue = multiprocessing.Queue() args = (self._emerge_queue, self._job_queue, emerge, package_db) self._pool = multiprocessing.Pool(procs, EmergeWorker, args) + self._print_worker = multiprocessing.Process(target=PrintWorker, + args=[self._print_queue]) + self._print_worker.start() # Initialize the failed queue to empty. self._retry_queue = [] @@ -1245,11 +1332,14 @@ class EmergeQueue(object): # Print our current job status for target, job in self._jobs.iteritems(): if job: - self._PrintJob(job) - os.unlink(job.filename) + self._print_queue.put(JobPrinter(job, unlink=True)) # Notify the user that we are exiting - print "Exiting on signal %s" % signum + self._Print("Exiting on signal %s" % signum) + + # Exit when print worker is done. + self._print_queue.put(None) + self._print_worker.join() sys.exit(1) # Print out job status when we are killed @@ -1273,44 +1363,9 @@ class EmergeQueue(object): loads = open("/proc/loadavg", "r").readline().split()[:3] return " ".join(loads) - def _PrintJob(self, job): - """Print output so far of specified job""" - - # Calculate how long the job has been running. - current_time = time.time() - seconds = current_time - job.start_timestamp - - # Note that we've printed out the job so far. - job.last_output_timestamp = current_time - - # Note that we're starting the job - info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60) - if job.last_output_seek: - print "=== Continue output for %s " % info - else: - print "=== Start output for %s ===" % info - - # Print actual output from job - f = codecs.open(job.filename, encoding='utf-8', errors='replace') - f.seek(job.last_output_seek) - prefix = job.pkgname + ":" - for line in f: - - # Save off our position in the file - if line and line[-1] == "\n": - job.last_output_seek = f.tell() - line = line[:-1] - - # Print our line - print prefix, line.encode('utf-8', 'replace') - f.close() - - # Note end of output section - if job.done: - print "=== Complete: %s ===" % info - else: - print "=== Still running: %s ===" % info - + def _Print(self, line): + """Print a single line.""" + self._print_queue.put(LinePrinter(line)) def _Status(self): """Print status.""" @@ -1319,9 +1374,9 @@ class EmergeQueue(object): line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " "[Time %dm%.1fs Load %s]") qsize = self._emerge_queue.qsize() - print line % (len(self._deps_map), qsize, len(self._jobs) - qsize, - len(self._retry_queue), self._total_jobs, - seconds / 60, seconds % 60, self._LoadAvg()) + self._Print(line % (len(self._deps_map), qsize, len(self._jobs) - qsize, + len(self._retry_queue), self._total_jobs, + seconds / 60, seconds % 60, self._LoadAvg())) # Print interim output every minute if --show-output is used. Otherwise, # only print output if a job has been running for 60 minutes or more. @@ -1333,7 +1388,8 @@ class EmergeQueue(object): if job: last_timestamp = max(job.start_timestamp, job.last_output_timestamp) if last_timestamp + interval < current_time: - self._PrintJob(job) + self._print_queue.put(JobPrinter(job)) + job.last_output_timestamp = current_time def _Finish(self, target): """Mark a target as completed and unblock dependecies.""" @@ -1347,7 +1403,7 @@ class EmergeQueue(object): if self._retry_queue: target = self._retry_queue.pop(0) self._Schedule(target) - print "Retrying emerge of %s." % target + self._Print("Retrying emerge of %s." % target) def Run(self): """Run through the scheduled ebuilds. @@ -1364,17 +1420,19 @@ class EmergeQueue(object): # If we have failed on a package, retry it now. if self._retry_queue: self._Retry() - # If we have failed a package twice, just give up. - elif self._failed: - for failure in self._failed: - print "Package failed: %s" % failure - PrintDepsMap(self._deps_map) - print "Packages failed: %s" % " ,".join(self._failed) - sys.exit(1) - # If we have dependency cycles. else: - print "Deadlock! Circular dependencies!" + # Tell the print worker we're done, and wait for it to exit. + self._print_queue.put(None) + self._print_worker.join() + + # The dependency map is helpful for debugging failures. PrintDepsMap(self._deps_map) + + # Tell the user why we're exiting. + if self._failed: + print "Packages failed: %s" % " ,".join(self._failed) + else: + print "Deadlock! Circular dependencies!" sys.exit(1) try: @@ -1388,13 +1446,14 @@ class EmergeQueue(object): if not job.done: self._jobs[target] = job - print "Started %s (logged in %s)" % (target, job.filename) + self._Print("Started %s (logged in %s)" % (target, job.filename)) continue # Print output of job if self._show_output or job.retcode != 0: - self._PrintJob(job) - os.unlink(job.filename) + self._print_queue.put(JobPrinter(job, unlink=True)) + else: + os.unlink(job.filename) del self._jobs[target] seconds = time.time() - job.start_timestamp @@ -1405,12 +1464,12 @@ class EmergeQueue(object): # Handle job failure. if target in self._failed: # If this job has failed previously, give up. - print "Failed %s. Your build has failed." % details + self._Print("Failed %s. Your build has failed." % details) else: # Queue up this build to try again after a long while. self._retry_queue.append(target) self._failed.add(target) - print "Failed %s, retrying later." % details + self._Print("Failed %s, retrying later." % details) else: if target in self._failed and self._retry_queue: # If we have successfully retried a failed package, and there @@ -1418,13 +1477,17 @@ class EmergeQueue(object): # one retrying package actively running at a time. self._Retry() - print "Completed %s" % details + self._Print("Completed %s" % details) # Mark as completed and unblock waiting ebuilds. self._Finish(target) # Print an update. self._Status() + # Tell the print worker we're done, and wait for it to exit. + self._print_queue.put(None) + self._print_worker.join() + def main():