#!/usr/bin/python2.6
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Program to run emerge in parallel, for significant speedup.

Usage:
 ./parallel_emerge --board=BOARD [emerge args] package

Basic operation:
  Runs 'emerge -p --debug' to display dependencies, and stores a
  dependency graph. All non-blocked packages are launched in parallel,
  as 'emerge --nodeps package' with any blocked packages being emerged
  immediately upon deps being met.

  For this to work effectively, /usr/lib/portage/pym/portage/locks.py
  must be stubbed out, preventing portage from slowing itself with
  unnecessary locking, as this script ensures that emerge is run in such
  a way that common resources are never in conflict. This is controlled
  by an environment variable PORTAGE_LOCKS set in parallel emerge
  subprocesses.

  Parallel Emerge unlocks two things during operation, here's what you
  must do to keep this safe:
  * Storage dir containing binary packages.
    - Don't emerge new packages while installing the existing ones.
  * Portage database
    - You must not examine deps while modifying the database. Therefore
      you may only parallelize "-p" read only access, or "--nodeps" write
      only access.

Caveats:
  * Some ebuild packages have incorrectly specified deps, and running
    them in parallel is more likely to bring out these failures.
  * Portage "world" is a record of explicitly installed packages. In
    this parallel scheme, explicitly installed packages are installed
    twice, once for the real install, and once for world file addition.
  * Some ebuilds (especially the build part) have complex dependencies
    that are not captured well by this script (it may be necessary to
    install an old package to build, but then install a newer version
    of the same package for a runtime dep). This script is only
    currently stable for binpkg installs.
