mirror of
https://github.com/flatcar/scripts.git
synced 2025-08-08 05:26:58 +02:00
extra noise in the final exception output. This change stops catching and rethrowing the same exception, and generally makes the RunCommand slightly more readable (if longer). Some unit tests are added, but they only test RunCommand, and not all options to RunCommand. BUG=chromium-os:11717 TEST=Manual, and new lib/cros_build_lib_unittest Review URL: http://codereview.chromium.org/6576016 Change-Id: Ibe9b13ff302621e93196008f86914cb65a232d61
274 lines
8.6 KiB
Python
274 lines
8.6 KiB
Python
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Common python commands used by various build scripts."""
|
|
|
|
import inspect
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
|
|
# True when sys.stdout provides isatty() and reports being attached to a
# terminal. Evaluated once, at import time. Die(), Warning() and Info() hand
# this flag to Color to decide whether ANSI escape sequences are emitted.
_STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
|
|
|
|
# TODO(sosa): Move logging to logging module.
|
|
|
|
class RunCommandException(Exception):
  """Signals that a command launched through RunCommand failed."""
|
|
|
|
|
|
def GetCallerName():
  """Returns the base file name of the outermost frame on the call stack.

  The last entry of inspect.stack() is the outermost caller, which is
  presumably the script being run as __main__.

  Returns:
    The file name, without its directory, of the code object executing in
    that outermost frame.
  """
  outermost_record = inspect.stack()[-1]
  script_path = outermost_record[0].f_code.co_filename
  return os.path.basename(script_path)
|
|
|
|
|
|
def RunCommand(cmd, print_cmd=True, error_ok=False, error_message=None,
               exit_code=False, redirect_stdout=False, redirect_stderr=False,
               cwd=None, input=None, enter_chroot=False, num_retries=0):
  """Runs a command, optionally retrying it, and returns its output or status.

  Arguments:
    cmd: cmd to run. Should be input to subprocess.Popen. If a string, it is
      converted to an argument list using split().
    print_cmd: prints the command before running it and before every retry;
      also controls whether a warning is printed when the command fails
      while error_ok is set.
    error_ok: does not raise an exception on error.
    error_message: text reported when an error occurs, in place of the
      captured stderr/stdout.
    exit_code: returns the return code of the command instead of its output.
    redirect_stdout: captures stdout so that it can be returned.
    redirect_stderr: captures stderr instead of letting it through; the
      captured text is used in the failure report.
    cwd: the working directory to run this cmd.
    input: input to pipe into this command through stdin. Any value other
      than None (including an empty string) gives the command its own stdin.
    enter_chroot: this command should be run from within the chroot.  If set,
      cwd must point to the scripts directory.
    num_retries: the number of retries to perform before dying. Negative
      values are treated as 0, so the command always runs at least once.

  Returns:
    If exit_code is True, returns the return code of the command.
    Else returns the stdout reported by communicate(), which is None unless
    redirect_stdout is set.

  Raises:
    RunCommandException: the command (and every retry) exited with a non-zero
      status and error_ok is False.
  """
  # The documented contract accepts a plain string; turn it into an argument
  # list so that both Popen and the enter_chroot prefix below work with it.
  # type(u'') covers unicode on Python 2 and is simply str on Python 3.
  if isinstance(cmd, (str, type(u''))):
    cmd = cmd.split()
  if enter_chroot:
    # list() lets callers pass any sequence (e.g. a tuple) of arguments.
    cmd = ['./enter_chroot.sh', '--'] + list(cmd)

  stdout = subprocess.PIPE if redirect_stdout else None
  stderr = subprocess.PIPE if redirect_stderr else None
  # "is not None" rather than truthiness: an empty input must still give the
  # child an (immediately closed) stdin instead of the parent's stdin.
  stdin = subprocess.PIPE if input is not None else None

  # Print out the command before running.
  if print_cmd:
    Info('PROGRAM(%s) -> RunCommand: %r in dir %s' %
         (GetCallerName(), cmd, cwd))

  # Always make at least one attempt, even if num_retries is negative;
  # otherwise proc would be unbound below.
  attempts = max(num_retries, 0) + 1
  for attempt in range(attempts):
    # Anything after the first attempt is a retry.
    if attempt > 0 and print_cmd:
      Info('PROGRAM(%s) -> RunCommand: retrying %r in dir %s' %
           (GetCallerName(), cmd, cwd))

    proc = subprocess.Popen(cmd, cwd=cwd, stdin=stdin,
                            stdout=stdout, stderr=stderr)
    (output, error) = proc.communicate(input)

    # If the command worked, don't retry any more.
    if proc.returncode == 0:
      break

  # If the command (and all retries) failed, handle the error result.
  if proc.returncode != 0:
    detail = error_message or error or output or ''
    # On Python 3 captured output is bytes and cannot be concatenated to str;
    # on Python 2 bytes is str, so this branch is never taken there.
    if isinstance(detail, bytes) and not isinstance(detail, str):
      detail = detail.decode('utf-8', 'replace')
    # (cmd,) keeps the formatting correct when cmd itself is a tuple.
    failure_text = 'Command "%r" failed.\n' % (cmd,) + detail
    if not error_ok:
      raise RunCommandException(failure_text)
    if print_cmd:
      Warning(failure_text)

  # Return the final result.
  if exit_code:
    return proc.returncode
  return output
|
|
|
|
|
|
def RunCommandCaptureOutput(cmd, print_cmd=True, cwd=None, input=None,
                            enter_chroot=False,
                            combine_stdout_stderr=True,
                            verbose=False):
  """Runs a command and captures its exit code, stdout and stderr together.

  Differs from RunCommand in that it never raises on a non-zero exit status
  and always captures the output streams.

  Arguments:
    cmd: cmd to run. Should be input to subprocess.Popen. If a string, it is
      converted to an argument list using split().
    print_cmd: prints the command before running it.
    cwd: the working directory to run this cmd.
    input: input to pipe into this command through stdin. Any value other
      than None (including an empty string) gives the command its own stdin.
    enter_chroot: this command should be run from within the chroot.  If set,
      cwd must point to the scripts directory.
    combine_stdout_stderr: send stderr into the same pipe as stdout.
    verbose: after the command finishes, also write the captured stdout and
      stderr to this process's stdout and stderr.

  Returns:
    A tuple (exit_code, stdout, stderr).  stderr is None if
    combine_stdout_stderr is True.
  """
  # The documented contract accepts a plain string; turn it into an argument
  # list so that both Popen and the enter_chroot prefix below work with it.
  # type(u'') covers unicode on Python 2 and is simply str on Python 3.
  if isinstance(cmd, (str, type(u''))):
    cmd = cmd.split()
  if enter_chroot:
    # list() lets callers pass any sequence (e.g. a tuple) of arguments.
    cmd = ['./enter_chroot.sh', '--'] + list(cmd)

  # "is not None" rather than truthiness: an empty input must still give the
  # child an (immediately closed) stdin instead of the parent's stdin.
  stdin = subprocess.PIPE if input is not None else None
  stderr = subprocess.STDOUT if combine_stdout_stderr else subprocess.PIPE

  # Print out the command before running.
  if print_cmd:
    Info('PROGRAM(%s) -> RunCommand: %r in dir %s' %
         (GetCallerName(), cmd, cwd))

  proc = subprocess.Popen(cmd, cwd=cwd, stdin=stdin,
                          stdout=subprocess.PIPE, stderr=stderr)
  output, error = proc.communicate(input)

  if verbose:
    if output:
      sys.stdout.write(output)
    if error:
      sys.stderr.write(error)

  # error is None if stdout and stderr are combined.
  return proc.returncode, output, error
|
|
|
|
|
|
class Color(object):
  """Optionally decorates text with ANSI color escape sequences."""

  # Color indices; Color() adds 30 to obtain the ANSI foreground code.
  BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
  # Pseudo-color selecting bold text without a color change.
  BOLD = -1
  # Escape-sequence templates.
  COLOR_START = '\033[1;%dm'
  BOLD_START = '\033[1m'
  RESET = '\033[0m'

  def __init__(self, enabled=True):
    """Stores whether escape sequences should be produced at all."""
    self._enabled = enabled

  def Color(self, color, text):
    """Wraps text in the escape sequences for color, if enabled.

    Keyword arguments:
      color: Text color -- one of the color constants defined in this class.
      text: The text to color.

    Returns:
      The unmodified text when this instance is disabled; otherwise the text
      prefixed with the start sequence for color and followed by RESET.
    """
    if self._enabled:
      if color == self.BOLD:
        prefix = self.BOLD_START
      else:
        prefix = self.COLOR_START % (color + 30)
      return prefix + text + self.RESET
    return text
|
|
|
|
|
|
def Die(message):
  """Prints an error message in red to stderr and exits with status 1.

  Keyword arguments:
    message: The message to be emitted before exiting.
  """
  # NOTE(review): coloring is keyed on stdout being a TTY although the text
  # is written to stderr -- confirm this is intended.
  painter = Color(_STDOUT_IS_TTY)
  banner = painter.Color(Color.RED, '\nERROR: ' + message)
  print >> sys.stderr, banner
  sys.stderr.flush()
  sys.exit(1)
|
|
|
|
|
|
def Warning(message):
  """Prints a warning message in yellow to stderr; execution continues.

  Keyword arguments:
    message: The message to be emitted.
  """
  # NOTE(review): coloring is keyed on stdout being a TTY although the text
  # is written to stderr -- confirm this is intended.
  painter = Color(_STDOUT_IS_TTY)
  banner = painter.Color(Color.YELLOW, '\nWARNING: ' + message)
  print >> sys.stderr, banner
  sys.stderr.flush()
|
|
|
|
|
|
def Info(message):
  """Prints an informational message in blue to stderr; execution continues.

  Keyword arguments:
    message: The message to be emitted.
  """
  # NOTE(review): coloring is keyed on stdout being a TTY although the text
  # is written to stderr -- confirm this is intended.
  painter = Color(_STDOUT_IS_TTY)
  banner = painter.Color(Color.BLUE, '\nINFO: ' + message)
  print >> sys.stderr, banner
  sys.stderr.flush()
|
|
|
|
|
|
def FindRepoDir(path=None):
  """Returns the nearest higher-level repo dir from the specified path.

  Starting at path and moving up one directory at a time, looks for a
  directory named '.repo'. The filesystem root itself is not inspected.

  Args:
    path: The path to use. Defaults to cwd.

  Returns:
    The absolute path of the closest '.repo' directory, or None if the root
    is reached without finding one.
  """
  if path is None:
    path = os.getcwd()
  path = os.path.abspath(path)
  while True:
    parent = os.path.dirname(path)
    # dirname() of a root returns the root itself. Comparing against the
    # parent instead of the literal '/' also terminates for roots such as
    # '//', which abspath() preserves for paths starting with two slashes.
    if parent == path:
      return None
    repo_dir = os.path.join(path, '.repo')
    if os.path.isdir(repo_dir):
      return repo_dir
    path = parent
|
|
|
|
|
|
def ReinterpretPathForChroot(path):
  """Returns reinterpreted path from outside the chroot for use inside.

  The repository root is taken to be the parent of the '.repo' directory
  located by FindRepoDir(path). The part of path below that root is re-rooted
  under /home/$USER/trunk. The USER environment variable must be set.

  Keyword arguments:
    path: The path to reinterpret.  Must be in src tree.

  Returns:
    The corresponding path under /home/$USER/trunk.

  Raises:
    Exception: path is not inside a repo checkout.
  """
  repo_dir = FindRepoDir(path)
  if repo_dir is None:
    # Without this check os.path.join() would fail on None with an unrelated
    # error instead of the intended message.
    raise Exception('Error: path is outside your src tree, cannot reinterpret.')

  path_abs_path = os.path.abspath(path)
  root_abs_path = os.path.abspath(os.path.join(repo_dir, '..'))

  # Defensive: the root was found by walking up from path, so it should always
  # be an ancestor of (or equal to) path.
  inside_root = (path_abs_path == root_abs_path or
                 path_abs_path.startswith(root_abs_path + os.sep))
  if not inside_root:
    raise Exception('Error: path is outside your src tree, cannot reinterpret.')

  # Strip only the leading repository root (not later occurrences of the same
  # text inside the path) and the separator that follows it.
  relative_path = path_abs_path[len(root_abs_path):].lstrip(os.sep)

  return os.path.join('/home', os.getenv('USER'), 'trunk', relative_path)
|
|
|
|
|
|
def GetIPAddress(device='eth0'):
  """Returns the IP Address for a given device using ifconfig.

  socket.gethostname() is insufficient for machines where the host files are
  not set up "correctly."  Since some of our builders may have this issue,
  this method gives you a generic way to get the address so you are reachable
  either via a VM or remote machine on the same network.

  Arguments:
    device: name of the network interface to query.

  Returns:
    The first dotted-quad IPv4 address found in the output of
    '/sbin/ifconfig <device>', or None (after printing a warning) if the
    output contains no such address.
  """
  ifconfig_output = RunCommand(['/sbin/ifconfig', device],
                               redirect_stdout=True, print_cmd=False)
  # Accept both the legacy "inet addr:1.2.3.4" layout and the newer
  # "inet 1.2.3.4" layout. The space after "inet" keeps "inet6" lines from
  # matching. A raw string avoids invalid-escape warnings for "\d".
  match = re.search(r'inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)', ifconfig_output)
  if match:
    return match.group(1)
  Warning('Failed to find ip address in %s' % ifconfig_output)
  return None
|