#!/usr/bin/python2.6 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Program to run emerge in parallel, for significant speedup. Usage: ./parallel_emerge [--board=BOARD] [--workon=PKGS] [--no-workon-deps] [emerge args] package" Basic operation: Runs 'emerge -p --debug' to display dependencies, and stores a dependency graph. All non-blocked packages are launched in parallel, as 'emerge --nodeps package' with any blocked packages being emerged immediately upon deps being met. For this to work effectively, /usr/lib/portage/pym/portage/locks.py must be stubbed out, preventing portage from slowing itself with unneccesary locking, as this script ensures that emerge is run in such a way that common resources are never in conflict. This is controlled by an environment variable PORTAGE_LOCKS set in parallel emerge subprocesses. Parallel Emerge unlocks two things during operation, here's what you must do to keep this safe: * Storage dir containing binary packages. - Don't emerge new packages while installing the existing ones. * Portage database - You must not examine deps while modifying the database. Therefore you may only parallelize "-p" read only access, or "--nodeps" write only access. Caveats: * Some ebuild packages have incorrectly specified deps, and running them in parallel is more likely to bring out these failures. * Some ebuilds (especially the build part) have complex dependencies that are not captured well by this script (it may be necessary to install an old package to build, but then install a newer version of the same package for a runtime dep). """ import os import re import shlex import subprocess import sys import tempfile import time import _emerge.main def Usage(): """Print usage.""" print "Usage:" print " ./parallel_emerge [--board=BOARD] [--workon=PKGS] [--no-workon-deps]" print " [emerge args] package" print print "Packages specified as workon packages are always built from source." print "Unless --no-workon-deps is specified, packages that depend on these" print "packages are also built from source." print print "The --workon argument is mainly useful when you want to build and" print "install packages that you are working on unconditionally, but do not" print "to have to rev the package to indicate you want to build it from" print "source. The build_packages script will automatically supply the" print "workon argument to emerge, ensuring that packages selected using" print "cros-workon are rebuilt." sys.exit(1) # These are dependencies that are not specified in the package, # but will prevent the package from installing. secret_deps = {} # Runtime flags. TODO(): Maybe make these command-line options or # environment variables. VERBOSE = False AUTOCLEAN = False # Global start time GLOBAL_START = time.time() def ParseArgs(argv): """Set global vars based on command line. We need to be compatible with emerge arg format. We scrape arguments that are specific to parallel_emerge, and pass through the rest directly to emerge. Args: argv: arguments list Returns: triplet of (package list, emerge argumens, board string) """ if VERBOSE: print argv workon_set = set() myopts = {} myopts["workon"] = workon_set emerge_args = [] for arg in argv[1:]: # Specifically match arguments that are specific to parallel_emerge, and # pass through the rest. if arg.startswith("--board="): myopts["board"] = arg.replace("--board=", "") elif arg.startswith("--workon="): workon_str = arg.replace("--workon=", "") workon_set.update(shlex.split(" ".join(shlex.split(workon_str)))) elif arg == "--no-workon-deps": myopts["no-workon-deps"] = True else: # Not a package name, so pass through to emerge. emerge_args.append(arg) emerge_action, emerge_opts, emerge_files = _emerge.main.parse_opts( emerge_args) return myopts, emerge_action, emerge_opts, emerge_files def EmergeCommand(): """Helper function to return the base emerge commandline. This is configured for board type, and including pass thru args, using global variables. TODO(): Unglobalfy. Returns: string containing emerge command. """ emerge = "emerge" if "board" in OPTS: emerge += "-" + OPTS["board"] cmd = [emerge] for key, val in EMERGE_OPTS.items(): if val is True: cmd.append(key) else: cmd.extend([key, str(val)]) return " ".join(cmd) def GetDepsFromPortage(package): """Get dependency tree info by running emerge. Run 'emerge -p --debug package', and get a text output of all deps. TODO(): Put dep calculation in a library, as cros_extract_deps also uses this code. Args: package: String containing the packages to build. Returns: Text output of emerge -p --debug, which can be processed elsewhere. """ print "Calculating deps for package %s" % package cmdline = (EmergeCommand() + " -p --debug --color=n --with-bdeps=y " + "--selective=n " + package) if OPTS["workon"]: cmdline += " " + " ".join(OPTS["workon"]) print "+ %s" % cmdline # Store output in a temp file as it is too big for a unix pipe. stderr_buffer = tempfile.TemporaryFile() stdout_buffer = tempfile.TemporaryFile() # Launch the subprocess. start = time.time() depsproc = subprocess.Popen(shlex.split(str(cmdline)), stderr=stderr_buffer, stdout=stdout_buffer, bufsize=64*1024) depsproc.wait() seconds = time.time() - start print "Deps calculated in %dm%.1fs" % (seconds / 60, seconds % 60) stderr_buffer.seek(0) stderr_raw = stderr_buffer.read() info_start = stderr_raw.find("digraph") stdout_buffer.seek(0) stdout_raw = stdout_buffer.read() lines = [] if info_start != -1: lines = stderr_raw[info_start:].split("\n") lines.extend(stdout_raw.split("\n")) if VERBOSE or depsproc.returncode != 0: output = stderr_raw + stdout_raw print output if depsproc.returncode != 0: print "Failed to generate deps" sys.exit(1) return lines def DepsToTree(lines): """Regex the output from 'emerge --debug' to generate a nested dict of deps. Args: lines: Output from 'emerge -p --debug package'. Returns: dep_tree: Nested dict of dependencies, as specified by emerge. There may be dupes, or circular deps. We need to regex lines as follows: hard-host-depends depends on ('ebuild', '/', 'dev-lang/swig-1.3.36', 'merge') depends on ('ebuild', '/', 'dev-lang/perl-5.8.8-r8', 'merge') (buildtime) ('binary', '/.../rootfs/', 'sys-auth/policykit-0.9-r1', 'merge') depends on ('binary', '/.../rootfs/', 'x11-misc/xbitmaps-1.1.0', 'merge') (no children) """ re_deps = re.compile(r"(?P\W*)\(\'(?P\w+)\', " r"\'(?P[\w/\.-]+)\'," r" \'(?P[\w\+-]+)/(?P[\w\+-]+)-" r"(?P\d+[\w\.-]*)\', \'(?P\w+)\'\) " r"(?P(depends on|\(.*\)))") re_origdeps = re.compile(r"(?P[\w\+/-]+) depends on") re_installed_package = re.compile( r"\[(?P[^\]]*)\] " r"(?P[\w\+-]+)/" r"(?P[\w\+-]+)-" r"(?P\d+[\w\.-]*)( \[" r"(?P\d+[\w\.-]*)\])?" ) re_failed = re.compile(r".*\) depends on.*") deps_tree = {} deps_stack = [] deps_info = {} for line in lines: m = re_deps.match(line) m_orig = re_origdeps.match(line) m_installed = re_installed_package.match(line) if m: pkgname = m.group("pkgname") pkgdir = m.group("pkgdir") pkgtype = m.group("pkgtype") indent = m.group("indent") doins = m.group("action") deptype = m.group("deptype") depth = 1 if not indent: depth = 0 version = m.group("version") # If we are indented, we should have # found a "depends on" previously. if len(deps_stack) < depth: print "FAIL: corrupt input at:" print line print "No Parent." sys.exit(1) # Go step by step through stack and tree # until we find our parent. updatedep = deps_tree for i in range(0, depth): updatedep = updatedep[deps_stack[i]]["deps"] # Pretty print what we've captured. indent = "|" + "".ljust(depth, "_") fullpkg = "%s/%s-%s" % (pkgdir, pkgname, version) if VERBOSE: print ("" + indent + " " + pkgdir + "/" + pkgname + " - " + version + " (" + pkgtype + ", " + doins + ", " + deptype + ")") # Add our new package into the tree, if it's not already there. updatedep.setdefault(fullpkg, {}) # Add an empty deps for this new package. updatedep[fullpkg].setdefault("deps", {}) # Add the action we should take (merge, nomerge). updatedep[fullpkg].setdefault("action", doins) # Add the type of dep. updatedep[fullpkg].setdefault("deptype", deptype) # Add the long name of the package updatedep[fullpkg].setdefault("pkgpath", "%s/%s" % (pkgdir, pkgname)) # Add the short name of the package updatedep[fullpkg].setdefault("pkgname", pkgname) # Drop any stack entries below our depth. deps_stack = deps_stack[0:depth] # Add ourselves to the end of the stack. deps_stack.append(fullpkg) elif m_orig: # Also capture "pseudo packages", which are the freeform test # we requested to be installed. These are generic package names # like "chromeos" rather than chromeos/chromeos-0.0.1 depth = 0 # Tag these with "original" in case they overlap with real packages. pkgname = "original-%s" % m_orig.group("pkgname") # Insert this into the deps tree so so we can stick it in "world" updatedep = deps_tree for i in range(0, depth): updatedep = updatedep[deps_stack[i]]["deps"] if VERBOSE: print pkgname # Add our new package into the tree, if it's not already there. updatedep.setdefault(pkgname, {}) updatedep[pkgname].setdefault("deps", {}) # Add the type of dep. updatedep[pkgname].setdefault("action", "world") updatedep[pkgname].setdefault("deptype", "normal") updatedep[pkgname].setdefault("pkgpath", None) updatedep[pkgname].setdefault("pkgname", None) # Drop any obsolete stack entries. deps_stack = deps_stack[0:depth] # Add ourselves to the end of the stack. deps_stack.append(pkgname) elif m_installed: pkgname = m_installed.group("pkgname") pkgdir = m_installed.group("pkgdir") version = m_installed.group("version") oldversion = m_installed.group("oldversion") desc = m_installed.group("desc") uninstall = False if oldversion and (desc.find("U") != -1 or desc.find("D") != -1): uninstall = True replace = desc.find("R") != -1 fullpkg = "%s/%s-%s" % (pkgdir, pkgname, version) deps_info[fullpkg] = {"idx": len(deps_info), "pkgdir": pkgdir, "pkgname": pkgname, "oldversion": oldversion, "uninstall": uninstall, "replace": replace} else: # Is this a package that failed to match our huge regex? m = re_failed.match(line) if m: print "\n".join(lines) print "FAIL: Couldn't understand line:" print line sys.exit(1) return deps_tree, deps_info def PrintTree(deps, depth=""): """Print the deps we have seen in the emerge output. Args: deps: Dependency tree structure. depth: Allows printing the tree recursively, with indentation. """ for entry in deps: action = deps[entry]["action"] print "%s %s (%s)" % (depth, entry, action) PrintTree(deps[entry]["deps"], depth=depth + " ") def GenDependencyGraph(deps_tree, deps_info, package_names): """Generate a doubly linked dependency graph. Args: deps_tree: Dependency tree structure. deps_info: More details on the dependencies. package_names: Names of packages to add to the world file. Returns: Deps graph in the form of a dict of packages, with each package specifying a "needs" list and "provides" list. """ deps_map = {} pkgpaths = {} def ReverseTree(packages): """Convert tree to digraph. Take the tree of package -> requirements and reverse it to a digraph of buildable packages -> packages they unblock. Args: packages: Tree(s) of dependencies. Returns: Unsanitized digraph. """ for pkg in packages: action = packages[pkg]["action"] pkgpath = packages[pkg]["pkgpath"] pkgname = packages[pkg]["pkgname"] pkgpaths[pkgpath] = pkg pkgpaths[pkgname] = pkg this_pkg = deps_map.setdefault( pkg, {"needs": {}, "provides": set(), "action": "nomerge", "workon": False, "cmdline": False}) if action != "nomerge": this_pkg["action"] = action this_pkg["deps_info"] = deps_info.get(pkg) ReverseTree(packages[pkg]["deps"]) for dep, dep_item in packages[pkg]["deps"].items(): dep_pkg = deps_map[dep] dep_type = dep_item["deptype"] if dep_type != "(runtime_post)": dep_pkg["provides"].add(pkg) this_pkg["needs"][dep] = dep_type def RemoveInstalledPackages(): """Remove installed packages, propagating dependencies.""" if "--selective" in EMERGE_OPTS: selective = EMERGE_OPTS["--selective"] != "n" else: selective = "--noreplace" in EMERGE_OPTS or "--update" in EMERGE_OPTS rm_pkgs = set(deps_map.keys()) - set(deps_info.keys()) for pkg, info in deps_info.items(): if selective and not deps_map[pkg]["workon"] and info["replace"]: rm_pkgs.add(pkg) for pkg in rm_pkgs: this_pkg = deps_map[pkg] if this_pkg["cmdline"] and "--oneshot" not in EMERGE_OPTS: # If "cmdline" is set, this is a world update that was passed on the # command-line. Keep these unless we're in --oneshot mode. continue needs = this_pkg["needs"] provides = this_pkg["provides"] for dep in needs: dep_provides = deps_map[dep]["provides"] dep_provides.update(provides) dep_provides.discard(pkg) dep_provides.discard(dep) for target in provides: target_needs = deps_map[target]["needs"] target_needs.update(needs) if pkg in target_needs: del target_needs[pkg] if target in target_needs: del target_needs[target] del deps_map[pkg] def SanitizeDep(basedep, currdep, visited, cycle): """Search for circular deps between basedep and currdep, then recurse. Args: basedep: Original dependency, top of stack. currdep: Bottom of our current recursion, bottom of stack. visited: Nodes visited so far. cycle: Array where cycle of circular dependencies should be stored. TODO(): Break RDEPEND preferentially. Returns: True iff circular dependencies are found. """ if currdep not in visited: visited.add(currdep) for dep in deps_map[currdep]["needs"]: if dep == basedep or SanitizeDep(basedep, dep, visited, cycle): cycle.insert(0, dep) return True return False def SanitizeTree(): """Remove circular dependencies.""" start = time.time() for basedep in deps_map: this_pkg = deps_map[basedep] if this_pkg["action"] == "world": # world file updates can't be involved in cycles, # and they don't have deps_info, so skip them. continue for dep in this_pkg["needs"].copy(): cycle = [] if (deps_info[basedep]["idx"] <= deps_info[dep]["idx"] and SanitizeDep(basedep, dep, set(), cycle)): cycle[:0] = [basedep, dep] print "Breaking cycle:" for i in range(len(cycle) - 1): deptype = deps_map[cycle[i]]["needs"][cycle[i+1]] print " %s -> %s %s" % (cycle[i], cycle[i+1], deptype) del this_pkg["needs"][dep] deps_map[dep]["provides"].remove(basedep) seconds = time.time() - start print "Tree sanitized in %d:%04.1fs" % (seconds / 60, seconds % 60) def AddSecretDeps(): """Find these tagged packages and add extra dependencies. For debugging dependency problems. """ for bad in secret_deps: needed = secret_deps[bad] bad_pkg = None needed_pkg = None for dep in deps_map: if dep.find(bad) != -1: bad_pkg = dep if dep.find(needed) != -1: needed_pkg = dep if bad_pkg and needed_pkg: deps_map[needed_pkg]["provides"].add(bad_pkg) deps_map[bad_pkg]["needs"].add(needed_pkg) def WorkOnChildren(pkg): """Mark this package and all packages it provides as workon packages.""" this_pkg = deps_map[pkg] if this_pkg["workon"]: return False this_pkg["workon"] = True updated = False for w in this_pkg["provides"]: if WorkOnChildren(w): updated = True if this_pkg["action"] == "nomerge": pkgpath = deps_tree[pkg]["pkgpath"] if pkgpath is not None: OPTS["workon"].add(pkgpath) updated = True return updated ReverseTree(deps_tree) AddSecretDeps() if "no-workon-deps" in OPTS: for pkgpath in OPTS["workon"].copy(): pkg = pkgpaths[pkgpath] deps_map[pkg]["workon"] = True else: mergelist_updated = False for pkgpath in OPTS["workon"].copy(): pkg = pkgpaths[pkgpath] if WorkOnChildren(pkg): mergelist_updated = True if mergelist_updated: print "List of packages to merge updated. Recalculate dependencies..." return None for pkgpath in package_names: dep_pkg = deps_map.get("original-%s" % pkgpath) if dep_pkg and len(dep_pkg["needs"]) == 1: dep_pkg["cmdline"] = True RemoveInstalledPackages() SanitizeTree() return deps_map def PrintDepsMap(deps_map): """Print dependency graph, for each package list it's prerequisites.""" for i in deps_map: print "%s: (%s) needs" % (i, deps_map[i]["action"]) for j in deps_map[i]["needs"]: print " %s" % (j) class EmergeQueue(object): """Class to schedule emerge jobs according to a dependency graph.""" def __init__(self, deps_map): # Store the dependency graph. self._deps_map = deps_map # Initialize the runnable queue to empty. self._jobs = [] # List of total package installs represented in deps_map. install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"] self._total_jobs = len(install_jobs) # Initialize the ready queue, these are jobs with no unmet dependencies. self._emerge_queue = [x for x in deps_map if not deps_map[x]["needs"]] # Initialize the failed queue to empty. self._retry_queue = [] self._failed = {} def _LoadAvg(self): loads = open("/proc/loadavg", "r").readline().split()[:3] return " ".join(loads) def _Status(self): """Print status.""" seconds = time.time() - GLOBAL_START line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " "[Time %dm%ds Load %s]") print line % (len(self._deps_map), len(self._emerge_queue), len(self._jobs), len(self._retry_queue), self._total_jobs, seconds / 60, seconds % 60, self._LoadAvg()) def _LaunchOneEmerge(self, target, action): """Run emerge --nodeps to do a single package install. If this is a pseudopackage, that means we're done, and can select in in the world file. Args: target: The full package name of the package to install. eg. "sys-apps/portage-2.17" Returns: Triplet containing (target name, subprocess object, output buffer object). """ if target.startswith("original-"): # "original-" signifies one of the packages we originally requested. # Since we have explicitly installed the versioned package as a dep of # this, we only need to tag in "world" that we are done with this # install request. # --nodeps: Ignore dependencies -- we handle them internally. # --noreplace: Don't replace or upgrade any packages. (In this case, the # package is already installed, so we are just updating the # world file.) # --selective: Make sure that --noreplace sticks even if --selective=n is # specified by the user on the command-line. # NOTE: If the user specifies --oneshot on the command-line, this command # will do nothing. That is desired, since the user requested not to # update the world file. newtarget = target.replace("original-", "") cmdline = (EmergeCommand() + " --nodeps --selective --noreplace " + newtarget) elif action == "uninstall": cmdline = EmergeCommand() + " --nodeps --unmerge =" + target else: # This package is a dependency of something we specifically # requested. Therefore we should install it but not allow it # in the "world" file, which represents explicit installs. # --oneshot" here will prevent it from being tagged in world. cmdline = EmergeCommand() + " --nodeps --oneshot " this_pkg = self._deps_map[target] if this_pkg["workon"]: # --usepkg=n --getbinpkg=n: Build from source # --selective=n: Re-emerge even if package is already installed. cmdline += "--usepkg=n --getbinpkg=n --selective=n " cmdline += "=" + target deps_info = this_pkg["deps_info"] if deps_info["uninstall"]: package = "%(pkgdir)s/%(pkgname)s-%(oldversion)s" % deps_info cmdline += " && %s -C =%s" % (EmergeCommand(), package) print "+ %s" % cmdline # Store output in a temp file as it is too big for a unix pipe. stdout_buffer = tempfile.TemporaryFile() # Modify the environment to disable locking. portage_env = os.environ.copy() portage_env["PORTAGE_LOCKS"] = "false" portage_env["UNMERGE_DELAY"] = "0" # Autoclean rummages around in the portage database and uninstalls # old packages. It's not parallel safe, so we skip it. Instead, we # handle the cleaning ourselves by uninstalling old versions of any # new packages we install. if not AUTOCLEAN: portage_env["AUTOCLEAN"] = "no" # Launch the subprocess. emerge_proc = subprocess.Popen( cmdline, shell=True, stdout=stdout_buffer, stderr=subprocess.STDOUT, bufsize=64*1024, env=portage_env) return (target, emerge_proc, stdout_buffer) def _Finish(self, target): """Mark a target as completed and unblock dependecies.""" for dep in self._deps_map[target]["provides"]: del self._deps_map[dep]["needs"][target] if not self._deps_map[dep]["needs"]: if VERBOSE: print "Unblocking %s" % dep self._emerge_queue.append(dep) self._deps_map.pop(target) def _Retry(self): if self._retry_queue: target = self._retry_queue.pop(0) self._emerge_queue.append(target) print "Retrying emerge of %s." % target def Run(self): """Run through the scheduled ebuilds. Keep running so long as we have uninstalled packages in the dependency graph to merge. """ secs = 0 max_jobs = EMERGE_OPTS.get("--jobs", 256) while self._deps_map: # If we have packages that are ready, kick them off. if self._emerge_queue and len(self._jobs) < max_jobs: target = self._emerge_queue.pop(0) action = self._deps_map[target]["action"] # We maintain a tree of all deps, if this doesn't need # to be installed just free up it's children and continue. # It is possible to reinstall deps of deps, without reinstalling # first level deps, like so: # chromeos (merge) -> eselect (nomerge) -> python (merge) if action == "nomerge": self._Finish(target) else: # Kick off the build if it's marked to be built. print "Emerging %s (%s)" % (target, action) job = self._LaunchOneEmerge(target, action) # Append it to the active jobs list. self._jobs.append(job) continue # Wait a bit to see if maybe some jobs finish. You can't # wait on a set of jobs in python, so we'll just poll. time.sleep(1) secs += 1 if secs % 30 == 0: # Print an update. self._Status() # Check here that we are actually waiting for something. if (not self._emerge_queue and not self._jobs and self._deps_map): # If we have failed on a package, retry it now. if self._retry_queue: self._Retry() # If we have failed a package twice, just give up. elif self._failed: for failure, output in self._failed.items(): print "Package failed: %s" % failure print output PrintDepsMap(self._deps_map) print "Packages failed: %s" % " ,".join(self._failed.keys()) sys.exit(1) # If we have dependency cycles. else: print "Deadlock! Circular dependencies!" PrintDepsMap(self._deps_map) sys.exit(1) # Check every running job to see if we've finished any jobs. for target, job, stdout in self._jobs: # Is it done? if job.poll() is not None: # Clean up the subprocess. job.wait() # Get the output if we want to print it. stdout.seek(0) output = stdout.read() # Remove from active jobs list, we are done with this process. self._jobs.remove((target, job, stdout)) # Print if necessary. if VERBOSE or job.returncode != 0: print output if job.returncode != 0: # Handle job failure. if target in self._failed: # If this job has failed previously, give up. print "Failed %s. Your build has failed." % target else: # Queue up this build to try again after a long while. self._retry_queue.append(target) self._failed[target] = output print "Failed %s, retrying later." % target else: if target in self._failed and self._retry_queue: # If we have successfully retried a failed package, and there # are more failed packages, try the next one. We will only have # one retrying package actively running at a time. self._Retry() print "Completed %s" % target # Mark as completed and unblock waiting ebuilds. self._Finish(target) # Print an update. self._Status() # Main control code. OPTS, EMERGE_ACTION, EMERGE_OPTS, EMERGE_FILES = ParseArgs(sys.argv) if EMERGE_ACTION is not None: # Pass action arguments straight through to emerge EMERGE_OPTS["--%s" % EMERGE_ACTION] = True sys.exit(os.system(EmergeCommand())) elif not EMERGE_FILES: Usage() sys.exit(1) print "Starting fast-emerge." print " Building package %s on %s" % (" ".join(EMERGE_FILES), OPTS.get("board", "root")) # If the user supplied the --workon option, we may have to run emerge twice # to generate a dependency ordering for packages that depend on the workon # packages. for it in range(2): print "Running emerge to generate deps" deps_output = GetDepsFromPortage(" ".join(EMERGE_FILES)) print "Processing emerge output" dependency_tree, dependency_info = DepsToTree(deps_output) if VERBOSE: print "Print tree" PrintTree(dependency_tree) print "Generate dependency graph." dependency_graph = GenDependencyGraph(dependency_tree, dependency_info, EMERGE_FILES) if dependency_graph is not None: break else: print "Can't crack cycle" sys.exit(1) if VERBOSE: PrintDepsMap(dependency_graph) # Run the queued emerges. scheduler = EmergeQueue(dependency_graph) scheduler.Run() print "Done"