"""

import os
import re
import shlex
import subprocess
import sys
import tempfile
import time


def Usage():
  """Print a short usage message and exit with status 1."""
  print("Usage:")
  print(" ./parallel_emerge --board=BOARD [emerge args] package")
  sys.exit(1)


# These are dependencies that are not specified in the package,
# but will prevent the package from installing.
# Maps a substring of the blocked package name to a substring of the
# package name it secretly needs (see AddSecretDeps).
secret_deps = {}

# Globals: package we are building, board we are targeting,
# emerge args we are passing through.
PACKAGE = None
EMERGE_ARGS = ""
BOARD = None

# Runtime flags. TODO(): maybe make these commandline options or
# environment variables.
VERBOSE = False
AUTOCLEAN = False


def ParseArgs(argv):
  """Split the command line into packages, emerge args and board.

  We need to be compatible with emerge arg format. We scrape --board=XXX,
  and distinguish between args and package names.
  TODO(): robustify argument processing, as it's possible to
  pass in many two argument parameters that are difficult
  to programmatically identify, although we don't currently
  use any besides --bdeps <y|n>.

  Args:
    argv: arguments list; argv[0] (the program name) is ignored.
  Returns:
    triplet of (space-joined package string, emerge arguments string,
    board string or None). Every passed-through emerge argument is
    prefixed with a single space.
  Exits (via Usage) with status 1 when no package name was given.
  """
  if VERBOSE:
    print(argv)
  board_arg = None
  package_args = []
  emerge_passthru_args = ""
  re_board = re.compile(r"--board=(?P<board>.*)")
  for arg in argv[1:]:
    # An empty argument is neither a flag nor a package name; skip it
    # instead of crashing on arg[0].
    if not arg:
      continue
    # Dashed args, plus the bare "y"/"n" values of two-part options such
    # as "--bdeps y", belong to emerge.
    if arg.startswith("-") or arg in ("y", "n"):
      # Specifically match "--board=".
      m = re_board.match(arg)
      if m:
        board_arg = m.group("board")
      else:
        # Pass through to emerge.
        emerge_passthru_args += " " + arg
    else:
      # Only non-dashed arg should be the target package.
      package_args.append(arg)

  if not package_args:
    Usage()  # Does not return.

  return " ".join(package_args), emerge_passthru_args, board_arg


def EmergeCommand():
  """Helper function to return the base emerge commandline.

  This is configured for board type, and including pass thru args,
  using the global variables BOARD and EMERGE_ARGS. TODO(): unglobalfy.

  Returns:
    string containing emerge command ("emerge" or "emerge-BOARD"),
    a space, and the pass-through args.
  """
  emerge = "emerge"
  if BOARD:
    emerge += "-" + BOARD
  return emerge + " " + EMERGE_ARGS


def GetDepsFromPortage(package):
  """Get dependency tree info by running emerge.

  Run 'emerge -p --debug package', and get a text output of all deps.
  TODO(): Put dep calculation in a library, as cros_extract_deps
  also uses this code.

  Args:
    package: string containing the packages to build.
  Returns:
    list of output lines: stderr from the first "digraph" onwards (if
    present) followed by all of stdout. Processed by DepsToTree.
  Exits with status 1 if emerge returns non-zero.
  """
  print("Calculating deps for package %s" % package)
  cmdline = EmergeCommand() + " -p --debug --color=n " + package
  print("+ %s" % cmdline)

  # Store output in temp files as it is too big for a unix pipe.
  # Text mode so that what we read back is str.
  stderr_buffer = tempfile.TemporaryFile(mode="w+")
  stdout_buffer = tempfile.TemporaryFile(mode="w+")
  try:
    # Launch the subprocess.
    start = time.time()
    depsproc = subprocess.Popen(shlex.split(cmdline), stderr=stderr_buffer,
                                stdout=stdout_buffer, bufsize=64*1024)
    depsproc.wait()
    seconds = time.time() - start
    print("Deps calculated in %d:%04.1fs" % (seconds / 60, seconds % 60))
    stderr_buffer.seek(0)
    stderr_raw = stderr_buffer.read()
    stdout_buffer.seek(0)
    stdout_raw = stdout_buffer.read()
  finally:
    # Release the temp files even if launching emerge failed.
    stderr_buffer.close()
    stdout_buffer.close()

  lines = []
  info_start = stderr_raw.find("digraph")
  if info_start != -1:
    lines = stderr_raw[info_start:].split("\n")
  lines.extend(stdout_raw.split("\n"))

  if VERBOSE or depsproc.returncode != 0:
    print(stderr_raw + stdout_raw)
  if depsproc.returncode != 0:
    print("Failed to generate deps")
    sys.exit(1)

  return lines


def DepsToTree(lines):
  """Regex the output from 'emerge --debug' to generate a nested dict of deps.

  Args:
    lines: output from 'emerge -p --debug package'
  Returns:
    (deps_tree, deps_info) where
    deps_tree: nested dict of dependencies, as specified by emerge.
      Each entry maps a package to {"deps", "action", "deptype"}.
      There may be dupes, or circular deps.
    deps_info: dict keyed by "pkgdir/pkgname-version" for every
      "[...] pkgdir/pkgname-version" line, holding "idx" (order seen),
      "pkgdir", "pkgname", "oldversion" and "uninstall".
  Exits with status 1 on a "depends on" line that cannot be parsed, or
  on an indented package line with no parent.

  We need to regex lines as follows:
    hard-host-depends depends on
    ('ebuild', '/', 'dev-lang/swig-1.3.36', 'merge') depends on
      ('ebuild', '/', 'dev-lang/perl-5.8.8-r8', 'merge') (buildtime)
    ('binary', '/.../rootfs/', 'sys-auth/policykit-0.9-r1', 'merge') depends on
      ('binary', '/.../rootfs/', 'x11-misc/xbitmaps-1.1.0', 'merge') (no children)
  """
  # "destination" is the second tuple field (a path); nothing below
  # reads it.
  re_deps = re.compile(r"(?P<indent>\W*)\(\'(?P<pkgtype>\w+)\', "
                       r"\'(?P<destination>[\w/\.-]+)\',"
                       r" \'(?P<pkgdir>[\w\+-]+)/(?P<pkgname>[\w\+-]+)-"
                       r"(?P<version>\d+[\w\.-]*)\', \'(?P<action>\w+)\'\) "
                       r"(?P<deptype>(depends on|\(.*\)))")
  re_origdeps = re.compile(r"(?P<pkgname>[\w\+/-]+) depends on")
  re_installed_package = re.compile(
      r"\[(?P<desc>[^\]]*)\] "
      r"(?P<pkgdir>[\w\+-]+)/"
      r"(?P<pkgname>[\w\+-]+)-"
      r"(?P<version>\d+[\w\.-]*)( \["
      r"(?P<oldversion>\d+[\w\.-]*)\])?"
  )
  re_failed = re.compile(r".*depends on.*")

  deps_tree = {}
  deps_stack = []
  deps_info = {}
  for line in lines:
    m = re_deps.match(line)
    if m:
      pkgname = m.group("pkgname")
      pkgdir = m.group("pkgdir")
      pkgtype = m.group("pkgtype")
      doins = m.group("action")
      deptype = m.group("deptype")
      version = m.group("version")
      # Only two levels are distinguished: unindented (0) and indented (1).
      depth = 1 if m.group("indent") else 0

      # If we are indented, we should have
      # found a "depends on" previously.
      if len(deps_stack) < depth:
        print("FAIL: corrupt input at:")
        print(line)
        print("No Parent.")
        sys.exit(1)

      # Go step by step through stack and tree
      # until we find our parent.
      updatedep = deps_tree
      for parent in deps_stack[:depth]:
        updatedep = updatedep[parent]["deps"]

      fullpkg = "%s/%s-%s" % (pkgdir, pkgname, version)
      if VERBOSE:
        # Pretty print what we've captured.
        indent = "|" + "".ljust(depth, "_")
        print("%s %s/%s - %s (%s, %s, %s)" %
              (indent, pkgdir, pkgname, version, pkgtype, doins, deptype))

      # Add our new package into the tree, if it's not already there,
      # with an empty deps dict, the action we should take
      # (merge, nomerge) and the type of dep. First sighting wins.
      entry = updatedep.setdefault(fullpkg, {})
      entry.setdefault("deps", {})
      entry.setdefault("action", doins)
      entry.setdefault("deptype", deptype)

      # Drop any stack entries below our depth, then add ourselves.
      deps_stack = deps_stack[:depth]
      deps_stack.append(fullpkg)
      continue

    m_orig = re_origdeps.match(line)
    if m_orig:
      # Also capture "pseudo packages", which are the freeform text
      # we requested to be installed. These are generic package names
      # like "chromeos" rather than chromeos/chromeos-0.0.1
      # Tag these with "original" in case they overlap with real packages.
      pkgname = "original-%s" % m_orig.group("pkgname")
      if VERBOSE:
        print(pkgname)
      # Insert this at the top level of the deps tree so we can stick
      # it in "world".
      entry = deps_tree.setdefault(pkgname, {})
      entry.setdefault("deps", {})
      entry.setdefault("action", "world")
      entry.setdefault("deptype", "normal")

      # Pseudo packages are always at depth 0: reset the stack to
      # just ourselves.
      deps_stack = [pkgname]
      continue

    m_installed = re_installed_package.match(line)
    if m_installed:
      pkgname = m_installed.group("pkgname")
      pkgdir = m_installed.group("pkgdir")
      version = m_installed.group("version")
      oldversion = m_installed.group("oldversion")
      desc = m_installed.group("desc")
      # An old version is unmerged afterwards when the description
      # contains "U" or "D".
      uninstall = bool(oldversion) and ("U" in desc or "D" in desc)
      fullpkg = "%s/%s-%s" % (pkgdir, pkgname, version)
      deps_info[fullpkg] = {"idx": len(deps_info),
                            "pkgdir": pkgdir,
                            "pkgname": pkgname,
                            "oldversion": oldversion,
                            "uninstall": uninstall}
      continue

    # Is this a package that failed to match our huge regex?
    if re_failed.match(line):
      print("FAIL: Couldn't understand line:")
      print(line)
      sys.exit(1)

  return deps_tree, deps_info


def PrintTree(deps, depth=""):
  """Print the deps we have seen in the emerge output.

  Args:
    deps: dependency tree structure.
    depth: allows printing the tree recursively, with indentation.
  """
  for entry in deps:
    action = deps[entry]["action"]
    print("%s %s (%s)" % (depth, entry, action))
    PrintTree(deps[entry]["deps"], depth=depth + " ")


def GenDependencyGraph(deps_tree, deps_info):
  """Generate a doubly linked dependency graph.

  Args:
    deps_tree: dependency tree structure.
    deps_info: more info on the dependencies.
  Returns:
    Deps graph in the form of a dict of packages, with each package
    specifying a "needs" set and "provides" set, plus its "action" and
    (for packages to be merged) its "deps_info" record.
  """
  deps_map = {}

  def ReverseTree(packages):
    """Convert tree to digraph.

    Take the tree of package -> requirements and reverse it to a digraph of
    buildable packages -> packages they unblock. Results are accumulated
    in deps_map.

    Args:
      packages: tree(s) of dependencies
    """
    for pkg in packages:
      action = packages[pkg]["action"]
      this_pkg = deps_map.setdefault(
          pkg, {"needs": set(), "provides": set(), "action": "nomerge"})
      # A package seen anywhere with a real action keeps that action.
      if action != "nomerge":
        this_pkg["action"] = action
        this_pkg["deps_info"] = deps_info.get(pkg)
      ReverseTree(packages[pkg]["deps"])
      for dep, dep_item in packages[pkg]["deps"].items():
        dep_pkg = deps_map[dep]
        dep_type = dep_item["deptype"]
        # "(runtime_post)" deps do not have to be installed first.
        if dep_type != "(runtime_post)":
          dep_pkg["provides"].add(pkg)
          this_pkg["needs"].add(dep)

  def RemoveInstalledPackages():
    """Remove installed packages, propagating dependencies."""
    # Anything emerge did not list as "[...] cat/pkg-ver" is dropped.
    # NOTE(review): "original-" pseudo packages never appear in deps_info
    # (DepsToTree only records "[...] cat/pkg-ver" lines there), so they
    # look to be removed here as well -- confirm this is intended.
    rm_pkgs = set(deps_map.keys()) - set(deps_info.keys())
    for pkg in rm_pkgs:
      this_pkg = deps_map[pkg]
      needs = this_pkg["needs"]
      provides = this_pkg["provides"]
      # Splice pkg out: whatever pkg needed now directly provides for
      # whatever pkg provided for, and vice versa.
      for dep in needs:
        dep_provides = deps_map[dep]["provides"]
        dep_provides.update(provides)
        dep_provides.discard(pkg)
        dep_provides.discard(dep)
      for target in provides:
        target_needs = deps_map[target]["needs"]
        target_needs.update(needs)
        target_needs.discard(pkg)
        target_needs.discard(target)
      del deps_map[pkg]

  def SanitizeDep(basedep, currdep, oldstack, limit):
    """Search for circular deps between basedep and currdep, then recurse.

    Args:
      basedep: original dependency, top of stack.
      currdep: bottom of our current recursion, bottom of stack.
      oldstack: current dependency chain.
      limit: how many more levels of recursion to go through, max.
    TODO(): Break RDEPEND preferentially.
    Returns:
      True iff circular dependencies are found.
    """
    if limit == 0:
      return False
    for dep in deps_map[currdep]["needs"]:
      stack = oldstack + [dep]
      if basedep in deps_map[dep]["needs"] or dep == basedep:
        if dep != basedep:
          stack += [basedep]
        print("Remove cyclic dependency from:")
        for src, dst in zip(stack, stack[1:]):
          print(" %s -> %s " % (src, dst))
        return True
      if dep not in oldstack and SanitizeDep(basedep, dep, stack, limit - 1):
        return True
    return False

  def SanitizeTree():
    """Remove circular dependencies up to cycle length 32."""
    start = time.time()
    for basedep in deps_map:
      # Iterate over a copy: edges may be removed inside the loop.
      for dep in deps_map[basedep]["needs"].copy():
        # Only consider edges where basedep was listed by emerge no later
        # than the package it needs.
        if deps_info[basedep]["idx"] <= deps_info[dep]["idx"]:
          if SanitizeDep(basedep, dep, [basedep, dep], 31):
            print("Breaking %s  ->  %s" % (basedep, dep))
            deps_map[basedep]["needs"].remove(dep)
            deps_map[dep]["provides"].remove(basedep)
    seconds = time.time() - start
    print("Tree sanitized in %d:%04.1fs" % (seconds / 60, seconds % 60))

  def AddSecretDeps():
    """Find these tagged packages and add extra dependencies.

    For debugging dependency problems.
    """
    for bad in secret_deps:
      needed = secret_deps[bad]
      bad_pkg = None
      needed_pkg = None
      # Substring match; the last matching package wins.
      for dep in deps_map:
        if dep.find(bad) != -1:
          bad_pkg = dep
        if dep.find(needed) != -1:
          needed_pkg = dep
      if bad_pkg and needed_pkg:
        deps_map[needed_pkg]["provides"].add(bad_pkg)
        deps_map[bad_pkg]["needs"].add(needed_pkg)

  ReverseTree(deps_tree)
  AddSecretDeps()
  RemoveInstalledPackages()
  SanitizeTree()
  return deps_map


def PrintDepsMap(deps_map):
  """Print dependency graph, for each package list its prerequisites."""
  for i in deps_map:
    print("%s: (%s) needs" % (i, deps_map[i]["action"]))
    for j in deps_map[i]["needs"]:
      print(" %s" % (j))


class EmergeQueue(object):
  """Class to schedule emerge jobs according to a dependency graph."""

  def __init__(self, deps_map):
    """Set up the queues from a graph built by GenDependencyGraph.

    Args:
      deps_map: dependency graph; it is consumed (mutated) as packages
        complete.
    """
    # Store the dependency graph.
    self._deps_map = deps_map
    # Initialize the running jobs list to empty. Entries are
    # (target, subprocess, output buffer) triplets.
    self._jobs = []
    # List of total package installs represented in deps_map.
    install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
    self._total_jobs = len(install_jobs)

    # Initialize the ready queue, these are jobs with no unmet dependencies.
    self._emerge_queue = [x for x in deps_map if not deps_map[x]["needs"]]
    # Initialize the failed queue to empty.
    self._retry_queue = []
    # Maps a failed target to the output of its failed emerge.
    self._failed = {}

  def _Status(self):
    """Print status."""
    print("Pending %s, Ready %s, Running %s, Retrying %s, Total %s" % (
        len(self._deps_map), len(self._emerge_queue),
        len(self._jobs), len(self._retry_queue), self._total_jobs))

  def _LaunchOneEmerge(self, target):
    """Run emerge --nodeps to do a single package install.

    If this is a pseudopackage, that means we're done, and can select it in
    the world file.
    Args:
      target: the full package name of the package to install.
        eg. "sys-apps/portage-2.17"
    Returns:
      triplet containing (target name, subprocess object, output buffer object)
    """
    prefix = "original-"
    if target.startswith(prefix):
      # "original-" signifies one of the packages we originally requested.
      # Since we have explicitly installed the versioned package as a dep of
      # this, we only need to tag in "world" that we are done with this
      # install request. "--select -n" indicates an addition to "world"
      # without an actual install.
      # Strip only the leading tag; the name itself may contain it too.
      newtarget = target[len(prefix):]
      cmdline = EmergeCommand() + " --nodeps --select --noreplace " + newtarget
    else:
      # This package is a dependency of something we specifically
      # requested. Therefore we should install it but not allow it
      # in the "world" file, which represents explicit installs.
      # "--oneshot" here will prevent it from being tagged in world.
      cmdline = EmergeCommand() + " --nodeps --oneshot =" + target
      deps_info = self._deps_map[target]["deps_info"]
      if deps_info["uninstall"]:
        # Unmerge the old version once the new one is in.
        package = "%(pkgdir)s/%(pkgname)s-%(oldversion)s" % deps_info
        cmdline += " && %s -1C =%s" % (EmergeCommand(), package)

    print("+ %s" % cmdline)

    # Store output in a temp file as it is too big for a unix pipe.
    # Text mode so that what we read back is str.
    stdout_buffer = tempfile.TemporaryFile(mode="w+")
    # Modify the environment to disable locking.
    portage_env = os.environ.copy()
    portage_env["PORTAGE_LOCKS"] = "false"
    portage_env["UNMERGE_DELAY"] = "0"
    # Autoclean rummages around in the portage database and uninstalls
    # old packages. Definitely not necessary for build_image. However
    # it may be necessary for incremental build_packages. It may also
    # not be parallel safe.
    if not AUTOCLEAN:
      portage_env["AUTOCLEAN"] = "no"
    # Launch the subprocess. shell=True because cmdline may chain a
    # second emerge with "&&".
    emerge_proc = subprocess.Popen(
        cmdline, shell=True, stdout=stdout_buffer,
        stderr=subprocess.STDOUT, bufsize=64*1024, env=portage_env)

    return (target, emerge_proc, stdout_buffer)

  def _Finish(self, target):
    """Mark a target as completed and unblock dependencies."""
    for dep in self._deps_map[target]["provides"]:
      self._deps_map[dep]["needs"].remove(target)
      if not self._deps_map[dep]["needs"]:
        if VERBOSE:
          print("Unblocking %s" % dep)
        self._emerge_queue.append(dep)
    self._deps_map.pop(target)

  def _Retry(self):
    """Move the oldest failed target, if any, back onto the ready queue."""
    if self._retry_queue:
      target = self._retry_queue.pop(0)
      self._emerge_queue.append(target)
      print("Retrying emerge of %s." % target)

  def Run(self):
    """Run through the scheduled ebuilds.

    Keep running so long as we have uninstalled packages in the
    dependency graph to merge. Exits with status 1 if a package fails
    twice or the remaining packages can never become ready.
    """
    while self._deps_map:
      # If we have packages that are ready, kick them off.
      if self._emerge_queue:
        target = self._emerge_queue.pop(0)
        action = self._deps_map[target]["action"]
        # We maintain a tree of all deps, if this doesn't need
        # to be installed just free up its children and continue.
        # It is possible to reinstall deps of deps, without reinstalling
        # first level deps, like so:
        # chromeos (merge) -> eselect (nomerge) -> python (merge)
        if action == "nomerge":
          self._Finish(target)
        else:
          # Kick off the build if it's marked to be built.
          print("Emerging %s (%s)" % (target, action))
          job = self._LaunchOneEmerge(target)
          # Append it to the active jobs list.
          self._jobs.append(job)
        continue

      # Wait a bit to see if maybe some jobs finish. You can't
      # wait on a set of jobs in python, so we'll just poll.
      time.sleep(1)

      # Check here that we are actually waiting for something.
      if (not self._emerge_queue and
          not self._jobs and
          self._deps_map):
        # If we have failed on a package retry it now.
        if self._retry_queue:
          self._Retry()
        # If we have failed a package twice, just give up.
        elif self._failed:
          for failure, output in self._failed.items():
            print("Package failed: %s" % failure)
            print(output)
          PrintDepsMap(self._deps_map)
          print("Packages failed: %s" % ", ".join(self._failed.keys()))
          sys.exit(1)
        # If we have dependency cycles.
        else:
          print("Deadlock! Circular dependencies!")
          PrintDepsMap(self._deps_map)
          sys.exit(1)

      # Check every running job to see if we've finished any jobs.
      # Iterate over a snapshot: finished jobs are removed from the live
      # list inside the loop.
      for target, job, stdout in self._jobs[:]:
        # Is it done?
        if job.poll() is not None:
          # Clean up the subprocess.
          job.wait()
          # Get the output if we want to print it, then release the file.
          stdout.seek(0)
          output = stdout.read()
          stdout.close()

          # Remove from active jobs list, we are done with this process.
          self._jobs.remove((target, job, stdout))

          # Print if necessary.
          if VERBOSE:
            print(output)
          if job.returncode != 0:
            # Handle job failure.
            if target in self._failed:
              # If this job has failed previously, give up. The target
              # stays in the graph, so the check above exits once
              # nothing else can make progress.
              self._failed[target] = output
              print("Failed %s. Your build has failed." % target)
            else:
              # Queue up this build to try again after a long while.
              self._retry_queue.append(target)
              self._failed[target] = output
              print("Failed %s, retrying later." % target)
          else:
            # A package that succeeds on retry is no longer a failure;
            # forget it so it is not blamed in a later failure report.
            was_retry = self._failed.pop(target, None) is not None
            if was_retry and self._retry_queue:
              # If we have successfully retried a failed package, and there
              # are more failed packages, try the next one. We will only have
              # one retrying package actively running at a time.
              self._Retry()

            print("Completed %s" % target)
            # Mark as completed and unblock waiting ebuilds.
            self._Finish(target)

          # Print an update.
          self._Status()


def main():
  """Main control code: parse args, compute deps, run the emerges."""
  global PACKAGE, EMERGE_ARGS, BOARD

  print("Starting fast-emerge.")
  PACKAGE, EMERGE_ARGS, BOARD = ParseArgs(sys.argv)
  print(" Building package %s on %s (%s)" % (PACKAGE, EMERGE_ARGS, BOARD))

  print("Running emerge to generate deps")
  deps_output = GetDepsFromPortage(PACKAGE)
  print("Processing emerge output")
  dependency_tree, dependency_info = DepsToTree(deps_output)
  if VERBOSE:
    print("Print tree")
    PrintTree(dependency_tree)

  print("Generate dependency graph.")
  dependency_graph = GenDependencyGraph(dependency_tree, dependency_info)

  if VERBOSE:
    PrintDepsMap(dependency_graph)

  # Run the queued emerges.
  scheduler = EmergeQueue(dependency_graph)
  scheduler.Run()

  print("Done")


if __name__ == "__main__":
  main()