Drop unused scripts

The scripts were imported by some scripts from
coreos-base/cros-devutils, which we have dropped already. So the
scripts in the lib directory are currently an unused baggage.
This commit is contained in:
Krzesimir Nowak 2021-08-13 18:15:42 +02:00
parent d67a8f04dc
commit 421de42db5
3 changed files with 0 additions and 516 deletions

View File

@ -1,246 +0,0 @@
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common python commands used by various build scripts."""
import inspect
import os
import subprocess
import sys
_STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
# TODO(sosa): Move logging to logging module.
class RunCommandException(Exception):
"""Raised when there is an error in RunCommand."""
pass
def _GetCallerName():
"""Returns the name of the calling module with __main__."""
top_frame = inspect.stack()[-1][0]
return os.path.basename(top_frame.f_code.co_filename)
def RunCommand(cmd, print_cmd=True, error_ok=False, error_message=None,
exit_code=False, redirect_stdout=False, redirect_stderr=False,
cwd=None, input=None, enter_chroot=False, num_retries=0,
log_to_file=None, combine_stdout_stderr=False):
"""Runs a shell command.
Arguments:
cmd: cmd to run. Should be input to subprocess.POpen. If a string,
converted to an array using split().
print_cmd: prints the command before running it.
error_ok: does not raise an exception on error.
error_message: prints out this message when an error occurrs.
exit_code: returns the return code of the shell command.
redirect_stdout: returns the stdout.
redirect_stderr: holds stderr output until input is communicated.
cwd: the working directory to run this cmd.
input: input to pipe into this command through stdin.
enter_chroot: this command should be run from within the chroot. If set,
cwd must point to the scripts directory.
num_retries: the number of retries to perform before dying
log_to_file: Redirects all stderr and stdout to file specified by this path.
combine_stdout_stderr: Combines stdout and stdin streams into stdout. Auto
set to true if log_to_file specifies a file.
Returns:
If exit_code is True, returns the return code of the shell command.
Else returns the output of the shell command.
Raises:
Exception: Raises RunCommandException on error with optional error_message,
but only if exit_code, and error_ok are both False.
"""
# Set default for variables.
stdout = None
stderr = None
stdin = None
file_handle = None
output = ''
# Modify defaults based on parameters.
if log_to_file:
file_handle = open(log_to_file, 'w+')
stdout = file_handle
stderr = file_handle
else:
if redirect_stdout: stdout = subprocess.PIPE
if redirect_stderr: stderr = subprocess.PIPE
if combine_stdout_stderr: stderr = subprocess.STDOUT
if input: stdin = subprocess.PIPE
if enter_chroot: cmd = ['cros_sdk', '--'] + cmd
# Print out the command before running.
cmd_string = 'PROGRAM(%s) -> RunCommand: %r in dir %s' % (_GetCallerName(),
cmd, cwd)
if print_cmd:
if not log_to_file:
_Info(cmd_string)
else:
_Info('%s -- Logging to %s' % (cmd_string, log_to_file))
for retry_count in range(num_retries + 1):
# If it's not the first attempt, it's a retry
if retry_count > 0 and print_cmd:
_Info('PROGRAM(%s) -> RunCommand: retrying %r in dir %s' %
(_GetCallerName(), cmd, cwd))
proc = subprocess.Popen(cmd, cwd=cwd, stdin=stdin,
stdout=stdout, stderr=stderr, close_fds=True)
(output, error) = proc.communicate(input)
# if the command worked, don't retry any more.
if proc.returncode == 0:
break
if file_handle: file_handle.close()
# If they asked for an exit_code, give it to them on success or failure
if exit_code:
return proc.returncode
# If the command (and all retries) failed, handle error result
if proc.returncode != 0 and not error_ok:
if output:
print >> sys.stderr, output
sys.stderr.flush()
error_info = ('Command "%r" failed.\n' % (cmd) +
(error_message or error or ''))
if log_to_file: error_info += '\nOutput logged to %s' % log_to_file
raise RunCommandException(error_info)
# return final result
return output
def RunCommandCaptureOutput(cmd, print_cmd=True, cwd=None, input=None,
enter_chroot=False,
combine_stdout_stderr=True,
verbose=False):
"""Runs a shell command. Differs from RunCommand, because it allows
you to run a command and capture the exit code, output, and stderr
all at the same time.
Arguments:
cmd: cmd to run. Should be input to subprocess.POpen. If a string,
converted to an array using split().
print_cmd: prints the command before running it.
cwd: the working directory to run this cmd.
input: input to pipe into this command through stdin.
enter_chroot: this command should be run from within the chroot. If set,
cwd must point to the scripts directory.
combine_stdout_stderr -- combine outputs together.
verbose -- also echo cmd.stdout and cmd.stderr to stdout and stderr
Returns:
Returns a tuple: (exit_code, stdout, stderr) (integer, string, string)
stderr is None if combine_stdout_stderr is True
"""
# Set default for variables.
stdout = subprocess.PIPE
stderr = subprocess.PIPE
stdin = None
# Modify defaults based on parameters.
if input: stdin = subprocess.PIPE
if combine_stdout_stderr: stderr = subprocess.STDOUT
if enter_chroot: cmd = ['cros_sdk', '--'] + cmd
# Print out the command before running.
if print_cmd:
_Info('PROGRAM(%s) -> RunCommand: %r in dir %s' %
(_GetCallerName(), cmd, cwd))
proc = subprocess.Popen(cmd, cwd=cwd, stdin=stdin,
stdout=stdout, stderr=stderr, close_fds=True)
output, error = proc.communicate(input)
if verbose:
if output: sys.stdout.write(output)
if error: sys.stderr.write(error)
# Error is None if stdout, stderr are combined.
return proc.returncode, output, error
class Color(object):
"""Conditionally wraps text in ANSI color escape sequences."""
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
BOLD = -1
COLOR_START = '\033[1;%dm'
BOLD_START = '\033[1m'
RESET = '\033[0m'
def __init__(self, enabled=True):
self._enabled = enabled
def Color(self, color, text):
"""Returns text with conditionally added color escape sequences.
Keyword arguments:
color: Text color -- one of the color constants defined in this class.
text: The text to color.
Returns:
If self._enabled is False, returns the original text. If it's True,
returns text with color escape sequences based on the value of color.
"""
if not self._enabled:
return text
if color == self.BOLD:
start = self.BOLD_START
else:
start = self.COLOR_START % (color + 30)
return start + text + self.RESET
def _Info(message):
"""Emits a blue informational message and continues execution.
Keyword arguments:
message: The message to be emitted.
"""
print >> sys.stderr, (
Color(_STDOUT_IS_TTY).Color(Color.BLUE, '\nINFO: ' + message))
sys.stderr.flush()
def FindRepoDir(path=None):
"""Returns the nearest higher-level repo dir from the specified path.
Args:
path: The path to use. Defaults to cwd.
"""
if path is None:
path = os.getcwd()
path = os.path.abspath(path)
while path != '/':
repo_dir = os.path.join(path, '.repo')
if os.path.isdir(repo_dir):
return repo_dir
path = os.path.dirname(path)
return None
def PrependChrootPath(path):
"""Assumes path is a chroot path and prepends chroot to create full path."""
chroot_path = os.path.join(FindRepoDir(), '..', 'chroot')
if path.startswith('/'):
return os.path.realpath(os.path.join(chroot_path, path[1:]))
else:
return os.path.realpath(os.path.join(chroot_path, path))
def IsInsideChroot():
"""Returns True if we are inside chroot."""
return os.path.exists('/etc/debian_chroot')

View File

@ -1,109 +0,0 @@
#!/usr/bin/python
#
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for cros_build_lib."""
import mox
import os
import tempfile
import unittest
import cros_build_lib
class CrosBuildLibTest(mox.MoxTestBase):
"""Test class for cros_build_lib."""
def testRunCommandSimple(self):
"""Test that RunCommand can run a simple successful command."""
result = cros_build_lib.RunCommand(['ls'],
# Keep the test quiet options
print_cmd=False,
redirect_stdout=True,
redirect_stderr=True,
# Test specific options
exit_code=True)
self.assertEqual(result, 0)
def testRunCommandError(self):
"""Test that RunCommand can return an error code for a failed command."""
result = cros_build_lib.RunCommand(['ls', '/nosuchdir'],
# Keep the test quiet options
print_cmd=False,
redirect_stdout=True,
redirect_stderr=True,
# Test specific options
exit_code=True)
self.assertNotEqual(result, 0)
self.assertEquals(type(result), int)
def testRunCommandErrorRetries(self):
"""Test that RunCommand can retry a failed command that always fails."""
# We don't actually check that it's retrying, just exercise the code path.
result = cros_build_lib.RunCommand(['ls', '/nosuchdir'],
# Keep the test quiet options
print_cmd=False,
redirect_stdout=True,
redirect_stderr=True,
# Test specific options
num_retries=2,
error_ok=True,
exit_code=True)
self.assertNotEqual(result, 0)
self.assertEquals(type(result), int)
def testRunCommandErrorException(self):
"""Test that RunCommand can throw an exception when a command fails."""
function = lambda : cros_build_lib.RunCommand(['ls', '/nosuchdir'],
# Keep the test quiet options
print_cmd=False,
redirect_stdout=True,
redirect_stderr=True)
self.assertRaises(cros_build_lib.RunCommandException, function)
def testRunCommandErrorCodeNoException(self):
"""Test that RunCommand doesn't throw an exception with exit_code."""
result = cros_build_lib.RunCommand(['ls', '/nosuchdir'],
# Keep the test quiet options
print_cmd=False,
redirect_stdout=True,
redirect_stderr=True,
# Test specific options
exit_code=True)
# We are really testing that it doesn't throw an exception if exit_code
# if true.
self.assertNotEqual(result, 0)
self.assertEquals(type(result), int)
def testRunCommandCaptureOutput(self):
"""Test that RunCommand can capture stdout if a command succeeds."""
result = cros_build_lib.RunCommand(['echo', '-n', 'Hi'],
# Keep the test quiet options
print_cmd=False,
redirect_stdout=True,
redirect_stderr=True)
self.assertEqual(result, 'Hi')
def testRunCommandLogToFile(self):
"""Test that RunCommand can log output to a file correctly."""
log_file = tempfile.mktemp()
cros_build_lib.RunCommand(['echo', '-n', 'Hi'],
# Keep the test quiet options
print_cmd=False,
# Test specific options
log_to_file=log_file)
log_fh = open(log_file)
log_data = log_fh.read()
self.assertEquals('Hi', log_data)
log_fh.close()
os.remove(log_file)
if __name__ == '__main__':
unittest.main()

