flatcar-scripts/parallel_emerge
David James a27ae994b5 Robustify package upgrades and dependency checking.
- Unmerge appropriate packages during upgrades and downgrades.
- Calculate time spent in dependency generation to the tenth of a second.
- Only track dependencies of packages that are actually being installed.
- Ignore PDEPEND, as it has no impact on dependency ordering.
- Only break dependency chains that go against Portage's install order.
- Rename Failed -> Retrying.
- Print emerge command lines as they are run.

TEST=Emerged hard-host-depends and ran build_packages with parallel_emerge
BUG=none

Review URL: http://codereview.chromium.org/2886010
2010-07-01 20:52:59 -07:00

662 lines
23 KiB
Python
Executable File

#!/usr/bin/python2.6
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Program to run emerge in parallel, for significant speedup.
Usage:
./parallel_emerge --board=BOARD [emerge args] package
Basic operation:
Runs 'emerge -p --debug' to display dependencies, and stores a
dependency graph. All non-blocked packages are launched in parallel,
as 'emerge --nodeps package' with any blocked packages being emerged
immediately upon deps being met.
For this to work effectively, /usr/lib/portage/pym/portage/locks.py
must be stubbed out, preventing portage from slowing itself with
unnecessary locking, as this script ensures that emerge is run in such
a way that common resources are never in conflict. This is controlled
by an environment variable PORTAGE_LOCKS set in parallel emerge
subprocesses.
Parallel Emerge unlocks two things during operation, here's what you
must do to keep this safe:
* Storage dir containing binary packages. - Don't emerge new
packages while installing the existing ones.
* Portage database - You must not examine deps while modifying the
database. Therefore you may only parallelize "-p" read only access,
or "--nodeps" write only access.
Caveats:
* Some ebuild packages have incorrectly specified deps, and running
them in parallel is more likely to bring out these failures.
* Portage "world" is a record of explicitly installed packages. In
this parallel scheme, explicitly installed packages are installed
twice, once for the real install, and once for world file addition.
* Some ebuilds (especially the build part) have complex dependencies
that are not captured well by this script (it may be necessary to
install an old package to build, but then install a newer version
of the same package for a runtime dep). This script is only
currently stable for binpkg installs.
"""
import os
import re
import shlex
import subprocess
import sys
import tempfile
import time
def Usage():
print "Usage:"
print " ./parallel_emerge --board=BOARD [emerge args] package"
sys.exit(1)
# These are "secret" dependencies that are not specified in the package's
# ebuild, but that will nonetheless prevent the package from installing.
# Maps a substring of the dependent package name to a substring of the
# package it needs (consumed by AddSecretDeps, for debugging).
secret_deps = {}

# Globals: package we are building, board we are targeting,
# emerge args we are passing through. Set once by ParseArgs at startup.
PACKAGE = None
EMERGE_ARGS = ""
BOARD = None

# Runtime flags. TODO(): maybe make these commandline options or
# environment variables.
VERBOSE = False
AUTOCLEAN = False
def ParseArgs(argv):
"""Set global vars based on command line.
We need to be compatible with emerge arg format.
We scrape --board-XXX, and distinguish between args
and package names.
TODO(): robustify argument processing, as it's possible to
pass in many two argument parameters that are difficult
to programmaitcally identify, although we don't currently
use any besides --bdeps <y|n>.
Args:
argv: arguments list
Returns:
triplet of (package list, emerge argumens, board string)
"""
if VERBOSE:
print argv
board_arg = None
package_args = []
emerge_passthru_args = ""
re_board = re.compile(r"--board=(?P<board>.*)")
for arg in argv[1:]:
# Check if the arg begins with '-'
if arg[0] == "-" or arg == "y" or arg == "n":
# Specifically match "--board="
m = re_board.match(arg)
if m:
board_arg = m.group("board")
else:
# Pass through to emerge.
emerge_passthru_args = emerge_passthru_args + " " + arg
else:
# Only non-dashed arg should be the target package.
package_args.append(arg)
if not package_args:
Usage()
sys.exit(1)
# Set globals.
return " ".join(package_args), emerge_passthru_args, board_arg
def EmergeCommand():
  """Build the base emerge command line for the current configuration.

  Board-specific wrappers are named "emerge-BOARD"; pass-through emerge
  arguments are appended from the module globals. TODO(): unglobalfy.

  Returns:
    string containing the emerge command.
  """
  base = ("emerge" + "-" + BOARD) if BOARD else "emerge"
  return base + " " + EMERGE_ARGS
def GetDepsFromPortage(package):
  """Get dependency tree info by running emerge.

  Run 'emerge -p --debug package', and get a text output of all deps.
  TODO(): Put dep calculation in a library, as cros_extract_deps
  also uses this code.

  Args:
    package: string containing the packages to build.
  Returns:
    text output of emerge -p --debug (a list of lines), which can be
    processed elsewhere. Exits the program if emerge fails.
  """
  print "Calculating deps for package %s" % package
  cmdline = EmergeCommand() + " -p --debug --color=n " + package
  print "+ %s" % cmdline
  # Store output in a temp file as it is too big for a unix pipe.
  stderr_buffer = tempfile.TemporaryFile()
  stdout_buffer = tempfile.TemporaryFile()
  # Launch the subprocess, timing the dependency generation.
  start = time.time()
  depsproc = subprocess.Popen(shlex.split(cmdline), stderr=stderr_buffer,
                              stdout=stdout_buffer, bufsize=64*1024)
  depsproc.wait()
  seconds = time.time() - start
  print "Deps calculated in %d:%04.1fs" % (seconds / 60, seconds % 60)
  # The debug dependency graph is written to stderr, starting at the
  # word "digraph"; anything before that marker is discarded.
  stderr_buffer.seek(0)
  stderr_raw = stderr_buffer.read()
  info_start = stderr_raw.find("digraph")
  stdout_buffer.seek(0)
  stdout_raw = stdout_buffer.read()
  lines = []
  if info_start != -1:
    lines = stderr_raw[info_start:].split("\n")
  # stdout carries the merge list ("[ebuild ...]" lines); append it too.
  lines.extend(stdout_raw.split("\n"))
  if VERBOSE or depsproc.returncode != 0:
    output = stderr_raw + stdout_raw
    print output
  if depsproc.returncode != 0:
    print "Failed to generate deps"
    sys.exit(1)
  return lines
def DepsToTree(lines):
  """Regex the output from 'emerge --debug' to generate a nested dict of deps.

  Args:
    lines: output from 'emerge -p --debug package'
  Returns:
    deps_tree: nested dict of dependencies, as specified by emerge.
      there may be dupes, or circular deps.
    deps_info: flat dict keyed by full package name for packages that will
      actually be installed, recording emerge's install order ("idx") and
      whether an old version must be unmerged ("uninstall").

  We need to regex lines as follows:
  hard-host-depends depends on
  ('ebuild', '/', 'dev-lang/swig-1.3.36', 'merge') depends on
  ('ebuild', '/', 'dev-lang/perl-5.8.8-r8', 'merge') (buildtime)
  ('binary', '/.../rootfs/', 'sys-auth/policykit-0.9-r1', 'merge') depends on
  ('binary', '/.../rootfs/', 'x11-misc/xbitmaps-1.1.0', 'merge') (no children)
  """
  # One node of emerge's debug graph; leading whitespace ("indent")
  # distinguishes children (depth 1) from roots (depth 0).
  re_deps = re.compile(r"(?P<indent>\W*)\(\'(?P<pkgtype>\w+)\', "
                       r"\'(?P<destination>[\w/\.-]+)\',"
                       r" \'(?P<pkgdir>[\w\+-]+)/(?P<pkgname>[\w\+-]+)-"
                       r"(?P<version>\d+[\w\.-]*)\', \'(?P<action>\w+)\'\) "
                       r"(?P<deptype>(depends on|\(.*\)))")
  # Freeform root of the graph, e.g. "hard-host-depends depends on".
  re_origdeps = re.compile(r"(?P<pkgname>[\w\+/-]+) depends on")
  # A merge-list line, e.g. "[ebuild U ] cat/pkg-1.2 [1.1]".
  re_installed_package = re.compile(
      r"\[(?P<desc>[^\]]*)\] "
      r"(?P<pkgdir>[\w\+-]+)/"
      r"(?P<pkgname>[\w\+-]+)-"
      r"(?P<version>\d+[\w\.-]*)( \["
      r"(?P<oldversion>\d+[\w\.-]*)\])?"
  )
  # Catch-all used to flag "depends on" lines the main regex missed.
  re_failed = re.compile(r".*depends on.*")
  deps_tree = {}
  # Stack of package names from the root down to the current parent.
  deps_stack = []
  deps_info = {}
  for line in lines:
    m = re_deps.match(line)
    m_orig = re_origdeps.match(line)
    m_installed = re_installed_package.match(line)
    if m:
      pkgname = m.group("pkgname")
      pkgdir = m.group("pkgdir")
      pkgtype = m.group("pkgtype")
      indent = m.group("indent")
      doins = m.group("action")
      deptype = m.group("deptype")
      depth = 1
      if not indent:
        depth = 0
      version = m.group("version")
      # If we are indented, we should have
      # found a "depends on" previously.
      if len(deps_stack) < depth:
        print "FAIL: corrupt input at:"
        print line
        print "No Parent."
        sys.exit(1)
      # Go step by step through stack and tree
      # until we find our parent's "deps" dict.
      updatedep = deps_tree
      for i in range(0, depth):
        updatedep = updatedep[deps_stack[i]]["deps"]
      # Pretty print what we've captured.
      indent = "|" + "".ljust(depth, "_")
      fullpkg = "%s/%s-%s" % (pkgdir, pkgname, version)
      if VERBOSE:
        print ("" + indent + " " + pkgdir + "/" + pkgname + " - " +
               version + " (" + pkgtype + ", " + doins +
               ", " + deptype + ")")
      # Add our new package into the tree, if it's not already there.
      updatedep.setdefault(fullpkg, {})
      # Add an empty deps for this new package.
      updatedep[fullpkg].setdefault("deps", {})
      # Add the action we should take (merge, nomerge).
      updatedep[fullpkg].setdefault("action", doins)
      # Add the type of dep.
      updatedep[fullpkg].setdefault("deptype", deptype)
      # Drop any stack entries below our depth.
      deps_stack = deps_stack[0:depth]
      # Add ourselves to the end of the stack.
      deps_stack.append(fullpkg)
    elif m_orig:
      # Also capture "pseudo packages", which are the freeform text
      # we requested to be installed. These are generic package names
      # like "chromeos" rather than chromeos/chromeos-0.0.1.
      depth = 0
      # Tag these with "original" in case they overlap with real packages.
      pkgname = "original-%s" % m_orig.group("pkgname")
      # Insert this into the deps tree so we can stick it in "world".
      updatedep = deps_tree
      for i in range(0, depth):
        updatedep = updatedep[deps_stack[i]]["deps"]
      if VERBOSE:
        print pkgname
      # Add our new package into the tree, if it's not already there.
      updatedep.setdefault(pkgname, {})
      updatedep[pkgname].setdefault("deps", {})
      # "world" action: the package only needs to be recorded in "world".
      updatedep[pkgname].setdefault("action", "world")
      updatedep[pkgname].setdefault("deptype", "normal")
      # Drop any obsolete stack entries.
      deps_stack = deps_stack[0:depth]
      # Add ourselves to the end of the stack.
      deps_stack.append(pkgname)
    elif m_installed:
      # A merge-list entry: a package that will actually be installed.
      pkgname = m_installed.group("pkgname")
      pkgdir = m_installed.group("pkgdir")
      version = m_installed.group("version")
      oldversion = m_installed.group("oldversion")
      desc = m_installed.group("desc")
      # An upgrade ("U") or downgrade ("D") with an old version listed
      # means the previously installed version must be unmerged later.
      uninstall = False
      if oldversion and (desc.find("U") != -1 or desc.find("D") != -1):
        uninstall = True
      fullpkg = "%s/%s-%s" % (pkgdir, pkgname, version)
      # "idx" preserves emerge's intended install order for this package.
      deps_info[fullpkg] = {"idx": len(deps_info),
                            "pkgdir": pkgdir,
                            "pkgname": pkgname,
                            "oldversion": oldversion,
                            "uninstall": uninstall}
    else:
      # Is this a package that failed to match our huge regex?
      m = re_failed.match(line)
      if m:
        print "FAIL: Couldn't understand line:"
        print line
        sys.exit(1)
  return deps_tree, deps_info
def PrintTree(deps, depth=""):
"""Print the deps we have seen in the emerge output.
Args:
deps: dependency tree structure.
depth: allows printing the tree recursively, with indentation.
"""
for entry in deps:
action = deps[entry]["action"]
print "%s %s (%s)" % (depth, entry, action)
PrintTree(deps[entry]["deps"], depth=depth + " ")
def GenDependencyGraph(deps_tree, deps_info):
  """Generate a doubly linked dependency graph.

  Args:
    deps_tree: dependency tree structure.
    deps_info: more info on the dependencies (install order, etc.).
  Returns:
    Deps graph in the form of a dict of packages, with each package
    specifying a "needs" set and "provides" set.
  """
  deps_map = {}

  def ReverseTree(packages):
    """Convert tree to digraph.

    Take the tree of package -> requirements and reverse it to a digraph of
    buildable packages -> packages they unblock.

    Args:
      packages: tree(s) of dependencies
    Returns:
      unsanitized digraph (accumulated in the enclosing deps_map)
    """
    for pkg in packages:
      action = packages[pkg]["action"]
      this_pkg = deps_map.setdefault(
          pkg, {"needs": set(), "provides": set(), "action": "nomerge"})
      # A package can appear multiple times in the tree; it is a real
      # merge if any occurrence carries a non-"nomerge" action.
      if action != "nomerge":
        this_pkg["action"] = action
      this_pkg["deps_info"] = deps_info.get(pkg)
      ReverseTree(packages[pkg]["deps"])
      for dep, dep_item in packages[pkg]["deps"].items():
        dep_pkg = deps_map[dep]
        dep_type = dep_item["deptype"]
        # Ignore PDEPEND ("runtime_post") edges: they have no impact on
        # dependency ordering.
        if dep_type != "(runtime_post)":
          dep_pkg["provides"].add(pkg)
          this_pkg["needs"].add(dep)

  def RemoveInstalledPackages():
    """Remove installed packages, propagating dependencies."""
    # Packages present in the graph but absent from the merge list are
    # already installed: splice each one out, connecting what it needed
    # directly to what it provided.
    rm_pkgs = set(deps_map.keys()) - set(deps_info.keys())
    for pkg in rm_pkgs:
      this_pkg = deps_map[pkg]
      needs = this_pkg["needs"]
      provides = this_pkg["provides"]
      for dep in needs:
        dep_provides = deps_map[dep]["provides"]
        dep_provides.update(provides)
        dep_provides.discard(pkg)
        dep_provides.discard(dep)
      for target in provides:
        target_needs = deps_map[target]["needs"]
        target_needs.update(needs)
        target_needs.discard(pkg)
        target_needs.discard(target)
      del deps_map[pkg]

  def SanitizeDep(basedep, currdep, oldstack, limit):
    """Search for circular deps between basedep and currdep, then recurse.

    Args:
      basedep: original dependency, top of stack.
      currdep: bottom of our current recursion, bottom of stack.
      oldstack: current dependency chain.
      limit: how many more levels of recursion to go through, max.
    TODO(): Break RDEPEND preferentially.
    Returns:
      True iff circular dependencies are found (None otherwise).
    """
    if limit == 0:
      return
    for dep in deps_map[currdep]["needs"]:
      stack = oldstack + [dep]
      # Cycle closes if dep leads back to basedep (directly or via its needs).
      if basedep in deps_map[dep]["needs"] or dep == basedep:
        if dep != basedep:
          stack += [basedep]
        print "Remove cyclic dependency from:"
        for i in xrange(0, len(stack) - 1):
          print " %s -> %s " % (stack[i], stack[i+1])
        return True
      # Recurse only into packages not already on the chain (avoids loops).
      if dep not in oldstack and SanitizeDep(basedep, dep, stack, limit - 1):
        return True
    return

  def SanitizeTree():
    """Remove circular dependencies up to cycle length 32."""
    start = time.time()
    for basedep in deps_map:
      for dep in deps_map[basedep]["needs"].copy():
        # Only break dependency chains that go against Portage's install
        # order ("idx"): basedep scheduled no later than its dependency.
        if deps_info[basedep]["idx"] <= deps_info[dep]["idx"]:
          if SanitizeDep(basedep, dep, [basedep, dep], 31):
            print "Breaking", basedep, " -> ", dep
            deps_map[basedep]["needs"].remove(dep)
            deps_map[dep]["provides"].remove(basedep)
    seconds = time.time() - start
    print "Tree sanitized in %d:%04.1fs" % (seconds / 60, seconds % 60)

  def AddSecretDeps():
    """Find these tagged packages and add extra dependencies.

    For debugging dependency problems. Matches entries of the module-level
    secret_deps dict against package names by substring.
    """
    for bad in secret_deps:
      needed = secret_deps[bad]
      bad_pkg = None
      needed_pkg = None
      for dep in deps_map:
        if dep.find(bad) != -1:
          bad_pkg = dep
        if dep.find(needed) != -1:
          needed_pkg = dep
      if bad_pkg and needed_pkg:
        deps_map[needed_pkg]["provides"].add(bad_pkg)
        deps_map[bad_pkg]["needs"].add(needed_pkg)

  ReverseTree(deps_tree)
  AddSecretDeps()
  RemoveInstalledPackages()
  SanitizeTree()
  return deps_map
def PrintDepsMap(deps_map):
"""Print dependency graph, for each package list it's prerequisites."""
for i in deps_map:
print "%s: (%s) needs" % (i, deps_map[i]["action"])
for j in deps_map[i]["needs"]:
print " %s" % (j)
class EmergeQueue(object):
"""Class to schedule emerge jobs according to a dependency graph."""
def __init__(self, deps_map):
# Store the dependency graph.
self._deps_map = deps_map
# Initialize the runnable queue to empty.
self._jobs = []
# List of total package installs represented in deps_map.
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
self._total_jobs = len(install_jobs)
# Initialize the ready queue, these are jobs with no unmet dependencies.
self._emerge_queue = [x for x in deps_map if not deps_map[x]["needs"]]
# Initialize the failed queue to empty.
self._retry_queue = []
self._failed = {}
def _Status(self):
"""Print status."""
print "Pending %s, Ready %s, Running %s, Retrying %s, Total %s" % (
len(self._deps_map), len(self._emerge_queue),
len(self._jobs), len(self._retry_queue), self._total_jobs)
def _LaunchOneEmerge(self, target):
"""Run emerge --nodeps to do a single package install.
If this is a pseudopackage, that means we're done, and can select in in the
world file.
Args:
target: the full package name of the package to install.
eg. "sys-apps/portage-2.17"
Returns:
triplet containing (target name, subprocess object, output buffer object)
"""
if target.startswith("original-"):
# "original-" signifies one of the packages we originally requested.
# Since we have explicitly installed the versioned package as a dep of
# this, we only need to tag in "world" that we are done with this
# install request. "--select -n" indicates an addition to "world"
# without an actual install.
newtarget = target.replace("original-", "")
cmdline = EmergeCommand() + " --nodeps --select --noreplace " + newtarget
else:
# This package is a dependency of something we specifically
# requested. Therefore we should install it but not allow it
# in the "world" file, which represents explicit intalls.
# "--oneshot" here will prevent it from being tagged in world.
cmdline = EmergeCommand() + " --nodeps --oneshot =" + target
deps_info = self._deps_map[target]["deps_info"]
if deps_info["uninstall"]:
package = "%(pkgdir)s/%(pkgname)s-%(oldversion)s" % deps_info
cmdline += " && %s -1C =%s" % (EmergeCommand(), package)
print "+ %s" % cmdline
# Store output in a temp file as it is too big for a unix pipe.
stdout_buffer = tempfile.TemporaryFile()
# Modify the environment to disable locking.
portage_env = os.environ.copy()
portage_env["PORTAGE_LOCKS"] = "false"
portage_env["UNMERGE_DELAY"] = "0"
# Autoclean rummages around in the portage database and uninstalls
# old packages. Definitely not necessary for build_image. However
# it may be necessary for incremental build_packages. It may also
# not be parallel safe.
if not AUTOCLEAN:
portage_env["AUTOCLEAN"] = "no"
# Launch the subprocess.
emerge_proc = subprocess.Popen(
cmdline, shell=True, stdout=stdout_buffer,
stderr=subprocess.STDOUT, bufsize=64*1024, env=portage_env)
return (target, emerge_proc, stdout_buffer)
def _Finish(self, target):
"""Mark a target as completed and unblock dependecies."""
for dep in self._deps_map[target]["provides"]:
self._deps_map[dep]["needs"].remove(target)
if not self._deps_map[dep]["needs"]:
if VERBOSE:
print "Unblocking %s" % dep
self._emerge_queue.append(dep)
self._deps_map.pop(target)
def _Retry(self):
if self._retry_queue:
target = self._retry_queue.pop(0)
self._emerge_queue.append(target)
print "Retrying emerge of %s." % target
def Run(self):
"""Run through the scheduled ebuilds.
Keep running so long as we have uninstalled packages in the
dependency graph to merge.
"""
while self._deps_map:
# If we have packages that are ready, kick them off.
if self._emerge_queue:
target = self._emerge_queue.pop(0)
action = self._deps_map[target]["action"]
# We maintain a tree of all deps, if this doesn't need
# to be installed just free up it's children and continue.
# It is possible to reinstall deps of deps, without reinstalling
# first level deps, like so:
# chromeos (merge) -> eselect (nomerge) -> python (merge)
if action == "nomerge":
self._Finish(target)
else:
# Kick off the build if it's marked to be built.
print "Emerging %s (%s)" % (target, action)
job = self._LaunchOneEmerge(target)
# Append it to the active jobs list.
self._jobs.append(job)
continue
# Wait a bit to see if maybe some jobs finish. You can't
# wait on a set of jobs in python, so we'll just poll.
time.sleep(1)
# Check here that we are actually waiting for something.
if (not self._emerge_queue and
not self._jobs and
self._deps_map):
# If we have failed on a package retry it now.
if self._retry_queue:
self._Retry()
# If we have failed a package twice, just give up.
elif self._failed:
for failure, output in self._failed.items():
print "Package failed: %s" % failure
print output
PrintDepsMap(self._deps_map)
print "Packages failed: %s" % " ,".join(self._failed.keys())
sys.exit(1)
# If we have dependency cycles.
else:
print "Deadlock! Circular dependencies!"
PrintDepsMap(self._deps_map)
sys.exit(1)
# Check every running job to see if we've finished any jobs.
for target, job, stdout in self._jobs:
# Is it done?
if job.poll() is not None:
# Clean up the subprocess.
job.wait()
# Get the output if we want to print it.
stdout.seek(0)
output = stdout.read()
# Remove from active jobs list, we are done with this process.
self._jobs.remove((target, job, stdout))
# Print if necessary.
if VERBOSE:
print output
if job.returncode != 0:
# Handle job failure.
if target in self._failed:
# If this job has failed previously, give up.
print "Failed %s. Your build has failed." % target
else:
# Queue up this build to try again after a long while.
self._retry_queue.append(target)
self._failed[target] = output
print "Failed %s, retrying later." % target
else:
if target in self._failed and self._retry_queue:
# If we have successfully retried a failed package, and there
# are more failed packages, try the next one. We will only have
# one retrying package actively running at a time.
self._Retry()
print "Completed %s" % target
# Mark as completed and unblock waiting ebuilds.
self._Finish(target)
# Print an update.
self._Status()
# Main control code: parse args, compute the dependency graph from a dry-run
# emerge, then schedule the real emerges in parallel.
print "Starting fast-emerge."
PACKAGE, EMERGE_ARGS, BOARD = ParseArgs(sys.argv)
print " Building package %s on %s (%s)" % (PACKAGE, EMERGE_ARGS, BOARD)
print "Running emerge to generate deps"
deps_output = GetDepsFromPortage(PACKAGE)
print "Processing emerge output"
dependency_tree, dependency_info = DepsToTree(deps_output)
if VERBOSE:
  print "Print tree"
  PrintTree(dependency_tree)
print "Generate dependency graph."
dependency_graph = GenDependencyGraph(dependency_tree, dependency_info)
if VERBOSE:
  PrintDepsMap(dependency_graph)
# Run the queued emerges.
scheduler = EmergeQueue(dependency_graph)
scheduler.Run()
print "Done"