From 8d7caa720ab3f9fcb6ac73632d41e3da5cfd5174 Mon Sep 17 00:00:00 2001 From: Chris Sosa Date: Tue, 22 Mar 2011 13:17:39 -0700 Subject: [PATCH] Remove au_test_harness code and change symlinks to point to new location Change-Id: I55a198e09ceac1e05b766864dbc920bfaedffb1f BUG=chromium-os:11172 TEST=Tested ctest and cros_au_test_harness running from src/scripts with buildbot params. Review URL: http://codereview.chromium.org/6717011 --- bin/au_test_harness/__init__.py | 0 bin/au_test_harness/au_test.py | 288 ------------------ bin/au_test_harness/au_worker.py | 272 ----------------- bin/au_test_harness/cros_au_test_harness.py | 278 ----------------- bin/au_test_harness/cros_test_proxy.py | 121 -------- bin/au_test_harness/dev_server_wrapper.py | 54 ---- bin/au_test_harness/dummy_au_worker.py | 45 --- bin/au_test_harness/parallel_test_job.py | 109 ------- bin/au_test_harness/public_key_manager.py | 91 ------ bin/au_test_harness/real_au_worker.py | 63 ---- bin/au_test_harness/update_exception.py | 11 - bin/au_test_harness/vm_au_worker.py | 120 -------- bin/cros_au_test_harness | 2 +- bin/ctest | 2 +- bin/ctest.py | 320 -------------------- bin/ctest_unittest.py | 228 -------------- lib/cros_build_lib.py | 4 +- 17 files changed, 4 insertions(+), 2004 deletions(-) delete mode 100644 bin/au_test_harness/__init__.py delete mode 100644 bin/au_test_harness/au_test.py delete mode 100644 bin/au_test_harness/au_worker.py delete mode 100755 bin/au_test_harness/cros_au_test_harness.py delete mode 100644 bin/au_test_harness/cros_test_proxy.py delete mode 100644 bin/au_test_harness/dev_server_wrapper.py delete mode 100644 bin/au_test_harness/dummy_au_worker.py delete mode 100644 bin/au_test_harness/parallel_test_job.py delete mode 100644 bin/au_test_harness/public_key_manager.py delete mode 100644 bin/au_test_harness/real_au_worker.py delete mode 100644 bin/au_test_harness/update_exception.py delete mode 100644 bin/au_test_harness/vm_au_worker.py delete mode 100755 bin/ctest.py delete mode 100755 bin/ctest_unittest.py diff --git a/bin/au_test_harness/__init__.py b/bin/au_test_harness/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/bin/au_test_harness/au_test.py b/bin/au_test_harness/au_test.py deleted file mode 100644 index 47aad12500..0000000000 --- a/bin/au_test_harness/au_test.py +++ /dev/null @@ -1,288 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing a test suite that is run to test auto updates.""" - -import os -import tempfile -import time -import unittest - -import cros_build_lib as cros_lib - -import cros_test_proxy -import dummy_au_worker -import real_au_worker -import vm_au_worker - - -class AUTest(unittest.TestCase): - """Test harness that uses an au_worker to perform and validate updates. - - Defines a test suite that is run using an au_worker. An au_worker can - be created to perform and validates updates on both virtual and real devices. - See documentation for au_worker for more information. - """ - test_results_root = None - public_key_managers = [] - - @classmethod - def ProcessOptions(cls, options, use_dummy_worker): - """Processes options for the test suite and sets up the worker class. - - Args: - options: options class to be parsed from main class. - use_dummy_worker: If True, use a dummy_worker_class rather than deriving - one from options.type. - """ - cls.base_image_path = options.base_image - cls.target_image_path = options.target_image - cls.clean = options.clean - - assert options.type in ['real', 'vm'], 'Failed to specify either real|vm.' - if use_dummy_worker: - cls.worker_class = dummy_au_worker.DummyAUWorker - elif options.type == 'vm': - cls.worker_class = vm_au_worker.VMAUWorker - else: - cls.worker_class = real_au_worker.RealAUWorker - - # Sanity checks. - if not cls.base_image_path: - cros_lib.Die('Need path to base image for vm.') - elif not os.path.exists(cls.base_image_path): - cros_lib.Die('%s does not exist' % cls.base_image_path) - - if not cls.target_image_path: - cros_lib.Die('Need path to target image to update with.') - elif not os.path.exists(cls.target_image_path): - cros_lib.Die('%s does not exist' % cls.target_image_path) - - # Initialize test root. Test root path must be in the chroot. - if not cls.test_results_root: - if options.test_results_root: - assert 'chroot/tmp' in options.test_results_root, \ - 'Must specify a test results root inside tmp in a chroot.' - cls.test_results_root = options.test_results_root - else: - cls.test_results_root = tempfile.mkdtemp( - prefix='au_test_harness', - dir=cros_lib.PrependChrootPath('/tmp')) - - cros_lib.Info('Using %s as the test results root' % cls.test_results_root) - - # Cache away options to instantiate workers later. - cls.options = options - - def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg): - """Attempt a payload update, expect it to fail with expected log""" - try: - self.worker.UpdateUsingPayload(payload) - except UpdateException as err: - # Will raise ValueError if expected is not found. - if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE): - return - else: - cros_lib.Warning("Didn't find '%s' in:" % expected_msg) - cros_lib.Warning(err.stdout) - - self.fail('We managed to update when failure was expected') - - def AttemptUpdateWithFilter(self, filter, proxy_port=8081): - """Update through a proxy, with a specified filter, and expect success.""" - self.worker.PrepareBase(self.target_image_path) - - # The devserver runs at port 8080 by default. We assume that here, and - # start our proxy at a different one. We then tell our update tools to - # have the client connect to our proxy_port instead of 8080. - proxy = cros_test_proxy.CrosTestProxy(port_in=proxy_port, - address_out='127.0.0.1', - port_out=8080, - filter=filter) - proxy.serve_forever_in_thread() - try: - self.worker.PerformUpdate(self.target_image_path, self.target_image_path, - proxy_port=proxy_port) - finally: - proxy.shutdown() - - # --- UNITTEST SPECIFIC METHODS --- - - def setUp(self): - """Overrides unittest.TestCase.setUp and called before every test. - - Sets instance specific variables and initializes worker. - """ - unittest.TestCase.setUp(self) - self.worker = self.worker_class(self.options, AUTest.test_results_root) - self.crosutils = os.path.join(os.path.dirname(__file__), '..', '..') - self.download_folder = os.path.join(self.crosutils, 'latest_download') - if not os.path.exists(self.download_folder): - os.makedirs(self.download_folder) - - def tearDown(self): - """Overrides unittest.TestCase.tearDown and called after every test.""" - self.worker.CleanUp() - - def testUpdateKeepStateful(self): - """Tests if we can update normally. - - This test checks that we can update by updating the stateful partition - rather than wiping it. - """ - self.worker.InitializeResultsDirectory() - # Just make sure some tests pass on original image. Some old images - # don't pass many tests. - self.worker.PrepareBase(self.base_image_path) - # TODO(sosa): move to 100% once we start testing using the autotest paired - # with the dev channel. - percent_passed = self.worker.VerifyImage(self, 10) - - # Update to - all tests should pass on new image. - self.worker.PerformUpdate(self.target_image_path, self.base_image_path) - percent_passed = self.worker.VerifyImage(self) - - # Update from - same percentage should pass that originally passed. - self.worker.PerformUpdate(self.base_image_path, self.target_image_path) - self.worker.VerifyImage(self, percent_passed) - - def testUpdateWipeStateful(self): - """Tests if we can update after cleaning the stateful partition. - - This test checks that we can update successfully after wiping the - stateful partition. - """ - self.worker.InitializeResultsDirectory() - # Just make sure some tests pass on original image. Some old images - # don't pass many tests. - self.worker.PrepareBase(self.base_image_path) - percent_passed = self.worker.VerifyImage(self, 10) - - # Update to - all tests should pass on new image. - self.worker.PerformUpdate(self.target_image_path, self.base_image_path, - 'clean') - self.worker.VerifyImage(self) - - # Update from - same percentage should pass that originally passed. - self.worker.PerformUpdate(self.base_image_path, self.target_image_path, - 'clean') - self.worker.VerifyImage(self, percent_passed) - - def testInterruptedUpdate(self): - """Tests what happens if we interrupt payload delivery 3 times.""" - - class InterruptionFilter(cros_test_proxy.Filter): - """This filter causes the proxy to interrupt the download 3 times - - It does this by closing the first three connections to transfer - 2M total in the outbound connection after they transfer the - 2M. - """ - def __init__(self): - """Defines variable shared across all connections""" - self.close_count = 0 - - def setup(self): - """Called once at the start of each connection.""" - self.data_size = 0 - - def OutBound(self, data): - """Called once per packet for outgoing data. - - The first three connections transferring more than 2M - outbound will be closed. - """ - if self.close_count < 3: - if self.data_size > (2 * 1024 * 1024): - self.close_count += 1 - return None - - self.data_size += len(data) - return data - - self.worker.InitializeResultsDirectory() - self.AttemptUpdateWithFilter(InterruptionFilter(), proxy_port=8082) - - def testDelayedUpdate(self): - """Tests what happens if some data is delayed during update delivery""" - - class DelayedFilter(cros_test_proxy.Filter): - """Causes intermittent delays in data transmission. - - It does this by inserting 3 20 second delays when transmitting - data after 2M has been sent. - """ - def setup(self): - """Called once at the start of each connection.""" - self.data_size = 0 - self.delay_count = 0 - - def OutBound(self, data): - """Called once per packet for outgoing data. - - The first three packets after we reach 2M transferred - are delayed by 20 seconds. - """ - if self.delay_count < 3: - if self.data_size > (2 * 1024 * 1024): - self.delay_count += 1 - time.sleep(20) - - self.data_size += len(data) - return data - - self.worker.InitializeResultsDirectory() - self.AttemptUpdateWithFilter(DelayedFilter(), proxy_port=8083) - - def SimpleTest(self): - """A simple update that updates once from a base image to a target. - - We explicitly don't use test prefix so that isn't run by default. Can be - run using test_prefix option. - """ - self.worker.InitializeResultsDirectory() - self.worker.PrepareBase(self.base_image_path) - self.worker.PerformUpdate(self.target_image_path, self.base_image_path) - self.worker.VerifyImage(self) - - # --- DISABLED TESTS --- - - # TODO(sosa): Get test to work with verbose. - def NotestPartialUpdate(self): - """Tests what happens if we attempt to update with a truncated payload.""" - self.worker.InitializeResultsDirectory() - # Preload with the version we are trying to test. - self.worker.PrepareBase(self.target_image_path) - - # Image can be updated at: - # ~chrome-eng/chromeos/localmirror/autest-images - url = 'http://gsdview.appspot.com/chromeos-localmirror/' \ - 'autest-images/truncated_image.gz' - payload = os.path.join(self.download_folder, 'truncated_image.gz') - - # Read from the URL and write to the local file - urllib.urlretrieve(url, payload) - - expected_msg = 'download_hash_data == update_check_response_hash failed' - self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg) - - # TODO(sosa): Get test to work with verbose. - def NotestCorruptedUpdate(self): - """Tests what happens if we attempt to update with a corrupted payload.""" - self.worker.InitializeResultsDirectory() - # Preload with the version we are trying to test. - self.worker.PrepareBase(self.target_image_path) - - # Image can be updated at: - # ~chrome-eng/chromeos/localmirror/autest-images - url = 'http://gsdview.appspot.com/chromeos-localmirror/' \ - 'autest-images/corrupted_image.gz' - payload = os.path.join(self.download_folder, 'corrupted.gz') - - # Read from the URL and write to the local file - urllib.urlretrieve(url, payload) - - # This update is expected to fail... - expected_msg = 'zlib inflate() error:-3' - self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg) diff --git a/bin/au_test_harness/au_worker.py b/bin/au_test_harness/au_worker.py deleted file mode 100644 index 303c180208..0000000000 --- a/bin/au_test_harness/au_worker.py +++ /dev/null @@ -1,272 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module that contains the interface for au_test_harness workers. - -An au test harnss worker is a class that contains the logic for performing -and validating updates on a target. This should be subclassed to handle -various types of target. Types of targets include VM's, real devices, etc. -""" - -import inspect -import threading -import os -import sys - -import cros_build_lib as cros_lib - -import dev_server_wrapper -import update_exception - - -class AUWorker(object): - """Interface for a worker that updates and verifies images.""" - # Mapping between cached payloads to directory locations. - update_cache = None - - # --- INTERFACE --- - - def __init__(self, options, test_results_root): - """Processes options for the specific-type of worker.""" - self.board = options.board - self.private_key = options.private_key - self.test_results_root = test_results_root - self.use_delta_updates = options.delta - self.verbose = options.verbose - self.vm_image_path = None - if options.quick_test: - self.verify_suite = 'build_RootFilesystemSize' - else: - self.verify_suite = 'suite_Smoke' - - # Set these up as they are used often. - self.crosutils = os.path.join(os.path.dirname(__file__), '..', '..') - self.crosutilsbin = os.path.join(os.path.dirname(__file__), '..') - - def CleanUp(self): - """Called at the end of every test.""" - pass - - def UpdateImage(self, image_path, src_image_path='', stateful_change='old', - proxy_port=None, private_key_path=None): - """Implementation of an actual update. - - See PerformUpdate for description of args. Subclasses must override this - method with the correct update procedure for the class. - """ - pass - - def UpdateUsingPayload(self, update_path, stateful_change='old', - proxy_port=None): - """Updates target with the pre-generated update stored in update_path. - - Subclasses must override this method with the correct update procedure for - the class. - - Args: - update_path: Path to the image to update with. This directory should - contain both update.gz, and stateful.image.gz - proxy_port: Port to have the client connect to. For use with - CrosTestProxy. - """ - pass - - def VerifyImage(self, unittest, percent_required_to_pass=100): - """Verifies the image with tests. - - Verifies that the test images passes the percent required. Subclasses must - override this method with the correct update procedure for the class. - - Args: - unittest: pointer to a unittest to fail if we cannot verify the image. - percent_required_to_pass: percentage required to pass. This should be - fall between 0-100. - - Returns: - Returns the percent that passed. - """ - pass - - # --- INTERFACE TO AU_TEST --- - - def PerformUpdate(self, image_path, src_image_path='', stateful_change='old', - proxy_port=None, private_key_path=None): - """Performs an update using _UpdateImage and reports any error. - - Subclasses should not override this method but override _UpdateImage - instead. - - Args: - image_path: Path to the image to update with. This image must be a test - image. - src_image_path: Optional. If set, perform a delta update using the - image specified by the path as the source image. - stateful_change: How to modify the stateful partition. Values are: - 'old': Don't modify stateful partition. Just update normally. - 'clean': Uses clobber-state to wipe the stateful partition with the - exception of code needed for ssh. - proxy_port: Port to have the client connect to. For use with - CrosTestProxy. - private_key_path: Path to a private key to use with update payload. - Raises an update_exception.UpdateException if _UpdateImage returns an error. - """ - try: - if not self.use_delta_updates: src_image_path = '' - if private_key_path: - key_to_use = private_key_path - else: - key_to_use = self.private_key - - self.UpdateImage(image_path, src_image_path, stateful_change, - proxy_port, key_to_use) - except update_exception.UpdateException as err: - # If the update fails, print it out - Warning(err.stdout) - raise - - @classmethod - def SetUpdateCache(cls, update_cache): - """Sets the global update cache for getting paths to devserver payloads.""" - cls.update_cache = update_cache - - # --- METHODS FOR SUB CLASS USE --- - - def PrepareRealBase(self, image_path): - """Prepares a remote device for worker test by updating it to the image.""" - self.UpdateImage(image_path) - - def PrepareVMBase(self, image_path): - """Prepares a VM image for worker test by creating the VM file from the img. - """ - # VM Constants. - FULL_VDISK_SIZE = 6072 - FULL_STATEFULFS_SIZE = 3074 - # Needed for VM delta updates. We need to use the qemu image rather - # than the base image on a first update. By tracking the first_update - # we can set src_image to the qemu form of the base image when - # performing generating the delta payload. - self._first_update = True - self.vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname( - image_path) - if not os.path.exists(self.vm_image_path): - cros_lib.Info('Creating %s' % self.vm_image_path) - cros_lib.RunCommand(['./image_to_vm.sh', - '--full', - '--from=%s' % cros_lib.ReinterpretPathForChroot( - os.path.dirname(image_path)), - '--vdisk_size=%s' % FULL_VDISK_SIZE, - '--statefulfs_size=%s' % FULL_STATEFULFS_SIZE, - '--board=%s' % self.board, - '--test_image' - ], enter_chroot=True, cwd=self.crosutils) - - cros_lib.Info('Using %s as base' % self.vm_image_path) - assert os.path.exists(self.vm_image_path) - - def GetStatefulChangeFlag(self, stateful_change): - """Returns the flag to pass to image_to_vm for the stateful change.""" - stateful_change_flag = '' - if stateful_change: - stateful_change_flag = '--stateful_update_flag=%s' % stateful_change - - return stateful_change_flag - - def AppendUpdateFlags(self, cmd, image_path, src_image_path, proxy_port, - private_key_path): - """Appends common args to an update cmd defined by an array. - - Modifies cmd in places by appending appropriate items given args. - """ - if proxy_port: cmd.append('--proxy_port=%s' % proxy_port) - - # Get pregenerated update if we have one. - update_id = dev_server_wrapper.GenerateUpdateId(image_path, src_image_path, - private_key_path) - cache_path = self.update_cache[update_id] - if cache_path: - update_url = dev_server_wrapper.DevServerWrapper.GetDevServerURL( - proxy_port, cache_path) - cmd.append('--update_url=%s' % update_url) - else: - cmd.append('--image=%s' % image_path) - if src_image_path: cmd.append('--src_image=%s' % src_image_path) - - def RunUpdateCmd(self, cmd, log_directory=None): - """Runs the given update cmd given verbose options. - - Raises an update_exception.UpdateException if the update fails. - """ - if self.verbose: - try: - if log_directory: - cros_lib.RunCommand(cmd, log_to_file=os.path.join(log_directory, - 'update.log')) - else: - cros_lib.RunCommand(cmd) - except Exception as e: - Warning(str(e)) - raise update_exception.UpdateException(1, str(e)) - else: - (code, stdout, stderr) = cros_lib.RunCommandCaptureOutput(cmd) - if code != 0: - Warning(stdout) - raise update_exception.UpdateException(code, stdout) - - def AssertEnoughTestsPassed(self, unittest, output, percent_required_to_pass): - """Helper function that asserts a sufficient number of tests passed. - - Args: - output: stdout from a test run. - percent_required_to_pass: percentage required to pass. This should be - fall between 0-100. - Returns: - percent that passed. - """ - cros_lib.Info('Output from VerifyImage():') - print >> sys.stderr, output - sys.stderr.flush() - percent_passed = self._ParseGenerateTestReportOutput(output) - cros_lib.Info('Percent passed: %d vs. Percent required: %d' % ( - percent_passed, percent_required_to_pass)) - unittest.assertTrue(percent_passed >= percent_required_to_pass) - return percent_passed - - def InitializeResultsDirectory(self): - """Called by a test to initialize a results directory for this worker.""" - # Use the name of the test. - test_name = inspect.stack()[1][3] - self.results_directory = os.path.join(self.test_results_root, test_name) - self.results_count = 0 - - def GetNextResultsPath(self, label): - """Returns a path for the results directory for this label. - - Prefixes directory returned for worker with time called i.e. 1_label, - 2_label, etc. The directory returned is outside the chroot so if passing - to an script that is called with enther_chroot, make sure to use - ReinterpretPathForChroot. - """ - self.results_count += 1 - dir = os.path.join(self.results_directory, '%s_%s' % (self.results_count, - label)) - if not os.path.exists(dir): - os.makedirs(dir) - - return dir - - # --- PRIVATE HELPER FUNCTIONS --- - - def _ParseGenerateTestReportOutput(self, output): - """Returns the percentage of tests that passed based on output.""" - percent_passed = 0 - lines = output.split('\n') - - for line in lines: - if line.startswith("Total PASS:"): - # FORMAT: ^TOTAL PASS: num_passed/num_total (percent%)$ - percent_passed = line.split()[3].strip('()%') - cros_lib.Info('Percent of tests passed %s' % percent_passed) - break - - return int(percent_passed) diff --git a/bin/au_test_harness/cros_au_test_harness.py b/bin/au_test_harness/cros_au_test_harness.py deleted file mode 100755 index 10c6abcd46..0000000000 --- a/bin/au_test_harness/cros_au_test_harness.py +++ /dev/null @@ -1,278 +0,0 @@ -#!/usr/bin/python - -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""This module runs a suite of Auto Update tests. - - The tests can be run on either a virtual machine or actual device depending - on parameters given. Specific tests can be run by invoking --test_prefix. - Verbose is useful for many of the tests if you want to see individual commands - being run during the update process. -""" - -import optparse -import os -import re -import sys -import unittest - -sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) -import cros_build_lib as cros_lib - -import au_test -import au_worker -import dummy_au_worker -import dev_server_wrapper -import parallel_test_job -import public_key_manager -import update_exception - -def _PrepareTestSuite(options, use_dummy_worker=False): - """Returns a prepared test suite given by the options and test class.""" - au_test.AUTest.ProcessOptions(options, use_dummy_worker) - test_loader = unittest.TestLoader() - test_loader.testMethodPrefix = options.test_prefix - return test_loader.loadTestsFromTestCase(au_test.AUTest) - - -def _PregenerateUpdates(options): - """Determines all deltas that will be generated and generates them. - - This method effectively pre-generates the dev server cache for all tests. - - Args: - options: options from parsed parser. - Returns: - Dictionary of Update Identifiers->Relative cache locations. - Raises: - update_exception.UpdateException if we fail to generate an update. - """ - def _GenerateVMUpdate(target, src, private_key_path): - """Generates an update using the devserver.""" - command = ['./enter_chroot.sh', - '--', - 'sudo', - 'start_devserver', - '--pregenerate_update', - '--exit', - ] - # Add actual args to command. - command.append('--image=%s' % cros_lib.ReinterpretPathForChroot(target)) - if src: command.append('--src_image=%s' % - cros_lib.ReinterpretPathForChroot(src)) - if options.type == 'vm': command.append('--for_vm') - if private_key_path: - command.append('--private_key=%s' % - cros_lib.ReinterpretPathForChroot(private_key_path)) - - return cros_lib.RunCommandCaptureOutput(command, combine_stdout_stderr=True, - print_cmd=True) - - # Use dummy class to mock out updates that would be run as part of a test. - test_suite = _PrepareTestSuite(options, use_dummy_worker=True) - test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) - if not test_result.wasSuccessful(): - raise update_exception.UpdateException(1, - 'Error finding updates to generate.') - - cros_lib.Info('The following delta updates are required.') - update_ids = [] - jobs = [] - args = [] - modified_images = set() - for target, srcs in dummy_au_worker.DummyAUWorker.delta_list.items(): - modified_images.add(target) - for src_key in srcs: - (src, _ , key) = src_key.partition('+') - if src: modified_images.add(src) - # TODO(sosa): Add private key as part of caching name once devserver can - # handle it its own cache. - update_id = dev_server_wrapper.GenerateUpdateId(target, src, key) - print >> sys.stderr, 'AU: %s' % update_id - update_ids.append(update_id) - jobs.append(_GenerateVMUpdate) - args.append((target, src, key)) - - # Always add the base image path. This is only useful for non-delta updates. - modified_images.add(options.base_image) - - # Add public key to all images we are using. - if options.public_key: - cros_lib.Info('Adding public keys to images for testing.') - for image in modified_images: - manager = public_key_manager.PublicKeyManager(image, options.public_key) - manager.AddKeyToImage() - au_test.AUTest.public_key_managers.append(manager) - - raw_results = parallel_test_job.RunParallelJobs(options.jobs, jobs, args, - print_status=True) - results = [] - - # Looking for this line in the output. - key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)') - for result in raw_results: - (return_code, output, _) = result - if return_code != 0: - cros_lib.Warning(output) - raise update_exception.UpdateException(return_code, - 'Failed to generate all updates.') - else: - for line in output.splitlines(): - match = key_line_re.search(line) - if match: - # Convert blah/blah/update.gz -> update/blah/blah. - path_to_update_gz = match.group(1).rstrip() - (path_to_update_dir, _, _) = path_to_update_gz.rpartition( - '/update.gz') - results.append('/'.join(['update', path_to_update_dir])) - break - - # Make sure all generation of updates returned cached locations. - if len(raw_results) != len(results): - raise update_exception.UpdateException( - 1, 'Insufficient number cache directories returned.') - - # Build the dictionary from our id's and returned cache paths. - cache_dictionary = {} - for index, id in enumerate(update_ids): - cache_dictionary[id] = results[index] - - return cache_dictionary - - -def _RunTestsInParallel(options): - """Runs the tests given by the options in parallel.""" - threads = [] - args = [] - test_suite = _PrepareTestSuite(options) - for test in test_suite: - test_name = test.id() - test_case = unittest.TestLoader().loadTestsFromName(test_name) - threads.append(unittest.TextTestRunner(verbosity=2).run) - args.append(test_case) - - results = parallel_test_job.RunParallelJobs(options.jobs, threads, args, - print_status=False) - for test_result in results: - if not test_result.wasSuccessful(): - cros_lib.Die('Test harness was not successful') - - -def _CleanPreviousWork(options): - """Cleans up previous work from the devserver cache and local image cache.""" - cros_lib.Info('Cleaning up previous work.') - # Wipe devserver cache. - cros_lib.RunCommandCaptureOutput( - ['sudo', 'start_devserver', '--clear_cache', '--exit', ], - enter_chroot=True, print_cmd=False, combine_stdout_stderr=True) - - # Clean previous vm images if they exist. - if options.type == 'vm': - target_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname( - options.target_image) - base_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname( - options.base_image) - if os.path.exists(target_vm_image_path): os.remove(target_vm_image_path) - if os.path.exists(base_vm_image_path): os.remove(base_vm_image_path) - - -def main(): - parser = optparse.OptionParser() - parser.add_option('-b', '--base_image', - help='path to the base image.') - parser.add_option('-r', '--board', - help='board for the images.') - parser.add_option('--clean', default=False, dest='clean', action='store_true', - help='Clean all previous state') - parser.add_option('--no_delta', action='store_false', default=True, - dest='delta', - help='Disable using delta updates.') - parser.add_option('--no_graphics', action='store_true', - help='Disable graphics for the vm test.') - parser.add_option('-j', '--jobs', default=8, type=int, - help='Number of simultaneous jobs') - parser.add_option('--public_key', default=None, - help='Public key to use on images and updates.') - parser.add_option('--private_key', default=None, - help='Private key to use on images and updates.') - parser.add_option('-q', '--quick_test', default=False, action='store_true', - help='Use a basic test to verify image.') - parser.add_option('-m', '--remote', - help='Remote address for real test.') - parser.add_option('-t', '--target_image', - help='path to the target image.') - parser.add_option('--test_results_root', default=None, - help='Root directory to store test results. Should ' - 'be defined relative to chroot root.') - parser.add_option('--test_prefix', default='test', - help='Only runs tests with specific prefix i.e. ' - 'testFullUpdateWipeStateful.') - parser.add_option('-p', '--type', default='vm', - help='type of test to run: [vm, real]. Default: vm.') - parser.add_option('--verbose', default=True, action='store_true', - help='Print out rather than capture output as much as ' - 'possible.') - (options, leftover_args) = parser.parse_args() - - if leftover_args: parser.error('Found unsupported flags: %s' % leftover_args) - - assert options.target_image and os.path.exists(options.target_image), \ - 'Target image path does not exist' - if not options.base_image: - cros_lib.Info('Base image not specified. Using target as base image.') - options.base_image = options.target_image - - if options.private_key or options.public_key: - error_msg = ('Could not find %s key. Both private and public keys must be ' - 'specified if either is specified.') - assert options.private_key and os.path.exists(options.private_key), \ - error_msg % 'private' - assert options.public_key and os.path.exists(options.public_key), \ - error_msg % 'public' - - # Clean up previous work if requested. - if options.clean: _CleanPreviousWork(options) - - # Make sure we have a log directory. - if options.test_results_root and not os.path.exists( - options.test_results_root): - os.makedirs(options.test_results_root) - - # Pre-generate update modifies images by adding public keys to them. - # Wrap try to make sure we clean this up before we're done. - try: - # Generate cache of updates to use during test harness. - update_cache = _PregenerateUpdates(options) - au_worker.AUWorker.SetUpdateCache(update_cache) - - my_server = dev_server_wrapper.DevServerWrapper( - au_test.AUTest.test_results_root) - my_server.start() - try: - if options.type == 'vm': - _RunTestsInParallel(options) - else: - # TODO(sosa) - Take in a machine pool for a real test. - # Can't run in parallel with only one remote device. - test_suite = _PrepareTestSuite(options) - test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) - if not test_result.wasSuccessful(): cros_lib.Die('Test harness failed.') - - finally: - my_server.Stop() - - finally: - # Un-modify any target images we modified. We don't need to un-modify - # non-targets because they aren't important for archival steps. - if options.public_key: - cros_lib.Info('Cleaning up. Removing keys added as part of testing.') - target_directory = os.path.dirname(options.target_image) - for key_manager in au_test.AUTest.public_key_managers: - if key_manager.image_path.startswith(target_directory): - key_manager.RemoveKeyFromImage() - - -if __name__ == '__main__': - main() diff --git a/bin/au_test_harness/cros_test_proxy.py b/bin/au_test_harness/cros_test_proxy.py deleted file mode 100644 index 0b3728638b..0000000000 --- a/bin/au_test_harness/cros_test_proxy.py +++ /dev/null @@ -1,121 +0,0 @@ -# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing various classes pertaining to inserting a proxy in a test. -""" - -import os -import select -import socket -import SocketServer -import threading - -class Filter(object): - """Base class for data filters. - - Pass subclass of this to CrosTestProxy which will perform whatever - connection manipulation you prefer. - """ - - def setup(self): - """This setup method is called once per connection.""" - pass - - def InBound(self, data): - """This method is called once per packet of incoming data. - - The value returned is what is sent through the proxy. If - None is returned, the connection will be closed. - """ - return data - - def OutBound(self, data): - """This method is called once per packet of outgoing data. - - The value returned is what is sent through the proxy. If - None is returned, the connection will be closed. - """ - return data - - -class CrosTestProxy(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - """A transparent proxy for simulating network errors""" - - class _Handler(SocketServer.BaseRequestHandler): - """Proxy connection handler that passes data though a filter""" - - def setup(self): - """Setup is called once for each connection proxied.""" - self.server.filter.setup() - - def handle(self): - """Handles each incoming connection. - - Opens a new connection to the port we are proxing to, then - passes each packet along in both directions after passing - them through the filter object passed in. - """ - # Open outgoing socket - s_in = self.request - s_out = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s_out.connect((self.server.address_out, self.server.port_out)) - - while True: - rlist, wlist, xlist = select.select([s_in, s_out], [], []) - - if s_in in rlist: - data = s_in.recv(1024) - data = self.server.filter.InBound(data) - if not data: break - try: - # If there is any error sending data, close both connections. - s_out.sendall(data) - except socket.error: - break - - if s_out in rlist: - data = s_out.recv(1024) - data = self.server.filter.OutBound(data) - if not data: break - try: - # If there is any error sending data, close both connections. - s_in.sendall(data) - except socket.error: - break - - s_in.close() - s_out.close() - - def __init__(self, - filter, - port_in=8081, - address_out='127.0.0.1', port_out=8080): - """Configures the proxy object. - - Args: - filter: An instance of a subclass of Filter. - port_in: Port on which to listen for incoming connections. - address_out: Address to which outgoing connections will go. - address_port: Port to which outgoing connections will go. - """ - self.port_in = port_in - self.address_out = address_out - self.port_out = port_out - self.filter = filter - - try: - SocketServer.TCPServer.__init__(self, - ('', port_in), - self._Handler) - except socket.error: - os.system('sudo netstat -l --tcp -n -p') - raise - - def serve_forever_in_thread(self): - """Helper method to start the server in a new background thread.""" - server_thread = threading.Thread(target=self.serve_forever) - server_thread.setDaemon(True) - server_thread.start() - - return server_thread diff --git a/bin/au_test_harness/dev_server_wrapper.py b/bin/au_test_harness/dev_server_wrapper.py deleted file mode 100644 index 3ddc1b8a34..0000000000 --- a/bin/au_test_harness/dev_server_wrapper.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing methods and classes to interact with a devserver instance. -""" - -import os -import threading - -import cros_build_lib as cros_lib - -def GenerateUpdateId(target, src, key): - """Returns a simple representation id of target and src paths.""" - update_id = target - if src: update_id = '->'.join([src, update_id]) - if key: update_id = '+'.join([update_id, key]) - return update_id - -class DevServerWrapper(threading.Thread): - """A Simple wrapper around a dev server instance.""" - - def __init__(self, test_root): - self.proc = None - self.test_root = test_root - threading.Thread.__init__(self) - - def run(self): - # Kill previous running instance of devserver if it exists. - cros_lib.RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True, - print_cmd=False) - cros_lib.RunCommand(['sudo', - 'start_devserver', - '--archive_dir=./static', - '--client_prefix=ChromeOSUpdateEngine', - '--production', - ], enter_chroot=True, print_cmd=False, - log_to_file=os.path.join(self.test_root, - 'dev_server.log')) - - def Stop(self): - """Kills the devserver instance.""" - cros_lib.RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True, - print_cmd=False) - - @classmethod - def GetDevServerURL(cls, port, sub_dir): - """Returns the dev server url for a given port and sub directory.""" - ip_addr = cros_lib.GetIPAddress() - if not port: port = 8080 - url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr, - 'port': str(port), - 'dir': sub_dir} - return url diff --git a/bin/au_test_harness/dummy_au_worker.py b/bin/au_test_harness/dummy_au_worker.py deleted file mode 100644 index fa1bee042a..0000000000 --- a/bin/au_test_harness/dummy_au_worker.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing a fake au worker class.""" - -import unittest - -import au_worker - -class DummyAUWorker(au_worker.AUWorker): - """AU worker that emulates work for an au_worker without actually doing work. - - Collects different updates that would be generated that can be obtained - from the class object delta_list. - """ - - # Class variable that stores the list of payloads that would be needed. - delta_list = {} - - def __init__(self, options, test_results_root): - au_worker.AUWorker.__init__(self, options, test_results_root) - self.au_type = options.type - - def PrepareBase(self, image_path): - """Copy how the actual worker would prepare the base image.""" - if self.au_type == 'vm': - self.PrepareVMBase(image_path) - else: - self.PrepareRealBase(image_path) - - def UpdateImage(self, image_path, src_image_path='', stateful_change='old', - proxy_port=None, private_key_path=None): - """Emulate Update and record the update payload in delta_list.""" - if self.au_type == 'vm' and src_image_path and self._first_update: - src_image_path = self.vm_image_path - self._first_update = False - - # Generate a value that combines delta with private key path. - val = src_image_path - if private_key_path: val = '%s+%s' % (val, private_key_path) - if not self.delta_list.has_key(image_path): - self.delta_list[image_path] = set([val]) - else: - self.delta_list[image_path].add(val) diff --git a/bin/au_test_harness/parallel_test_job.py b/bin/au_test_harness/parallel_test_job.py deleted file mode 100644 index 096ec17043..0000000000 --- a/bin/au_test_harness/parallel_test_job.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing methods/classes related to running parallel test jobs.""" - -import sys -import threading -import time - -import cros_build_lib as cros_lib - -class ParallelJob(threading.Thread): - """Small wrapper for threading. Thread that releases a semaphores on exit.""" - - def __init__(self, starting_semaphore, ending_semaphore, target, args): - """Initializes an instance of a job. - - Args: - starting_semaphore: Semaphore used by caller to wait on such that - there isn't more than a certain number of threads running. Should - be initialized to a value for the number of threads wanting to be run - at a time. - ending_semaphore: Semaphore is released every time a job ends. Should be - initialized to 0 before starting first job. Should be acquired once for - each job. Threading.Thread.join() has a bug where if the run function - terminates too quickly join() will hang forever. - target: The func to run. - args: Args to pass to the fun. - """ - threading.Thread.__init__(self, target=target, args=args) - self._target = target - self._args = args - self._starting_semaphore = starting_semaphore - self._ending_semaphore = ending_semaphore - self._output = None - self._completed = False - - def run(self): - """Thread override. Runs the method specified and sets output.""" - try: - self._output = self._target(*self._args) - finally: - # Our own clean up. - self._Cleanup() - self._completed = True - # From threading.py to avoid a refcycle. - del self._target, self._args - - def GetOutput(self): - """Returns the output of the method run.""" - assert self._completed, 'GetOutput called before thread was run.' - return self._output - - def _Cleanup(self): - """Releases semaphores for a waiting caller.""" - self._starting_semaphore.release() - self._ending_semaphore.release() - - def __str__(self): - return '%s(%s)' % (self._target, self._args) - - -def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args, - print_status): - """Runs set number of specified jobs in parallel. - - Args: - number_of_simultaneous_jobs: Max number of threads to be run in parallel. - jobs: Array of methods to run. - jobs_args: Array of args associated with method calls. - print_status: True if you'd like this to print out .'s as it runs jobs. - Returns: - Returns an array of results corresponding to each thread. - """ - def _TwoTupleize(x, y): - return (x, y) - - threads = [] - job_start_semaphore = threading.Semaphore(number_of_simultaneous_jobs) - join_semaphore = threading.Semaphore(0) - assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' - - # Create the parallel jobs. - for job, args in map(_TwoTupleize, jobs, jobs_args): - thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, - args=args) - threads.append(thread) - - # Cache sudo access. - cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'], - print_cmd=False, redirect_stdout=True, - redirect_stderr=True) - - # We use a semaphore to ensure we don't run more jobs than required. - # After each thread finishes, it releases (increments semaphore). - # Acquire blocks of num jobs reached and continues when a thread finishes. - for next_thread in threads: - job_start_semaphore.acquire(blocking=True) - next_thread.start() - - # Wait on the rest of the threads to finish. - for thread in threads: - while not join_semaphore.acquire(blocking=False): - time.sleep(5) - if print_status: - print >> sys.stderr, '.', - - return [thread.GetOutput() for thread in threads] diff --git a/bin/au_test_harness/public_key_manager.py b/bin/au_test_harness/public_key_manager.py deleted file mode 100644 index 355e7445fd..0000000000 --- a/bin/au_test_harness/public_key_manager.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""This module manages interactions between an image and a public key.""" - -import os -import tempfile - -import cros_build_lib as cros_lib - -class PublicKeyManager(object): - """Class wrapping interactions with a public key on an image.""" - TARGET_KEY_PATH = 'usr/share/update_engine/update-payload-key.pub.pem' - - def __init__(self, image_path, key_path): - """Initializes a manager with image_path and key_path we plan to insert.""" - self.image_path = image_path - self.key_path = key_path - self._rootfs_dir = tempfile.mkdtemp(suffix='rootfs', prefix='tmp') - self._stateful_dir = tempfile.mkdtemp(suffix='stateful', prefix='tmp') - - # Gather some extra information about the image. - try: - cros_lib.MountImage(image_path, self._rootfs_dir, self._stateful_dir, - read_only=True) - self._full_target_key_path = os.path.join( - self._rootfs_dir, PublicKeyManager.TARGET_KEY_PATH) - self._is_key_new = True - if os.path.exists(self._full_target_key_path): - diff_output = cros_lib.RunCommand(['diff', - self.key_path, - self._full_target_key_path], - print_cmd=False, redirect_stdout=True, - redirect_stderr=True, error_ok=True) - - if not diff_output: self._is_key_new = False - - finally: - cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir) - - def __del__(self): - """Remove our temporary directories we created in init.""" - os.rmdir(self._rootfs_dir) - os.rmdir(self._stateful_dir) - - def AddKeyToImage(self): - """Adds the key specified in init to the image.""" - if not self._is_key_new: - cros_lib.Info('Public key already on image %s. No work to do.' % - self.image_path) - return - - cros_lib.Info('Copying %s into %s' % (self.key_path, self.image_path)) - try: - cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir, - read_only=False) - - dir_path = os.path.dirname(self._full_target_key_path) - cros_lib.RunCommand(['sudo', 'mkdir', '--parents', dir_path], - print_cmd=False) - cros_lib.RunCommand(['sudo', 'cp', '--force', '-p', self.key_path, - self._full_target_key_path], print_cmd=False) - finally: - cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir) - self._MakeImageBootable() - - def RemoveKeyFromImage(self): - """Removes the key specified in init from the image.""" - cros_lib.Info('Removing public key from image %s.' % self.image_path) - try: - cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir, - read_only=False) - cros_lib.RunCommand(['sudo', 'rm', '--force', self._full_target_key_path], - print_cmd=False) - finally: - cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir) - self._MakeImageBootable() - - def _MakeImageBootable(self): - """Makes the image bootable. Note, it is only useful for non-vm images.""" - image = os.path.basename(self.image_path) - if 'qemu' in image: - return - - from_dir = os.path.dirname(self.image_path) - cros_lib.RunCommand(['bin/cros_make_image_bootable', - cros_lib.ReinterpretPathForChroot(from_dir), - image], print_cmd=False, redirect_stdout=True, - redirect_stderr=True, enter_chroot=True, - cwd=cros_lib.CROSUTILS_DIRECTORY) diff --git a/bin/au_test_harness/real_au_worker.py b/bin/au_test_harness/real_au_worker.py deleted file mode 100644 index 43b8c2e62c..0000000000 --- a/bin/au_test_harness/real_au_worker.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing class that implements an au_worker for a test device.""" - -import unittest - -import cros_build_lib as cros_lib - -import au_worker - -class RealAUWorker(au_worker.AUWorker): - """Test harness for updating real images.""" - - def __init__(self, options, test_results_root): - """Processes non-vm-specific options.""" - au_worker.AUWorker.__init__(self, options, test_results_root) - self.remote = options.remote - if not self.remote: cros_lib.Die('We require a remote address for tests.') - - def PrepareBase(self, image_path): - """Auto-update to base image to prepare for test.""" - self.PrepareRealBase(image_path) - - def UpdateImage(self, image_path, src_image_path='', stateful_change='old', - proxy_port=None, private_key_path=None): - """Updates a remote image using image_to_live.sh.""" - stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) - cmd = ['%s/image_to_live.sh' % self.crosutils, - '--remote=%s' % self.remote, - stateful_change_flag, - '--verify', - ] - self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port, - private_key_path) - self.RunUpdateCmd(cmd) - - def UpdateUsingPayload(self, update_path, stateful_change='old', - proxy_port=None): - """Updates a remote image using image_to_live.sh.""" - stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) - cmd = ['%s/image_to_live.sh' % self.crosutils, - '--payload=%s' % update_path, - '--remote=%s' % self.remote, - stateful_change_flag, - '--verify', - ] - if proxy_port: cmd.append('--proxy_port=%s' % proxy_port) - self.RunUpdateCmd(cmd) - - def VerifyImage(self, unittest, percent_required_to_pass=100): - """Verifies an image using run_remote_tests.sh with verification suite.""" - test_directory = self.GetNextResultsPath('verify') - output = cros_lib.RunCommand( - ['%s/run_remote_tests.sh' % self.crosutils, - '--remote=%s' % self.remote, - '--results_dir_root=%s' % test_directory, - self.verify_suite, - ], error_ok=True, enter_chroot=False, redirect_stdout=True) - return self.AssertEnoughTestsPassed(unittest, output, - percent_required_to_pass) - diff --git a/bin/au_test_harness/update_exception.py b/bin/au_test_harness/update_exception.py deleted file mode 100644 index 4c1e73ec64..0000000000 --- a/bin/au_test_harness/update_exception.py +++ /dev/null @@ -1,11 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing update exceptions.""" - -class UpdateException(Exception): - """Exception thrown when _UpdateImage or _UpdateUsingPayload fail""" - def __init__(self, code, stdout): - self.code = code - self.stdout = stdout diff --git a/bin/au_test_harness/vm_au_worker.py b/bin/au_test_harness/vm_au_worker.py deleted file mode 100644 index 9788fddcb2..0000000000 --- a/bin/au_test_harness/vm_au_worker.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module containing implementation of an au_worker for virtual machines.""" - -import os -import threading -import unittest - -import cros_build_lib as cros_lib - -import au_worker - - -class VMAUWorker(au_worker.AUWorker): - """Test harness for updating virtual machines.""" - - # Class variables used to acquire individual VM variables per test. - _vm_lock = threading.Lock() - _next_port = 9222 - - def __init__(self, options, test_results_root): - """Processes vm-specific options.""" - au_worker.AUWorker.__init__(self, options, test_results_root) - self.graphics_flag = '' - if options.no_graphics: self.graphics_flag = '--no_graphics' - if not self.board: cros_lib.Die('Need board to convert base image to vm.') - - self._AcquireUniquePortAndPidFile() - self._KillExistingVM(self._kvm_pid_file) - - def _KillExistingVM(self, pid_file): - """Kills an existing VM specified by the pid_file.""" - if os.path.exists(pid_file): - cros_lib.Warning('Existing %s found. Deleting and killing process' % - pid_file) - cros_lib.RunCommand(['./cros_stop_vm', '--kvm_pid=%s' % pid_file], - cwd=self.crosutilsbin) - - assert not os.path.exists(pid_file) - - def _AcquireUniquePortAndPidFile(self): - """Acquires unique ssh port and pid file for VM.""" - with VMAUWorker._vm_lock: - self._ssh_port = VMAUWorker._next_port - self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port - VMAUWorker._next_port += 1 - - def CleanUp(self): - """Stop the vm after a test.""" - self._KillExistingVM(self._kvm_pid_file) - - def PrepareBase(self, image_path): - """Creates an update-able VM based on base image.""" - self.PrepareVMBase(image_path) - - def UpdateImage(self, image_path, src_image_path='', stateful_change='old', - proxy_port='', private_key_path=None): - """Updates VM image with image_path.""" - log_directory = self.GetNextResultsPath('update') - stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) - if src_image_path and self._first_update: - src_image_path = self.vm_image_path - self._first_update = False - - cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, - '--vm_image_path=%s' % self.vm_image_path, - '--update_log=%s' % os.path.join(log_directory, 'update_engine.log'), - '--snapshot', - self.graphics_flag, - '--persist', - '--kvm_pid=%s' % self._kvm_pid_file, - '--ssh_port=%s' % self._ssh_port, - stateful_change_flag, - ] - self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port, - private_key_path) - self.RunUpdateCmd(cmd, log_directory) - - def UpdateUsingPayload(self, update_path, stateful_change='old', - proxy_port=None): - """Updates a vm image using cros_run_vm_update.""" - log_directory = self.GetNextResultsPath('update') - stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) - cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, - '--payload=%s' % update_path, - '--vm_image_path=%s' % self.vm_image_path, - '--update_log=%s' % os.path.join(log_directory, 'update_engine.log'), - '--snapshot', - self.graphics_flag, - '--persist', - '--kvm_pid=%s' % self._kvm_pid_file, - '--ssh_port=%s' % self._ssh_port, - stateful_change_flag, - ] - if proxy_port: cmd.append('--proxy_port=%s' % proxy_port) - self.RunUpdateCmd(cmd, log_directory) - - def VerifyImage(self, unittest, percent_required_to_pass=100): - """Runs vm smoke suite to verify image.""" - log_directory = self.GetNextResultsPath('verify') - (_, _, log_directory_in_chroot) = log_directory.rpartition('chroot') - # image_to_live already verifies lsb-release matching. This is just - # for additional steps. - commandWithArgs = ['%s/cros_run_vm_test' % self.crosutilsbin, - '--image_path=%s' % self.vm_image_path, - '--snapshot', - '--persist', - '--kvm_pid=%s' % self._kvm_pid_file, - '--ssh_port=%s' % self._ssh_port, - '--results_dir_root=%s' % log_directory_in_chroot, - self.verify_suite, - ] - if self.graphics_flag: commandWithArgs.append(self.graphics_flag) - output = cros_lib.RunCommand(commandWithArgs, error_ok=True, - enter_chroot=False, redirect_stdout=True) - return self.AssertEnoughTestsPassed(unittest, output, - percent_required_to_pass) - diff --git a/bin/cros_au_test_harness b/bin/cros_au_test_harness index 601ca3047f..daa10b912c 120000 --- a/bin/cros_au_test_harness +++ b/bin/cros_au_test_harness @@ -1 +1 @@ -au_test_harness/cros_au_test_harness.py \ No newline at end of file +../../platform/crostestutils/au_test_harness/cros_au_test_harness.py \ No newline at end of file diff --git a/bin/ctest b/bin/ctest index 2a65c0d330..cb76d89795 120000 --- a/bin/ctest +++ b/bin/ctest @@ -1 +1 @@ -ctest.py \ No newline at end of file +../../platform/crostestutils/ctest/ctest.py \ No newline at end of file diff --git a/bin/ctest.py b/bin/ctest.py deleted file mode 100755 index 3e424b7046..0000000000 --- a/bin/ctest.py +++ /dev/null @@ -1,320 +0,0 @@ -#!/usr/bin/python -# -# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Wrapper for tests that are run on builders.""" - -import fileinput -import optparse -import os -import re -import sys -import traceback -import urllib -import HTMLParser - -sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) -from cros_build_lib import Info -from cros_build_lib import ReinterpretPathForChroot -from cros_build_lib import RunCommand -from cros_build_lib import Warning - -_IMAGE_TO_EXTRACT = 'chromiumos_test_image.bin' -_NEW_STYLE_VERSION = '0.9.131.0' - -class HTMLDirectoryParser(HTMLParser.HTMLParser): - """HTMLParser for parsing the default apache file index.""" - - def __init__(self, regex): - HTMLParser.HTMLParser.__init__(self) - self.regex_object = re.compile(regex) - self.link_list = [] - - def handle_starttag(self, tag, attrs): - """Overrides from HTMLParser and is called at the start of every tag. - - This implementation grabs attributes from links (i.e. - and adds the target from href= if the matches the - regex given at the start. - """ - if not tag.lower() == 'a': - return - - for attr in attrs: - if not attr[0].lower() == 'href': - continue - - match = self.regex_object.match(attr[1]) - if match: - self.link_list.append(match.group(0).rstrip('/')) - - -def ModifyBootDesc(download_folder, redirect_file=None): - """Modifies the boot description of a downloaded image to work with path. - - The default boot.desc from another system is specific to the directory - it was created in. This modifies the boot description to be compatiable - with the download folder. - - Args: - download_folder: Absoulte path to the download folder. - redirect_file: For testing. Where to copy new boot desc. - """ - boot_desc_path = os.path.join(download_folder, 'boot.desc') - in_chroot_folder = ReinterpretPathForChroot(download_folder) - - for line in fileinput.input(boot_desc_path, inplace=1): - # Has to be done here to get changes to sys.stdout from fileinput.input. - if not redirect_file: - redirect_file = sys.stdout - split_line = line.split('=') - if len(split_line) > 1: - var_part = split_line[0] - potential_path = split_line[1].replace('"', '').strip() - - if potential_path.startswith('/home') and not 'output_dir' in var_part: - new_path = os.path.join(in_chroot_folder, - os.path.basename(potential_path)) - new_line = '%s="%s"' % (var_part, new_path) - Info('Replacing line %s with %s' % (line, new_line)) - redirect_file.write('%s\n' % new_line) - continue - elif 'output_dir' in var_part: - # Special case for output_dir. - new_line = '%s="%s"' % (var_part, in_chroot_folder) - Info('Replacing line %s with %s' % (line, new_line)) - redirect_file.write('%s\n' % new_line) - continue - - # Line does not need to be modified. - redirect_file.write(line) - - fileinput.close() - - -def _GreaterVersion(version_a, version_b): - """Returns the higher version number of two version number strings.""" - version_regex = re.compile('.*(\d+)\.(\d+)\.(\d+)\.(\d+).*') - version_a_tokens = version_regex.match(version_a).groups() - version_b_tokens = version_regex.match(version_b).groups() - for i in range(4): - (a, b) = (int(version_a_tokens[i]), int(version_b_tokens[i])) - if a != b: - if a > b: return version_a - return version_b - return version_a - - -def GetLatestLinkFromPage(url, regex): - """Returns the latest link from the given url that matches regex. - - Args: - url: Url to download and parse. - regex: Regular expression to match links against. - """ - url_file = urllib.urlopen(url) - url_html = url_file.read() - - url_file.close() - - # Parses links with versions embedded. - url_parser = HTMLDirectoryParser(regex=regex) - url_parser.feed(url_html) - return reduce(_GreaterVersion, url_parser.link_list) - - -def GetNewestLinkFromZipBase(board, channel, zip_server_base): - """Returns the url to the newest image from the zip server. - - Args: - board: board for the image zip. - channel: channel for the image zip. - zip_server_base: base url for zipped images. - """ - zip_base = os.path.join(zip_server_base, channel, board) - latest_version = GetLatestLinkFromPage(zip_base, '\d+\.\d+\.\d+\.\d+/') - - zip_dir = os.path.join(zip_base, latest_version) - zip_name = GetLatestLinkFromPage(zip_dir, - 'ChromeOS-\d+\.\d+\.\d+\.\d+-.*\.zip') - return os.path.join(zip_dir, zip_name) - - -def GetLatestZipUrl(board, channel, latest_url_base, zip_server_base): - """Returns the url of the latest image zip for the given arguments. - - Args: - board: board for the image zip. - channel: channel for the image zip. - latest_url_base: base url for latest links. - zip_server_base: base url for zipped images. - """ - if latest_url_base: - try: - # Grab the latest image info. - latest_file_url = os.path.join(latest_url_base, channel, - 'LATEST-%s' % board) - latest_image_file = urllib.urlopen(latest_file_url) - latest_image = latest_image_file.read() - latest_image_file.close() - # Convert bin.gz into zip. - latest_image = latest_image.replace('.bin.gz', '.zip') - version = latest_image.split('-')[1] - zip_base = os.path.join(zip_server_base, channel, board) - return os.path.join(zip_base, version, latest_image) - except IOError: - Warning(('Could not use latest link provided, defaulting to parsing' - ' latest from zip url base.')) - - try: - return GetNewestLinkFromZipBase(board, channel, zip_server_base) - except: - Warning('Failed to get url from standard zip base. Trying rc.') - return GetNewestLinkFromZipBase(board + '-rc', channel, zip_server_base) - - -def GrabZipAndExtractImage(zip_url, download_folder, image_name) : - """Downloads the zip and extracts the given image. - - Doesn't re-download if matching version found already in download folder. - Args: - zip_url - url for the image. - download_folder - download folder to store zip file and extracted images. - image_name - name of the image to extract from the zip file. - """ - zip_path = os.path.join(download_folder, 'image.zip') - versioned_url_path = os.path.join(download_folder, 'download_url') - found_cached = False - - if os.path.exists(versioned_url_path): - fh = open(versioned_url_path) - version_url = fh.read() - fh.close() - - if version_url == zip_url and os.path.exists(os.path.join(download_folder, - image_name)): - Info('Using cached %s' % image_name) - found_cached = True - - if not found_cached: - Info('Downloading %s' % zip_url) - RunCommand(['rm', '-rf', download_folder], print_cmd=False) - os.mkdir(download_folder) - urllib.urlretrieve(zip_url, zip_path) - - # Using unzip because python implemented unzip in native python so - # extraction is really slow. - Info('Unzipping image %s' % image_name) - RunCommand(['unzip', '-d', download_folder, zip_path], - print_cmd=False, error_message='Failed to download %s' % zip_url) - - ModifyBootDesc(download_folder) - - # Put url in version file so we don't have to do this every time. - fh = open(versioned_url_path, 'w+') - fh.write(zip_url) - fh.close() - - version = zip_url.split('/')[-2] - if not _GreaterVersion(version, _NEW_STYLE_VERSION) == version: - # If the version isn't ready for new style, touch file to use old style. - old_style_touch_path = os.path.join(download_folder, '.use_e1000') - fh = open(old_style_touch_path, 'w+') - fh.close() - - -def RunAUTestHarness(board, channel, latest_url_base, zip_server_base, - no_graphics, type, remote, clean, test_results_root): - """Runs the auto update test harness. - - The auto update test harness encapsulates testing the auto-update mechanism - for the latest image against the latest official image from the channel. This - also tests images with suite_Smoke (built-in as part of its verification - process). - - Args: - board: the board for the latest image. - channel: the channel to run the au test harness against. - latest_url_base: base url for getting latest links. - zip_server_base: base url for zipped images. - no_graphics: boolean - If True, disable graphics during vm test. - type: which test harness to run. Possible values: real, vm. - remote: ip address for real test harness run. - clean: Clean the state of test harness before running. - test_results_root: Root directory to store au_test_harness results. - """ - crosutils_root = os.path.join(os.path.dirname(__file__), '..') - download_folder = os.path.abspath('latest_download') - zip_url = GetLatestZipUrl(board, channel, latest_url_base, zip_server_base) - GrabZipAndExtractImage(zip_url, download_folder, _IMAGE_TO_EXTRACT) - - # Tests go here. - latest_image = RunCommand(['./get_latest_image.sh', '--board=%s' % board], - cwd=crosutils_root, redirect_stdout=True, - print_cmd=True).strip() - - update_engine_path = os.path.join(crosutils_root, '..', 'platform', - 'update_engine') - - cmd = ['bin/cros_au_test_harness', - '--base_image=%s' % os.path.join(download_folder, - _IMAGE_TO_EXTRACT), - '--target_image=%s' % os.path.join(latest_image, - _IMAGE_TO_EXTRACT), - '--board=%s' % board, - '--type=%s' % type, - '--remote=%s' % remote, - '--private_key=%s' % os.path.join(update_engine_path, - 'unittest_key.pem'), - '--public_key=%s' % os.path.join(update_engine_path, - 'unittest_key.pub.pem'), - ] - if test_results_root: cmd.append('--test_results_root=%s' % test_results_root) - if no_graphics: cmd.append('--no_graphics') - if clean: cmd.append('--clean') - - RunCommand(cmd, cwd=crosutils_root) - - -def main(): - parser = optparse.OptionParser() - parser.add_option('-b', '--board', - help='board for the image to compare against.') - parser.add_option('-c', '--channel', - help='channel for the image to compare against.') - parser.add_option('--cache', default=False, action='store_true', - help='Cache payloads') - parser.add_option('-l', '--latestbase', - help='Base url for latest links.') - parser.add_option('-z', '--zipbase', - help='Base url for hosted images.') - parser.add_option('--no_graphics', action='store_true', default=False, - help='Disable graphics for the vm test.') - parser.add_option('--test_results_root', default=None, - help='Root directory to store test results. Should ' - 'be defined relative to chroot root.') - parser.add_option('--type', default='vm', - help='type of test to run: [vm, real]. Default: vm.') - parser.add_option('--remote', default='0.0.0.0', - help='For real tests, ip address of the target machine.') - - # Set the usage to include flags. - parser.set_usage(parser.format_help()) - (options, args) = parser.parse_args() - - if args: parser.error('Extra args found %s.' % args) - if not options.board: parser.error('Need board for image to compare against.') - if not options.channel: parser.error('Need channel e.g. dev-channel.') - if not options.zipbase: parser.error('Need zip url base to get images.') - - RunAUTestHarness(options.board, options.channel, options.latestbase, - options.zipbase, options.no_graphics, options.type, - options.remote, not options.cache, - options.test_results_root) - - -if __name__ == '__main__': - main() - diff --git a/bin/ctest_unittest.py b/bin/ctest_unittest.py deleted file mode 100755 index cc00e329ce..0000000000 --- a/bin/ctest_unittest.py +++ /dev/null @@ -1,228 +0,0 @@ -#!/usr/bin/python -# -# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Unit tests for ctest.""" - -import mox -import os -import unittest -import urllib - -import ctest - -_TEST_BOOT_DESC = """ - --arch="x86" - --output_dir="/home/chrome-bot/0.8.70.5-a1" - --espfs_mountpoint="/home/chrome-bot/0.8.70.5-a1/esp" - --enable_rootfs_verification -""" - -class CrosTestTest(mox.MoxTestBase): - """Test class for CTest.""" - - def setUp(self): - mox.MoxTestBase.setUp(self) - self.board = 'test-board' - self.channel = 'test-channel' - self.version = '1.2.3.4.5' - self.revision = '7ghfa9999-12345' - self.image_name = 'TestOS-%s-%s' % (self.version, self.revision) - self.download_folder = 'test_folder' - self.latestbase = 'http://test-latest/TestOS' - self.zipbase = 'http://test-zips/archive/TestOS' - self.image_url = '%s/%s/%s/%s/%s.zip' % (self.zipbase, self.channel, - self.board, self.version, - self.image_name) - self.test_regex = 'ChromeOS-\d+\.\d+\.\d+\.\d+-.*\.zip' - - def testModifyBootDesc(self): - """Tests to make sure we correctly modify a boot desc.""" - in_chroot_path = ctest.ReinterpretPathForChroot(os.path.abspath( - self.download_folder)) - self.mox.StubOutWithMock(__builtins__, 'open') - self.mox.StubOutWithMock(ctest.fileinput, 'input') - m_file = self.mox.CreateMock(file) - - mock_file = _TEST_BOOT_DESC.splitlines(True) - ctest.fileinput.input('%s/%s' % (os.path.abspath(self.download_folder), - 'boot.desc'), - inplace=1).AndReturn(mock_file) - - m_file.write('\n') - m_file.write(' --arch="x86"\n') - m_file.write(' --output_dir="%s"\n' % in_chroot_path) - m_file.write(' --espfs_mountpoint="%s/%s"\n' % (in_chroot_path, 'esp')) - m_file.write(' --enable_rootfs_verification\n') - - self.mox.ReplayAll() - ctest.ModifyBootDesc(os.path.abspath(self.download_folder), m_file) - self.mox.VerifyAll() - - - def testGetLatestZipUrl(self): - """Test case that tests GetLatestZipUrl with test urls.""" - self.mox.StubOutWithMock(urllib, 'urlopen') - m_file = self.mox.CreateMock(file) - - urllib.urlopen('%s/%s/LATEST-%s' % (self.latestbase, self.channel, - self.board)).AndReturn(m_file) - m_file.read().AndReturn('%s.bin.gz' % self.image_name) - m_file.close() - - self.mox.ReplayAll() - self.assertEquals(ctest.GetLatestZipUrl(self.board, self.channel, - self.latestbase, self.zipbase), - self.image_url) - self.mox.VerifyAll() - - def testGetLatestZipFromBadUrl(self): - """Tests whether GetLatestZipUrl returns correct url given bad link.""" - self.mox.StubOutWithMock(urllib, 'urlopen') - self.mox.StubOutWithMock(ctest, 'GetNewestLinkFromZipBase') - m_file = self.mox.CreateMock(file) - - urllib.urlopen('%s/%s/LATEST-%s' % (self.latestbase, self.channel, - self.board)).AndRaise(IOError('Cannot open url.')) - ctest.GetNewestLinkFromZipBase(self.board, self.channel, - self.zipbase).AndReturn(self.image_url) - - self.mox.ReplayAll() - self.assertEquals(ctest.GetLatestZipUrl(self.board, self.channel, - self.latestbase, self.zipbase), - self.image_url) - self.mox.VerifyAll() - - def testGrabZipAndExtractImageUseCached(self): - """Test case where cache holds our image.""" - self.mox.StubOutWithMock(os.path, 'exists') - self.mox.StubOutWithMock(__builtins__, 'open') - m_file = self.mox.CreateMock(file) - - os.path.exists('%s/%s' % ( - self.download_folder, 'download_url')).AndReturn(True) - - open('%s/%s' % (self.download_folder, 'download_url')).AndReturn(m_file) - m_file.read().AndReturn(self.image_url) - m_file.close() - - os.path.exists('%s/%s' % ( - self.download_folder, ctest._IMAGE_TO_EXTRACT)).AndReturn(True) - - self.mox.ReplayAll() - ctest.GrabZipAndExtractImage(self.image_url, self.download_folder, - ctest._IMAGE_TO_EXTRACT) - self.mox.VerifyAll() - - def CommonDownloadAndExtractImage(self): - """Common code to mock downloading image, unzipping it and setting url.""" - zip_path = os.path.join(self.download_folder, 'image.zip') - m_file = self.mox.CreateMock(file) - - ctest.RunCommand(['rm', '-rf', self.download_folder], print_cmd=False) - os.mkdir(self.download_folder) - urllib.urlretrieve(self.image_url, zip_path) - ctest.RunCommand(['unzip', '-d', self.download_folder, zip_path], - print_cmd=False, error_message=mox.IgnoreArg()) - - ctest.ModifyBootDesc(self.download_folder) - - open('%s/%s' % (self.download_folder, 'download_url'), - 'w+').AndReturn(m_file) - m_file.write(self.image_url) - m_file.close() - - self.mox.ReplayAll() - ctest.GrabZipAndExtractImage(self.image_url, self.download_folder, - ctest._IMAGE_TO_EXTRACT) - self.mox.VerifyAll() - - def testGrabZipAndExtractImageNoCache(self): - """Test case where download_url doesn't exist.""" - self.mox.StubOutWithMock(os.path, 'exists') - self.mox.StubOutWithMock(os, 'mkdir') - self.mox.StubOutWithMock(__builtins__, 'open') - self.mox.StubOutWithMock(ctest, 'RunCommand') - self.mox.StubOutWithMock(urllib, 'urlretrieve') - self.mox.StubOutWithMock(ctest, 'ModifyBootDesc') - - m_file = self.mox.CreateMock(file) - - os.path.exists('%s/%s' % ( - self.download_folder, 'download_url')).AndReturn(False) - - self.CommonDownloadAndExtractImage() - - - def testGrabZipAndExtractImageWrongCache(self): - """Test case where download_url exists but doesn't match our url.""" - self.mox.StubOutWithMock(os.path, 'exists') - self.mox.StubOutWithMock(os, 'mkdir') - self.mox.StubOutWithMock(__builtins__, 'open') - self.mox.StubOutWithMock(ctest, 'RunCommand') - self.mox.StubOutWithMock(urllib, 'urlretrieve') - self.mox.StubOutWithMock(ctest, 'ModifyBootDesc') - - m_file = self.mox.CreateMock(file) - - os.path.exists('%s/%s' % ( - self.download_folder, 'download_url')).AndReturn(True) - - open('%s/%s' % (self.download_folder, 'download_url')).AndReturn(m_file) - m_file.read().AndReturn(self.image_url) - m_file.close() - - os.path.exists('%s/%s' % ( - self.download_folder, ctest._IMAGE_TO_EXTRACT)).AndReturn(False) - - self.CommonDownloadAndExtractImage() - - def testGetLatestLinkFromPage(self): - """Tests whether we get the latest link from a url given a regex.""" - test_url = 'test_url' - test_html = """ - - - -

