diff --git a/chromite/lib/cros_build_lib.py b/chromite/lib/cros_build_lib.py index e9470685e2..e382445087 100644 --- a/chromite/lib/cros_build_lib.py +++ b/chromite/lib/cros_build_lib.py @@ -10,6 +10,22 @@ import sys _STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() + +class CommandResult(object): + """An object to store various attributes of a child process.""" + + def __init__(self): + self.cmd = None + self.error = None + self.output = None + self.returncode = None + + +class RunCommandError(Exception): + """Error caught in RunCommand() method.""" + pass + + def RunCommand(cmd, print_cmd=True, error_ok=False, error_message=None, exit_code=False, redirect_stdout=False, redirect_stderr=False, cwd=None, input=None, enter_chroot=False, shell=False): @@ -36,6 +52,7 @@ def RunCommand(cmd, print_cmd=True, error_ok=False, error_message=None, stderr = None stdin = None output = '' + cmd_result = CommandResult() # Modify defaults based on parameters. if redirect_stdout: stdout = subprocess.PIPE @@ -51,25 +68,27 @@ def RunCommand(cmd, print_cmd=True, error_ok=False, error_message=None, # Print out the command before running. if print_cmd: Info('RunCommand: %s' % cmd_str) + cmd_result.cmd = cmd_str try: - proc = subprocess.Popen(cmd, cwd=cwd, stdin=stdin, + proc = subprocess.Popen(cmd_str, cwd=cwd, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell) - (output, error) = proc.communicate(input) + (cmd_result.output, cmd_result.error) = proc.communicate(input) if exit_code: - return proc.returncode + cmd_result.returncode = proc.returncode if not error_ok and proc.returncode: - raise Exception('Command "%s" failed.\n' % cmd_str + - (error_message or error or output or '')) + msg = ('Command "%s" failed.\n' % cmd_str + + (error_message or cmd_result.error or cmd_result.output or '')) + raise RunCommandError(msg) except Exception,e: if not error_ok: raise else: Warning(str(e)) - return output + return cmd_result class Color(object): diff --git a/chromite/lib/cros_build_lib_unittest.py b/chromite/lib/cros_build_lib_unittest.py index ef6ee9e27a..064dd7ca5e 100755 --- a/chromite/lib/cros_build_lib_unittest.py +++ b/chromite/lib/cros_build_lib_unittest.py @@ -1,12 +1,120 @@ #!/usr/bin/python +# +# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. import errno -import os -import unittest +import os import shutil -import tempfile - +import subprocess +import tempfile +import unittest import cros_build_lib +import mox + + +class TestRunCommand(unittest.TestCase): + + def setUp(self): + self.mox = mox.Mox() + self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True) + self.proc_mock = self.mox.CreateMockAnything() + self.cmd = 'test cmd' + self.error = 'test error' + self.output = 'test output' + + def tearDown(self): + self.mox.UnsetStubs() + self.mox.VerifyAll() + + def _assertCrEqual(self, expected, actual): + """Helper method to compare two CommandResult objects. + + This is needed since assertEqual does not know how to compare two + CommandResult objects. + + Args: + expected: a CommandResult object, expected result. + actual: a CommandResult object, actual result. + """ + self.assertEqual(expected.cmd, actual.cmd) + self.assertEqual(expected.error, actual.error) + self.assertEqual(expected.output, actual.output) + self.assertEqual(expected.returncode, actual.returncode) + + def _testCmd(self, cmd, sp_kv=dict(), rc_kv=dict()): + """Factor out common setup logic for testing --cmd. + + Args: + cmd: a string or an array of strings. + sp_kv: key-value pairs passed to subprocess.Popen(). + rc_kv: key-value pairs passed to RunCommand(). + """ + expected_result = cros_build_lib.CommandResult() + expected_result.cmd = self.cmd + expected_result.error = self.error + expected_result.output = self.output + if 'exit_code' in rc_kv: + expected_result.returncode = self.proc_mock.returncode + + arg_dict = dict() + for attr in 'cwd stdin stdout stderr shell'.split(): + if attr in sp_kv: + arg_dict[attr] = sp_kv[attr] + else: + if attr == 'shell': + arg_dict[attr] = False + else: + arg_dict[attr] = None + + subprocess.Popen(self.cmd, **arg_dict).AndReturn(self.proc_mock) + self.proc_mock.communicate(None).AndReturn((self.output, self.error)) + self.mox.ReplayAll() + actual_result = cros_build_lib.RunCommand(cmd, **rc_kv) + self._assertCrEqual(expected_result, actual_result) + + def testReturnCodeZeroWithArrayCmd(self): + """--enter_chroot=False and --cmd is an array of strings.""" + self.proc_mock.returncode = 0 + cmd_list = ['foo', 'bar', 'roger'] + self.cmd = 'foo bar roger' + self._testCmd(cmd_list, rc_kv=dict(exit_code=True)) + + def testReturnCodeZeroWithArrayCmdEnterChroot(self): + """--enter_chroot=True and --cmd is an array of strings.""" + self.proc_mock.returncode = 0 + cmd_list = ['foo', 'bar', 'roger'] + self.cmd = './enter_chroot.sh -- %s' % ' '.join(cmd_list) + self._testCmd(cmd_list, rc_kv=dict(enter_chroot=True)) + + def testReturnCodeNotZeroErrorOkNotRaisesError(self): + """Raise error when proc.communicate() returns non-zero.""" + self.proc_mock.returncode = 1 + self._testCmd(self.cmd, rc_kv=dict(error_ok=True)) + + def testSubprocessCommunicateExceptionRaisesError(self): + """Verify error raised by communicate() is caught.""" + subprocess.Popen(self.cmd, cwd=None, stdin=None, stdout=None, stderr=None, + shell=False).AndReturn(self.proc_mock) + self.proc_mock.communicate(None).AndRaise(ValueError) + self.mox.ReplayAll() + self.assertRaises(ValueError, cros_build_lib.RunCommand, self.cmd) + + def testSubprocessCommunicateExceptionNotRaisesError(self): + """Don't re-raise error from communicate() when --error_ok=True.""" + expected_result = cros_build_lib.CommandResult() + cmd_str = './enter_chroot.sh -- %s' % self.cmd + expected_result.cmd = cmd_str + + subprocess.Popen(cmd_str, cwd=None, stdin=None, stdout=None, stderr=None, + shell=False).AndReturn(self.proc_mock) + self.proc_mock.communicate(None).AndRaise(ValueError) + self.mox.ReplayAll() + actual_result = cros_build_lib.RunCommand(self.cmd, error_ok=True, + enter_chroot=True) + self._assertCrEqual(expected_result, actual_result) + class TestListFiles(unittest.TestCase): @@ -36,9 +144,7 @@ class TestListFiles(unittest.TestCase): tmp.close() def testTraverse(self): - """ - Test that we are traversing the directory properly - """ + """Test that we are traversing the directory properly.""" dir_structure = ['one/two/test.txt', 'one/blah.py', 'three/extra.conf'] self._createNestedDir(dir_structure) @@ -50,9 +156,7 @@ class TestListFiles(unittest.TestCase): self.fail('%s was not found in %s' % (file, dir_structure)) def testEmptyFilePath(self): - """ - Test that we return nothing when directories are empty - """ + """Test that we return nothing when directories are empty.""" dir_structure = ['one/', 'two/', 'one/a/'] self._createNestedDir(dir_structure) files = cros_build_lib.ListFiles(self.root_dir) @@ -63,7 +167,7 @@ class TestListFiles(unittest.TestCase): cros_build_lib.ListFiles('/me/no/existe') except OSError, err: self.assertEqual(err.errno, errno.ENOENT) - + if __name__ == '__main__': unittest.main()