View File

@ -1,161 +0,0 @@
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Common vm functions for use in crosutils.
DEFINE_string kvm_pid "" \
"Use this pid file. If it exists and is set, use the vm specified by pid."
DEFINE_boolean no_graphics ${FLAGS_FALSE} "Runs the KVM instance silently."
DEFINE_boolean persist "${FLAGS_FALSE}" "Persist vm."
DEFINE_boolean snapshot ${FLAGS_FALSE} "Don't commit changes to image."
DEFINE_integer ssh_port 9222 "Port to tunnel ssh traffic over."
DEFINE_string vnc "" "VNC Server to display to instead of SDL."
KVM_PID_FILE=/tmp/kvm.$$.pid
LIVE_VM_IMAGE=
if ! KVM_BINARY=$(which kvm 2> /dev/null); then
if ! KVM_BINARY=$(which qemu-kvm 2> /dev/null); then
die "no kvm binary found"
fi
fi
get_pid() {
sudo cat "${KVM_PID_FILE}"
}
# General purpose blocking kill on a pid.
# This function sends a specified kill signal [0-9] to a pid and waits for it
# die up to a given timeout. It exponentially backs off it's timeout starting
# at 1 second.
# $1 the process id.
# $2 signal to send (-#).
# $3 max timeout in seconds.
# Returns 0 on success.
blocking_kill() {
local timeout=1
sudo kill -$2 $1
while ps -p $1 > /dev/null && [ ${timeout} -le $3 ]; do
sleep ${timeout}
timeout=$((timeout*2))
done
! ps -p ${1} > /dev/null
}
kvm_version_greater_equal() {
local test_version="${1}"
local kvm_version=$(kvm --version | sed -E 's/^.*version ([0-9\.]*) .*$/\1/')
[ $(echo -e "${test_version}\n${kvm_version}" | sort -r -V | head -n 1) = \
$kvm_version ]
}
# $1: Path to the virtual image to start.
start_kvm() {
# Override default pid file.
local start_vm=0
[ -n "${FLAGS_kvm_pid}" ] && KVM_PID_FILE=${FLAGS_kvm_pid}
if [ -f "${KVM_PID_FILE}" ]; then
local pid=$(get_pid)
# Check if the process exists.
if ps -p ${pid} > /dev/null ; then
echo "Using a pre-created KVM instance specified by ${FLAGS_kvm_pid}." >&2
start_vm=1
else
# Let's be safe in case they specified a file that isn't a pid file.
echo "File ${KVM_PID_FILE} exists but specified pid doesn't." >&2
fi
fi
# No kvm specified by pid file found, start a new one.
if [ ${start_vm} -eq 0 ]; then
echo "Starting a KVM instance" >&2
local nographics=""
local usesnapshot=""
if [ ${FLAGS_no_graphics} -eq ${FLAGS_TRUE} ]; then
nographics="-nographic -serial none"
fi
if [ -n "${FLAGS_vnc}" ]; then
nographics="-vnc ${FLAGS_vnc}"
fi
if [ ${FLAGS_snapshot} -eq ${FLAGS_TRUE} ]; then
snapshot="-snapshot"
fi
local net_option="-net nic,model=virtio"
if [ -f "$(dirname "$1")/.use_e1000" ]; then
info "Detected older image, using e1000 instead of virtio."
net_option="-net nic,model=e1000"
fi
local cache_type="writeback"
if kvm_version_greater_equal "0.14"; then
cache_type="unsafe"
fi
sudo "${KVM_BINARY}" -m 2G \
-smp 4 \
-vga cirrus \
-pidfile "${KVM_PID_FILE}" \
-daemonize \
${net_option} \
${nographics} \
${snapshot} \
-net user,hostfwd=tcp::${FLAGS_ssh_port}-:22 \
-drive "file=${1},index=0,media=disk,cache=${cache_type}"
info "KVM started with pid stored in ${KVM_PID_FILE}"
LIVE_VM_IMAGE="${1}"
fi
}
# Checks to see if we can access the target virtual machine with ssh.
ssh_ping() {
# TODO(sosa): Remove outside chroot use once all callers work inside chroot.
local cmd
if [ $INSIDE_CHROOT -ne 1 ]; then
cmd="${GCLIENT_ROOT}/src/scripts/ssh_test.sh"
else
cmd=/usr/lib/crosutils/ssh_test.sh
fi
"${cmd}" \
--ssh_port=${FLAGS_ssh_port} \
--remote=127.0.0.1 >&2
}
# Tries to ssh into live image $1 times. After first failure, a try involves
# shutting down and restarting kvm.
retry_until_ssh() {
local can_ssh_into=1
local max_retries=3
local retries=0
ssh_ping && can_ssh_into=0
while [ ${can_ssh_into} -eq 1 ] && [ ${retries} -lt ${max_retries} ]; do
echo "Failed to connect to virtual machine, retrying ... " >&2
stop_kvm || echo "Could not stop kvm. Retrying anyway." >&2
start_kvm "${LIVE_VM_IMAGE}"
ssh_ping && can_ssh_into=0
retries=$((retries + 1))
done
return ${can_ssh_into}
}
stop_kvm() {
if [ "${FLAGS_persist}" -eq "${FLAGS_TRUE}" ]; then
echo "Persist requested. Use --ssh_port ${FLAGS_ssh_port} " \
"--kvm_pid ${KVM_PID_FILE} to re-connect to it." >&2
else
echo "Stopping the KVM instance" >&2
local pid=$(get_pid)
if [ -n "${pid}" ]; then
blocking_kill ${pid} 1 16 || blocking_kill ${pid} 9 1
sudo rm "${KVM_PID_FILE}"
else
echo "No kvm pid found to stop." >&2
return 1
fi
fi
}