Test Index

- Cruft - Cruft - testlink1/ - testlink2/ - testlink3/ - - """ - self.mox.StubOutWithMock(urllib, 'urlopen') - m_file = self.mox.CreateMock(file) - - urllib.urlopen(test_url).AndReturn(m_file) - m_file.read().AndReturn(test_html) - m_file.close() - - self.mox.ReplayAll() - latest_link = ctest.GetLatestLinkFromPage(test_url, regex=self.test_regex) - self.assertTrue(latest_link == 'ChromeOS-0.9.12.4-blahblah.zip') - self.mox.VerifyAll() - - -class HTMLDirectoryParserTest(unittest.TestCase): - """Test class for HTMLDirectoryParser.""" - - def setUp(self): - self.test_regex = '\d+\.\d+\.\d+\.\d+/' - - def testHandleStarttagGood(self): - parser = ctest.HTMLDirectoryParser(regex=self.test_regex) - parser.handle_starttag('a', [('href', '0.9.74.1/')]) - self.assertTrue('0.9.74.1' in parser.link_list) - - def testHandleStarttagBad(self): - parser = ctest.HTMLDirectoryParser(regex=self.test_regex) - parser.handle_starttag('a', [('href', 'ZsomeCruft/')]) - self.assertTrue('ZsomeCruft' not in parser.link_list) - - -if __name__ == '__main__': - unittest.main() diff --git a/lib/cros_build_lib.py b/lib/cros_build_lib.py index 5cd39b1bd3..d846757677 100644 --- a/lib/cros_build_lib.py +++ b/lib/cros_build_lib.py @@ -11,8 +11,8 @@ import subprocess import sys _STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() -CROSUTILS_DIRECTORY = os.path.realpath(os.path.dirname(os.path.dirname( - __file__))) +CROSUTILS_DIRECTORY = os.path.dirname(os.path.dirname( + os.path.realpath(__file__))) # TODO(sosa): Move logging to logging module.