diff --git a/bin/au_test_harness/__init__.py b/bin/au_test_harness/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/bin/au_test_harness/au_test.py b/bin/au_test_harness/au_test.py
deleted file mode 100644
index 47aad12500..0000000000
--- a/bin/au_test_harness/au_test.py
+++ /dev/null
@@ -1,288 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing a test suite that is run to test auto updates."""
-
-import os
-import tempfile
-import time
-import unittest
-
-import cros_build_lib as cros_lib
-
-import cros_test_proxy
-import dummy_au_worker
-import real_au_worker
-import vm_au_worker
-
-
-class AUTest(unittest.TestCase):
- """Test harness that uses an au_worker to perform and validate updates.
-
- Defines a test suite that is run using an au_worker. An au_worker can
- be created to perform and validates updates on both virtual and real devices.
- See documentation for au_worker for more information.
- """
- test_results_root = None
- public_key_managers = []
-
- @classmethod
- def ProcessOptions(cls, options, use_dummy_worker):
- """Processes options for the test suite and sets up the worker class.
-
- Args:
- options: options class to be parsed from main class.
- use_dummy_worker: If True, use a dummy_worker_class rather than deriving
- one from options.type.
- """
- cls.base_image_path = options.base_image
- cls.target_image_path = options.target_image
- cls.clean = options.clean
-
- assert options.type in ['real', 'vm'], 'Failed to specify either real|vm.'
- if use_dummy_worker:
- cls.worker_class = dummy_au_worker.DummyAUWorker
- elif options.type == 'vm':
- cls.worker_class = vm_au_worker.VMAUWorker
- else:
- cls.worker_class = real_au_worker.RealAUWorker
-
- # Sanity checks.
- if not cls.base_image_path:
- cros_lib.Die('Need path to base image for vm.')
- elif not os.path.exists(cls.base_image_path):
- cros_lib.Die('%s does not exist' % cls.base_image_path)
-
- if not cls.target_image_path:
- cros_lib.Die('Need path to target image to update with.')
- elif not os.path.exists(cls.target_image_path):
- cros_lib.Die('%s does not exist' % cls.target_image_path)
-
- # Initialize test root. Test root path must be in the chroot.
- if not cls.test_results_root:
- if options.test_results_root:
- assert 'chroot/tmp' in options.test_results_root, \
- 'Must specify a test results root inside tmp in a chroot.'
- cls.test_results_root = options.test_results_root
- else:
- cls.test_results_root = tempfile.mkdtemp(
- prefix='au_test_harness',
- dir=cros_lib.PrependChrootPath('/tmp'))
-
- cros_lib.Info('Using %s as the test results root' % cls.test_results_root)
-
- # Cache away options to instantiate workers later.
- cls.options = options
-
- def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg):
- """Attempt a payload update, expect it to fail with expected log"""
- try:
- self.worker.UpdateUsingPayload(payload)
- except UpdateException as err:
- # Will raise ValueError if expected is not found.
- if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE):
- return
- else:
- cros_lib.Warning("Didn't find '%s' in:" % expected_msg)
- cros_lib.Warning(err.stdout)
-
- self.fail('We managed to update when failure was expected')
-
- def AttemptUpdateWithFilter(self, filter, proxy_port=8081):
- """Update through a proxy, with a specified filter, and expect success."""
- self.worker.PrepareBase(self.target_image_path)
-
- # The devserver runs at port 8080 by default. We assume that here, and
- # start our proxy at a different one. We then tell our update tools to
- # have the client connect to our proxy_port instead of 8080.
- proxy = cros_test_proxy.CrosTestProxy(port_in=proxy_port,
- address_out='127.0.0.1',
- port_out=8080,
- filter=filter)
- proxy.serve_forever_in_thread()
- try:
- self.worker.PerformUpdate(self.target_image_path, self.target_image_path,
- proxy_port=proxy_port)
- finally:
- proxy.shutdown()
-
- # --- UNITTEST SPECIFIC METHODS ---
-
- def setUp(self):
- """Overrides unittest.TestCase.setUp and called before every test.
-
- Sets instance specific variables and initializes worker.
- """
- unittest.TestCase.setUp(self)
- self.worker = self.worker_class(self.options, AUTest.test_results_root)
- self.crosutils = os.path.join(os.path.dirname(__file__), '..', '..')
- self.download_folder = os.path.join(self.crosutils, 'latest_download')
- if not os.path.exists(self.download_folder):
- os.makedirs(self.download_folder)
-
- def tearDown(self):
- """Overrides unittest.TestCase.tearDown and called after every test."""
- self.worker.CleanUp()
-
- def testUpdateKeepStateful(self):
- """Tests if we can update normally.
-
- This test checks that we can update by updating the stateful partition
- rather than wiping it.
- """
- self.worker.InitializeResultsDirectory()
- # Just make sure some tests pass on original image. Some old images
- # don't pass many tests.
- self.worker.PrepareBase(self.base_image_path)
- # TODO(sosa): move to 100% once we start testing using the autotest paired
- # with the dev channel.
- percent_passed = self.worker.VerifyImage(self, 10)
-
- # Update to - all tests should pass on new image.
- self.worker.PerformUpdate(self.target_image_path, self.base_image_path)
- percent_passed = self.worker.VerifyImage(self)
-
- # Update from - same percentage should pass that originally passed.
- self.worker.PerformUpdate(self.base_image_path, self.target_image_path)
- self.worker.VerifyImage(self, percent_passed)
-
- def testUpdateWipeStateful(self):
- """Tests if we can update after cleaning the stateful partition.
-
- This test checks that we can update successfully after wiping the
- stateful partition.
- """
- self.worker.InitializeResultsDirectory()
- # Just make sure some tests pass on original image. Some old images
- # don't pass many tests.
- self.worker.PrepareBase(self.base_image_path)
- percent_passed = self.worker.VerifyImage(self, 10)
-
- # Update to - all tests should pass on new image.
- self.worker.PerformUpdate(self.target_image_path, self.base_image_path,
- 'clean')
- self.worker.VerifyImage(self)
-
- # Update from - same percentage should pass that originally passed.
- self.worker.PerformUpdate(self.base_image_path, self.target_image_path,
- 'clean')
- self.worker.VerifyImage(self, percent_passed)
-
- def testInterruptedUpdate(self):
- """Tests what happens if we interrupt payload delivery 3 times."""
-
- class InterruptionFilter(cros_test_proxy.Filter):
- """This filter causes the proxy to interrupt the download 3 times
-
- It does this by closing the first three connections to transfer
- 2M total in the outbound connection after they transfer the
- 2M.
- """
- def __init__(self):
- """Defines variable shared across all connections"""
- self.close_count = 0
-
- def setup(self):
- """Called once at the start of each connection."""
- self.data_size = 0
-
- def OutBound(self, data):
- """Called once per packet for outgoing data.
-
- The first three connections transferring more than 2M
- outbound will be closed.
- """
- if self.close_count < 3:
- if self.data_size > (2 * 1024 * 1024):
- self.close_count += 1
- return None
-
- self.data_size += len(data)
- return data
-
- self.worker.InitializeResultsDirectory()
- self.AttemptUpdateWithFilter(InterruptionFilter(), proxy_port=8082)
-
- def testDelayedUpdate(self):
- """Tests what happens if some data is delayed during update delivery"""
-
- class DelayedFilter(cros_test_proxy.Filter):
- """Causes intermittent delays in data transmission.
-
- It does this by inserting 3 20 second delays when transmitting
- data after 2M has been sent.
- """
- def setup(self):
- """Called once at the start of each connection."""
- self.data_size = 0
- self.delay_count = 0
-
- def OutBound(self, data):
- """Called once per packet for outgoing data.
-
- The first three packets after we reach 2M transferred
- are delayed by 20 seconds.
- """
- if self.delay_count < 3:
- if self.data_size > (2 * 1024 * 1024):
- self.delay_count += 1
- time.sleep(20)
-
- self.data_size += len(data)
- return data
-
- self.worker.InitializeResultsDirectory()
- self.AttemptUpdateWithFilter(DelayedFilter(), proxy_port=8083)
-
- def SimpleTest(self):
- """A simple update that updates once from a base image to a target.
-
- We explicitly don't use test prefix so that isn't run by default. Can be
- run using test_prefix option.
- """
- self.worker.InitializeResultsDirectory()
- self.worker.PrepareBase(self.base_image_path)
- self.worker.PerformUpdate(self.target_image_path, self.base_image_path)
- self.worker.VerifyImage(self)
-
- # --- DISABLED TESTS ---
-
- # TODO(sosa): Get test to work with verbose.
- def NotestPartialUpdate(self):
- """Tests what happens if we attempt to update with a truncated payload."""
- self.worker.InitializeResultsDirectory()
- # Preload with the version we are trying to test.
- self.worker.PrepareBase(self.target_image_path)
-
- # Image can be updated at:
- # ~chrome-eng/chromeos/localmirror/autest-images
- url = 'http://gsdview.appspot.com/chromeos-localmirror/' \
- 'autest-images/truncated_image.gz'
- payload = os.path.join(self.download_folder, 'truncated_image.gz')
-
- # Read from the URL and write to the local file
- urllib.urlretrieve(url, payload)
-
- expected_msg = 'download_hash_data == update_check_response_hash failed'
- self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
-
- # TODO(sosa): Get test to work with verbose.
- def NotestCorruptedUpdate(self):
- """Tests what happens if we attempt to update with a corrupted payload."""
- self.worker.InitializeResultsDirectory()
- # Preload with the version we are trying to test.
- self.worker.PrepareBase(self.target_image_path)
-
- # Image can be updated at:
- # ~chrome-eng/chromeos/localmirror/autest-images
- url = 'http://gsdview.appspot.com/chromeos-localmirror/' \
- 'autest-images/corrupted_image.gz'
- payload = os.path.join(self.download_folder, 'corrupted.gz')
-
- # Read from the URL and write to the local file
- urllib.urlretrieve(url, payload)
-
- # This update is expected to fail...
- expected_msg = 'zlib inflate() error:-3'
- self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
diff --git a/bin/au_test_harness/au_worker.py b/bin/au_test_harness/au_worker.py
deleted file mode 100644
index 303c180208..0000000000
--- a/bin/au_test_harness/au_worker.py
+++ /dev/null
@@ -1,272 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module that contains the interface for au_test_harness workers.
-
-An au test harnss worker is a class that contains the logic for performing
-and validating updates on a target. This should be subclassed to handle
-various types of target. Types of targets include VM's, real devices, etc.
-"""
-
-import inspect
-import threading
-import os
-import sys
-
-import cros_build_lib as cros_lib
-
-import dev_server_wrapper
-import update_exception
-
-
-class AUWorker(object):
- """Interface for a worker that updates and verifies images."""
- # Mapping between cached payloads to directory locations.
- update_cache = None
-
- # --- INTERFACE ---
-
- def __init__(self, options, test_results_root):
- """Processes options for the specific-type of worker."""
- self.board = options.board
- self.private_key = options.private_key
- self.test_results_root = test_results_root
- self.use_delta_updates = options.delta
- self.verbose = options.verbose
- self.vm_image_path = None
- if options.quick_test:
- self.verify_suite = 'build_RootFilesystemSize'
- else:
- self.verify_suite = 'suite_Smoke'
-
- # Set these up as they are used often.
- self.crosutils = os.path.join(os.path.dirname(__file__), '..', '..')
- self.crosutilsbin = os.path.join(os.path.dirname(__file__), '..')
-
- def CleanUp(self):
- """Called at the end of every test."""
- pass
-
- def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- """Implementation of an actual update.
-
- See PerformUpdate for description of args. Subclasses must override this
- method with the correct update procedure for the class.
- """
- pass
-
- def UpdateUsingPayload(self, update_path, stateful_change='old',
- proxy_port=None):
- """Updates target with the pre-generated update stored in update_path.
-
- Subclasses must override this method with the correct update procedure for
- the class.
-
- Args:
- update_path: Path to the image to update with. This directory should
- contain both update.gz, and stateful.image.gz
- proxy_port: Port to have the client connect to. For use with
- CrosTestProxy.
- """
- pass
-
- def VerifyImage(self, unittest, percent_required_to_pass=100):
- """Verifies the image with tests.
-
- Verifies that the test images passes the percent required. Subclasses must
- override this method with the correct update procedure for the class.
-
- Args:
- unittest: pointer to a unittest to fail if we cannot verify the image.
- percent_required_to_pass: percentage required to pass. This should be
- fall between 0-100.
-
- Returns:
- Returns the percent that passed.
- """
- pass
-
- # --- INTERFACE TO AU_TEST ---
-
- def PerformUpdate(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- """Performs an update using _UpdateImage and reports any error.
-
- Subclasses should not override this method but override _UpdateImage
- instead.
-
- Args:
- image_path: Path to the image to update with. This image must be a test
- image.
- src_image_path: Optional. If set, perform a delta update using the
- image specified by the path as the source image.
- stateful_change: How to modify the stateful partition. Values are:
- 'old': Don't modify stateful partition. Just update normally.
- 'clean': Uses clobber-state to wipe the stateful partition with the
- exception of code needed for ssh.
- proxy_port: Port to have the client connect to. For use with
- CrosTestProxy.
- private_key_path: Path to a private key to use with update payload.
- Raises an update_exception.UpdateException if _UpdateImage returns an error.
- """
- try:
- if not self.use_delta_updates: src_image_path = ''
- if private_key_path:
- key_to_use = private_key_path
- else:
- key_to_use = self.private_key
-
- self.UpdateImage(image_path, src_image_path, stateful_change,
- proxy_port, key_to_use)
- except update_exception.UpdateException as err:
- # If the update fails, print it out
- Warning(err.stdout)
- raise
-
- @classmethod
- def SetUpdateCache(cls, update_cache):
- """Sets the global update cache for getting paths to devserver payloads."""
- cls.update_cache = update_cache
-
- # --- METHODS FOR SUB CLASS USE ---
-
- def PrepareRealBase(self, image_path):
- """Prepares a remote device for worker test by updating it to the image."""
- self.UpdateImage(image_path)
-
- def PrepareVMBase(self, image_path):
- """Prepares a VM image for worker test by creating the VM file from the img.
- """
- # VM Constants.
- FULL_VDISK_SIZE = 6072
- FULL_STATEFULFS_SIZE = 3074
- # Needed for VM delta updates. We need to use the qemu image rather
- # than the base image on a first update. By tracking the first_update
- # we can set src_image to the qemu form of the base image when
- # performing generating the delta payload.
- self._first_update = True
- self.vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
- image_path)
- if not os.path.exists(self.vm_image_path):
- cros_lib.Info('Creating %s' % self.vm_image_path)
- cros_lib.RunCommand(['./image_to_vm.sh',
- '--full',
- '--from=%s' % cros_lib.ReinterpretPathForChroot(
- os.path.dirname(image_path)),
- '--vdisk_size=%s' % FULL_VDISK_SIZE,
- '--statefulfs_size=%s' % FULL_STATEFULFS_SIZE,
- '--board=%s' % self.board,
- '--test_image'
- ], enter_chroot=True, cwd=self.crosutils)
-
- cros_lib.Info('Using %s as base' % self.vm_image_path)
- assert os.path.exists(self.vm_image_path)
-
- def GetStatefulChangeFlag(self, stateful_change):
- """Returns the flag to pass to image_to_vm for the stateful change."""
- stateful_change_flag = ''
- if stateful_change:
- stateful_change_flag = '--stateful_update_flag=%s' % stateful_change
-
- return stateful_change_flag
-
- def AppendUpdateFlags(self, cmd, image_path, src_image_path, proxy_port,
- private_key_path):
- """Appends common args to an update cmd defined by an array.
-
- Modifies cmd in places by appending appropriate items given args.
- """
- if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
-
- # Get pregenerated update if we have one.
- update_id = dev_server_wrapper.GenerateUpdateId(image_path, src_image_path,
- private_key_path)
- cache_path = self.update_cache[update_id]
- if cache_path:
- update_url = dev_server_wrapper.DevServerWrapper.GetDevServerURL(
- proxy_port, cache_path)
- cmd.append('--update_url=%s' % update_url)
- else:
- cmd.append('--image=%s' % image_path)
- if src_image_path: cmd.append('--src_image=%s' % src_image_path)
-
- def RunUpdateCmd(self, cmd, log_directory=None):
- """Runs the given update cmd given verbose options.
-
- Raises an update_exception.UpdateException if the update fails.
- """
- if self.verbose:
- try:
- if log_directory:
- cros_lib.RunCommand(cmd, log_to_file=os.path.join(log_directory,
- 'update.log'))
- else:
- cros_lib.RunCommand(cmd)
- except Exception as e:
- Warning(str(e))
- raise update_exception.UpdateException(1, str(e))
- else:
- (code, stdout, stderr) = cros_lib.RunCommandCaptureOutput(cmd)
- if code != 0:
- Warning(stdout)
- raise update_exception.UpdateException(code, stdout)
-
- def AssertEnoughTestsPassed(self, unittest, output, percent_required_to_pass):
- """Helper function that asserts a sufficient number of tests passed.
-
- Args:
- output: stdout from a test run.
- percent_required_to_pass: percentage required to pass. This should be
- fall between 0-100.
- Returns:
- percent that passed.
- """
- cros_lib.Info('Output from VerifyImage():')
- print >> sys.stderr, output
- sys.stderr.flush()
- percent_passed = self._ParseGenerateTestReportOutput(output)
- cros_lib.Info('Percent passed: %d vs. Percent required: %d' % (
- percent_passed, percent_required_to_pass))
- unittest.assertTrue(percent_passed >= percent_required_to_pass)
- return percent_passed
-
- def InitializeResultsDirectory(self):
- """Called by a test to initialize a results directory for this worker."""
- # Use the name of the test.
- test_name = inspect.stack()[1][3]
- self.results_directory = os.path.join(self.test_results_root, test_name)
- self.results_count = 0
-
- def GetNextResultsPath(self, label):
- """Returns a path for the results directory for this label.
-
- Prefixes directory returned for worker with time called i.e. 1_label,
- 2_label, etc. The directory returned is outside the chroot so if passing
- to an script that is called with enther_chroot, make sure to use
- ReinterpretPathForChroot.
- """
- self.results_count += 1
- dir = os.path.join(self.results_directory, '%s_%s' % (self.results_count,
- label))
- if not os.path.exists(dir):
- os.makedirs(dir)
-
- return dir
-
- # --- PRIVATE HELPER FUNCTIONS ---
-
- def _ParseGenerateTestReportOutput(self, output):
- """Returns the percentage of tests that passed based on output."""
- percent_passed = 0
- lines = output.split('\n')
-
- for line in lines:
- if line.startswith("Total PASS:"):
- # FORMAT: ^TOTAL PASS: num_passed/num_total (percent%)$
- percent_passed = line.split()[3].strip('()%')
- cros_lib.Info('Percent of tests passed %s' % percent_passed)
- break
-
- return int(percent_passed)
diff --git a/bin/au_test_harness/cros_au_test_harness.py b/bin/au_test_harness/cros_au_test_harness.py
deleted file mode 100755
index 10c6abcd46..0000000000
--- a/bin/au_test_harness/cros_au_test_harness.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""This module runs a suite of Auto Update tests.
-
- The tests can be run on either a virtual machine or actual device depending
- on parameters given. Specific tests can be run by invoking --test_prefix.
- Verbose is useful for many of the tests if you want to see individual commands
- being run during the update process.
-"""
-
-import optparse
-import os
-import re
-import sys
-import unittest
-
-sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
-import cros_build_lib as cros_lib
-
-import au_test
-import au_worker
-import dummy_au_worker
-import dev_server_wrapper
-import parallel_test_job
-import public_key_manager
-import update_exception
-
-def _PrepareTestSuite(options, use_dummy_worker=False):
- """Returns a prepared test suite given by the options and test class."""
- au_test.AUTest.ProcessOptions(options, use_dummy_worker)
- test_loader = unittest.TestLoader()
- test_loader.testMethodPrefix = options.test_prefix
- return test_loader.loadTestsFromTestCase(au_test.AUTest)
-
-
-def _PregenerateUpdates(options):
- """Determines all deltas that will be generated and generates them.
-
- This method effectively pre-generates the dev server cache for all tests.
-
- Args:
- options: options from parsed parser.
- Returns:
- Dictionary of Update Identifiers->Relative cache locations.
- Raises:
- update_exception.UpdateException if we fail to generate an update.
- """
- def _GenerateVMUpdate(target, src, private_key_path):
- """Generates an update using the devserver."""
- command = ['./enter_chroot.sh',
- '--',
- 'sudo',
- 'start_devserver',
- '--pregenerate_update',
- '--exit',
- ]
- # Add actual args to command.
- command.append('--image=%s' % cros_lib.ReinterpretPathForChroot(target))
- if src: command.append('--src_image=%s' %
- cros_lib.ReinterpretPathForChroot(src))
- if options.type == 'vm': command.append('--for_vm')
- if private_key_path:
- command.append('--private_key=%s' %
- cros_lib.ReinterpretPathForChroot(private_key_path))
-
- return cros_lib.RunCommandCaptureOutput(command, combine_stdout_stderr=True,
- print_cmd=True)
-
- # Use dummy class to mock out updates that would be run as part of a test.
- test_suite = _PrepareTestSuite(options, use_dummy_worker=True)
- test_result = unittest.TextTestRunner(verbosity=0).run(test_suite)
- if not test_result.wasSuccessful():
- raise update_exception.UpdateException(1,
- 'Error finding updates to generate.')
-
- cros_lib.Info('The following delta updates are required.')
- update_ids = []
- jobs = []
- args = []
- modified_images = set()
- for target, srcs in dummy_au_worker.DummyAUWorker.delta_list.items():
- modified_images.add(target)
- for src_key in srcs:
- (src, _ , key) = src_key.partition('+')
- if src: modified_images.add(src)
- # TODO(sosa): Add private key as part of caching name once devserver can
- # handle it its own cache.
- update_id = dev_server_wrapper.GenerateUpdateId(target, src, key)
- print >> sys.stderr, 'AU: %s' % update_id
- update_ids.append(update_id)
- jobs.append(_GenerateVMUpdate)
- args.append((target, src, key))
-
- # Always add the base image path. This is only useful for non-delta updates.
- modified_images.add(options.base_image)
-
- # Add public key to all images we are using.
- if options.public_key:
- cros_lib.Info('Adding public keys to images for testing.')
- for image in modified_images:
- manager = public_key_manager.PublicKeyManager(image, options.public_key)
- manager.AddKeyToImage()
- au_test.AUTest.public_key_managers.append(manager)
-
- raw_results = parallel_test_job.RunParallelJobs(options.jobs, jobs, args,
- print_status=True)
- results = []
-
- # Looking for this line in the output.
- key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
- for result in raw_results:
- (return_code, output, _) = result
- if return_code != 0:
- cros_lib.Warning(output)
- raise update_exception.UpdateException(return_code,
- 'Failed to generate all updates.')
- else:
- for line in output.splitlines():
- match = key_line_re.search(line)
- if match:
- # Convert blah/blah/update.gz -> update/blah/blah.
- path_to_update_gz = match.group(1).rstrip()
- (path_to_update_dir, _, _) = path_to_update_gz.rpartition(
- '/update.gz')
- results.append('/'.join(['update', path_to_update_dir]))
- break
-
- # Make sure all generation of updates returned cached locations.
- if len(raw_results) != len(results):
- raise update_exception.UpdateException(
- 1, 'Insufficient number cache directories returned.')
-
- # Build the dictionary from our id's and returned cache paths.
- cache_dictionary = {}
- for index, id in enumerate(update_ids):
- cache_dictionary[id] = results[index]
-
- return cache_dictionary
-
-
-def _RunTestsInParallel(options):
- """Runs the tests given by the options in parallel."""
- threads = []
- args = []
- test_suite = _PrepareTestSuite(options)
- for test in test_suite:
- test_name = test.id()
- test_case = unittest.TestLoader().loadTestsFromName(test_name)
- threads.append(unittest.TextTestRunner(verbosity=2).run)
- args.append(test_case)
-
- results = parallel_test_job.RunParallelJobs(options.jobs, threads, args,
- print_status=False)
- for test_result in results:
- if not test_result.wasSuccessful():
- cros_lib.Die('Test harness was not successful')
-
-
-def _CleanPreviousWork(options):
- """Cleans up previous work from the devserver cache and local image cache."""
- cros_lib.Info('Cleaning up previous work.')
- # Wipe devserver cache.
- cros_lib.RunCommandCaptureOutput(
- ['sudo', 'start_devserver', '--clear_cache', '--exit', ],
- enter_chroot=True, print_cmd=False, combine_stdout_stderr=True)
-
- # Clean previous vm images if they exist.
- if options.type == 'vm':
- target_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
- options.target_image)
- base_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
- options.base_image)
- if os.path.exists(target_vm_image_path): os.remove(target_vm_image_path)
- if os.path.exists(base_vm_image_path): os.remove(base_vm_image_path)
-
-
-def main():
- parser = optparse.OptionParser()
- parser.add_option('-b', '--base_image',
- help='path to the base image.')
- parser.add_option('-r', '--board',
- help='board for the images.')
- parser.add_option('--clean', default=False, dest='clean', action='store_true',
- help='Clean all previous state')
- parser.add_option('--no_delta', action='store_false', default=True,
- dest='delta',
- help='Disable using delta updates.')
- parser.add_option('--no_graphics', action='store_true',
- help='Disable graphics for the vm test.')
- parser.add_option('-j', '--jobs', default=8, type=int,
- help='Number of simultaneous jobs')
- parser.add_option('--public_key', default=None,
- help='Public key to use on images and updates.')
- parser.add_option('--private_key', default=None,
- help='Private key to use on images and updates.')
- parser.add_option('-q', '--quick_test', default=False, action='store_true',
- help='Use a basic test to verify image.')
- parser.add_option('-m', '--remote',
- help='Remote address for real test.')
- parser.add_option('-t', '--target_image',
- help='path to the target image.')
- parser.add_option('--test_results_root', default=None,
- help='Root directory to store test results. Should '
- 'be defined relative to chroot root.')
- parser.add_option('--test_prefix', default='test',
- help='Only runs tests with specific prefix i.e. '
- 'testFullUpdateWipeStateful.')
- parser.add_option('-p', '--type', default='vm',
- help='type of test to run: [vm, real]. Default: vm.')
- parser.add_option('--verbose', default=True, action='store_true',
- help='Print out rather than capture output as much as '
- 'possible.')
- (options, leftover_args) = parser.parse_args()
-
- if leftover_args: parser.error('Found unsupported flags: %s' % leftover_args)
-
- assert options.target_image and os.path.exists(options.target_image), \
- 'Target image path does not exist'
- if not options.base_image:
- cros_lib.Info('Base image not specified. Using target as base image.')
- options.base_image = options.target_image
-
- if options.private_key or options.public_key:
- error_msg = ('Could not find %s key. Both private and public keys must be '
- 'specified if either is specified.')
- assert options.private_key and os.path.exists(options.private_key), \
- error_msg % 'private'
- assert options.public_key and os.path.exists(options.public_key), \
- error_msg % 'public'
-
- # Clean up previous work if requested.
- if options.clean: _CleanPreviousWork(options)
-
- # Make sure we have a log directory.
- if options.test_results_root and not os.path.exists(
- options.test_results_root):
- os.makedirs(options.test_results_root)
-
- # Pre-generate update modifies images by adding public keys to them.
- # Wrap try to make sure we clean this up before we're done.
- try:
- # Generate cache of updates to use during test harness.
- update_cache = _PregenerateUpdates(options)
- au_worker.AUWorker.SetUpdateCache(update_cache)
-
- my_server = dev_server_wrapper.DevServerWrapper(
- au_test.AUTest.test_results_root)
- my_server.start()
- try:
- if options.type == 'vm':
- _RunTestsInParallel(options)
- else:
- # TODO(sosa) - Take in a machine pool for a real test.
- # Can't run in parallel with only one remote device.
- test_suite = _PrepareTestSuite(options)
- test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- if not test_result.wasSuccessful(): cros_lib.Die('Test harness failed.')
-
- finally:
- my_server.Stop()
-
- finally:
- # Un-modify any target images we modified. We don't need to un-modify
- # non-targets because they aren't important for archival steps.
- if options.public_key:
- cros_lib.Info('Cleaning up. Removing keys added as part of testing.')
- target_directory = os.path.dirname(options.target_image)
- for key_manager in au_test.AUTest.public_key_managers:
- if key_manager.image_path.startswith(target_directory):
- key_manager.RemoveKeyFromImage()
-
-
-if __name__ == '__main__':
- main()
diff --git a/bin/au_test_harness/cros_test_proxy.py b/bin/au_test_harness/cros_test_proxy.py
deleted file mode 100644
index 0b3728638b..0000000000
--- a/bin/au_test_harness/cros_test_proxy.py
+++ /dev/null
@@ -1,121 +0,0 @@
-# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing various classes pertaining to inserting a proxy in a test.
-"""
-
-import os
-import select
-import socket
-import SocketServer
-import threading
-
-class Filter(object):
- """Base class for data filters.
-
- Pass subclass of this to CrosTestProxy which will perform whatever
- connection manipulation you prefer.
- """
-
- def setup(self):
- """This setup method is called once per connection."""
- pass
-
- def InBound(self, data):
- """This method is called once per packet of incoming data.
-
- The value returned is what is sent through the proxy. If
- None is returned, the connection will be closed.
- """
- return data
-
- def OutBound(self, data):
- """This method is called once per packet of outgoing data.
-
- The value returned is what is sent through the proxy. If
- None is returned, the connection will be closed.
- """
- return data
-
-
-class CrosTestProxy(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
- """A transparent proxy for simulating network errors"""
-
- class _Handler(SocketServer.BaseRequestHandler):
- """Proxy connection handler that passes data though a filter"""
-
- def setup(self):
- """Setup is called once for each connection proxied."""
- self.server.filter.setup()
-
- def handle(self):
- """Handles each incoming connection.
-
- Opens a new connection to the port we are proxing to, then
- passes each packet along in both directions after passing
- them through the filter object passed in.
- """
- # Open outgoing socket
- s_in = self.request
- s_out = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s_out.connect((self.server.address_out, self.server.port_out))
-
- while True:
- rlist, wlist, xlist = select.select([s_in, s_out], [], [])
-
- if s_in in rlist:
- data = s_in.recv(1024)
- data = self.server.filter.InBound(data)
- if not data: break
- try:
- # If there is any error sending data, close both connections.
- s_out.sendall(data)
- except socket.error:
- break
-
- if s_out in rlist:
- data = s_out.recv(1024)
- data = self.server.filter.OutBound(data)
- if not data: break
- try:
- # If there is any error sending data, close both connections.
- s_in.sendall(data)
- except socket.error:
- break
-
- s_in.close()
- s_out.close()
-
- def __init__(self,
- filter,
- port_in=8081,
- address_out='127.0.0.1', port_out=8080):
- """Configures the proxy object.
-
- Args:
- filter: An instance of a subclass of Filter.
- port_in: Port on which to listen for incoming connections.
- address_out: Address to which outgoing connections will go.
- address_port: Port to which outgoing connections will go.
- """
- self.port_in = port_in
- self.address_out = address_out
- self.port_out = port_out
- self.filter = filter
-
- try:
- SocketServer.TCPServer.__init__(self,
- ('', port_in),
- self._Handler)
- except socket.error:
- os.system('sudo netstat -l --tcp -n -p')
- raise
-
- def serve_forever_in_thread(self):
- """Helper method to start the server in a new background thread."""
- server_thread = threading.Thread(target=self.serve_forever)
- server_thread.setDaemon(True)
- server_thread.start()
-
- return server_thread
diff --git a/bin/au_test_harness/dev_server_wrapper.py b/bin/au_test_harness/dev_server_wrapper.py
deleted file mode 100644
index 3ddc1b8a34..0000000000
--- a/bin/au_test_harness/dev_server_wrapper.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing methods and classes to interact with a devserver instance.
-"""
-
-import os
-import threading
-
-import cros_build_lib as cros_lib
-
-def GenerateUpdateId(target, src, key):
- """Returns a simple representation id of target and src paths."""
- update_id = target
- if src: update_id = '->'.join([src, update_id])
- if key: update_id = '+'.join([update_id, key])
- return update_id
-
-class DevServerWrapper(threading.Thread):
- """A Simple wrapper around a dev server instance."""
-
- def __init__(self, test_root):
- self.proc = None
- self.test_root = test_root
- threading.Thread.__init__(self)
-
- def run(self):
- # Kill previous running instance of devserver if it exists.
- cros_lib.RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True,
- print_cmd=False)
- cros_lib.RunCommand(['sudo',
- 'start_devserver',
- '--archive_dir=./static',
- '--client_prefix=ChromeOSUpdateEngine',
- '--production',
- ], enter_chroot=True, print_cmd=False,
- log_to_file=os.path.join(self.test_root,
- 'dev_server.log'))
-
- def Stop(self):
- """Kills the devserver instance."""
- cros_lib.RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True,
- print_cmd=False)
-
- @classmethod
- def GetDevServerURL(cls, port, sub_dir):
- """Returns the dev server url for a given port and sub directory."""
- ip_addr = cros_lib.GetIPAddress()
- if not port: port = 8080
- url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr,
- 'port': str(port),
- 'dir': sub_dir}
- return url
diff --git a/bin/au_test_harness/dummy_au_worker.py b/bin/au_test_harness/dummy_au_worker.py
deleted file mode 100644
index fa1bee042a..0000000000
--- a/bin/au_test_harness/dummy_au_worker.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing a fake au worker class."""
-
-import unittest
-
-import au_worker
-
-class DummyAUWorker(au_worker.AUWorker):
- """AU worker that emulates work for an au_worker without actually doing work.
-
- Collects different updates that would be generated that can be obtained
- from the class object delta_list.
- """
-
- # Class variable that stores the list of payloads that would be needed.
- delta_list = {}
-
- def __init__(self, options, test_results_root):
- au_worker.AUWorker.__init__(self, options, test_results_root)
- self.au_type = options.type
-
- def PrepareBase(self, image_path):
- """Copy how the actual worker would prepare the base image."""
- if self.au_type == 'vm':
- self.PrepareVMBase(image_path)
- else:
- self.PrepareRealBase(image_path)
-
- def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- """Emulate Update and record the update payload in delta_list."""
- if self.au_type == 'vm' and src_image_path and self._first_update:
- src_image_path = self.vm_image_path
- self._first_update = False
-
- # Generate a value that combines delta with private key path.
- val = src_image_path
- if private_key_path: val = '%s+%s' % (val, private_key_path)
- if not self.delta_list.has_key(image_path):
- self.delta_list[image_path] = set([val])
- else:
- self.delta_list[image_path].add(val)
diff --git a/bin/au_test_harness/parallel_test_job.py b/bin/au_test_harness/parallel_test_job.py
deleted file mode 100644
index 096ec17043..0000000000
--- a/bin/au_test_harness/parallel_test_job.py
+++ /dev/null
@@ -1,109 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing methods/classes related to running parallel test jobs."""
-
-import sys
-import threading
-import time
-
-import cros_build_lib as cros_lib
-
-class ParallelJob(threading.Thread):
- """Small wrapper for threading. Thread that releases a semaphores on exit."""
-
- def __init__(self, starting_semaphore, ending_semaphore, target, args):
- """Initializes an instance of a job.
-
- Args:
- starting_semaphore: Semaphore used by caller to wait on such that
- there isn't more than a certain number of threads running. Should
- be initialized to a value for the number of threads wanting to be run
- at a time.
- ending_semaphore: Semaphore is released every time a job ends. Should be
- initialized to 0 before starting first job. Should be acquired once for
- each job. Threading.Thread.join() has a bug where if the run function
- terminates too quickly join() will hang forever.
- target: The func to run.
- args: Args to pass to the fun.
- """
- threading.Thread.__init__(self, target=target, args=args)
- self._target = target
- self._args = args
- self._starting_semaphore = starting_semaphore
- self._ending_semaphore = ending_semaphore
- self._output = None
- self._completed = False
-
- def run(self):
- """Thread override. Runs the method specified and sets output."""
- try:
- self._output = self._target(*self._args)
- finally:
- # Our own clean up.
- self._Cleanup()
- self._completed = True
- # From threading.py to avoid a refcycle.
- del self._target, self._args
-
- def GetOutput(self):
- """Returns the output of the method run."""
- assert self._completed, 'GetOutput called before thread was run.'
- return self._output
-
- def _Cleanup(self):
- """Releases semaphores for a waiting caller."""
- self._starting_semaphore.release()
- self._ending_semaphore.release()
-
- def __str__(self):
- return '%s(%s)' % (self._target, self._args)
-
-
-def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args,
- print_status):
- """Runs set number of specified jobs in parallel.
-
- Args:
- number_of_simultaneous_jobs: Max number of threads to be run in parallel.
- jobs: Array of methods to run.
- jobs_args: Array of args associated with method calls.
- print_status: True if you'd like this to print out .'s as it runs jobs.
- Returns:
- Returns an array of results corresponding to each thread.
- """
- def _TwoTupleize(x, y):
- return (x, y)
-
- threads = []
- job_start_semaphore = threading.Semaphore(number_of_simultaneous_jobs)
- join_semaphore = threading.Semaphore(0)
- assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
-
- # Create the parallel jobs.
- for job, args in map(_TwoTupleize, jobs, jobs_args):
- thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
- args=args)
- threads.append(thread)
-
- # Cache sudo access.
- cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'],
- print_cmd=False, redirect_stdout=True,
- redirect_stderr=True)
-
- # We use a semaphore to ensure we don't run more jobs than required.
- # After each thread finishes, it releases (increments semaphore).
- # Acquire blocks of num jobs reached and continues when a thread finishes.
- for next_thread in threads:
- job_start_semaphore.acquire(blocking=True)
- next_thread.start()
-
- # Wait on the rest of the threads to finish.
- for thread in threads:
- while not join_semaphore.acquire(blocking=False):
- time.sleep(5)
- if print_status:
- print >> sys.stderr, '.',
-
- return [thread.GetOutput() for thread in threads]
diff --git a/bin/au_test_harness/public_key_manager.py b/bin/au_test_harness/public_key_manager.py
deleted file mode 100644
index 355e7445fd..0000000000
--- a/bin/au_test_harness/public_key_manager.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""This module manages interactions between an image and a public key."""
-
-import os
-import tempfile
-
-import cros_build_lib as cros_lib
-
-class PublicKeyManager(object):
- """Class wrapping interactions with a public key on an image."""
- TARGET_KEY_PATH = 'usr/share/update_engine/update-payload-key.pub.pem'
-
- def __init__(self, image_path, key_path):
- """Initializes a manager with image_path and key_path we plan to insert."""
- self.image_path = image_path
- self.key_path = key_path
- self._rootfs_dir = tempfile.mkdtemp(suffix='rootfs', prefix='tmp')
- self._stateful_dir = tempfile.mkdtemp(suffix='stateful', prefix='tmp')
-
- # Gather some extra information about the image.
- try:
- cros_lib.MountImage(image_path, self._rootfs_dir, self._stateful_dir,
- read_only=True)
- self._full_target_key_path = os.path.join(
- self._rootfs_dir, PublicKeyManager.TARGET_KEY_PATH)
- self._is_key_new = True
- if os.path.exists(self._full_target_key_path):
- diff_output = cros_lib.RunCommand(['diff',
- self.key_path,
- self._full_target_key_path],
- print_cmd=False, redirect_stdout=True,
- redirect_stderr=True, error_ok=True)
-
- if not diff_output: self._is_key_new = False
-
- finally:
- cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
-
- def __del__(self):
- """Remove our temporary directories we created in init."""
- os.rmdir(self._rootfs_dir)
- os.rmdir(self._stateful_dir)
-
- def AddKeyToImage(self):
- """Adds the key specified in init to the image."""
- if not self._is_key_new:
- cros_lib.Info('Public key already on image %s. No work to do.' %
- self.image_path)
- return
-
- cros_lib.Info('Copying %s into %s' % (self.key_path, self.image_path))
- try:
- cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir,
- read_only=False)
-
- dir_path = os.path.dirname(self._full_target_key_path)
- cros_lib.RunCommand(['sudo', 'mkdir', '--parents', dir_path],
- print_cmd=False)
- cros_lib.RunCommand(['sudo', 'cp', '--force', '-p', self.key_path,
- self._full_target_key_path], print_cmd=False)
- finally:
- cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
- self._MakeImageBootable()
-
- def RemoveKeyFromImage(self):
- """Removes the key specified in init from the image."""
- cros_lib.Info('Removing public key from image %s.' % self.image_path)
- try:
- cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir,
- read_only=False)
- cros_lib.RunCommand(['sudo', 'rm', '--force', self._full_target_key_path],
- print_cmd=False)
- finally:
- cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
- self._MakeImageBootable()
-
- def _MakeImageBootable(self):
- """Makes the image bootable. Note, it is only useful for non-vm images."""
- image = os.path.basename(self.image_path)
- if 'qemu' in image:
- return
-
- from_dir = os.path.dirname(self.image_path)
- cros_lib.RunCommand(['bin/cros_make_image_bootable',
- cros_lib.ReinterpretPathForChroot(from_dir),
- image], print_cmd=False, redirect_stdout=True,
- redirect_stderr=True, enter_chroot=True,
- cwd=cros_lib.CROSUTILS_DIRECTORY)
diff --git a/bin/au_test_harness/real_au_worker.py b/bin/au_test_harness/real_au_worker.py
deleted file mode 100644
index 43b8c2e62c..0000000000
--- a/bin/au_test_harness/real_au_worker.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing class that implements an au_worker for a test device."""
-
-import unittest
-
-import cros_build_lib as cros_lib
-
-import au_worker
-
-class RealAUWorker(au_worker.AUWorker):
- """Test harness for updating real images."""
-
- def __init__(self, options, test_results_root):
- """Processes non-vm-specific options."""
- au_worker.AUWorker.__init__(self, options, test_results_root)
- self.remote = options.remote
- if not self.remote: cros_lib.Die('We require a remote address for tests.')
-
- def PrepareBase(self, image_path):
- """Auto-update to base image to prepare for test."""
- self.PrepareRealBase(image_path)
-
- def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None, private_key_path=None):
- """Updates a remote image using image_to_live.sh."""
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- cmd = ['%s/image_to_live.sh' % self.crosutils,
- '--remote=%s' % self.remote,
- stateful_change_flag,
- '--verify',
- ]
- self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port,
- private_key_path)
- self.RunUpdateCmd(cmd)
-
- def UpdateUsingPayload(self, update_path, stateful_change='old',
- proxy_port=None):
- """Updates a remote image using image_to_live.sh."""
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- cmd = ['%s/image_to_live.sh' % self.crosutils,
- '--payload=%s' % update_path,
- '--remote=%s' % self.remote,
- stateful_change_flag,
- '--verify',
- ]
- if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
- self.RunUpdateCmd(cmd)
-
- def VerifyImage(self, unittest, percent_required_to_pass=100):
- """Verifies an image using run_remote_tests.sh with verification suite."""
- test_directory = self.GetNextResultsPath('verify')
- output = cros_lib.RunCommand(
- ['%s/run_remote_tests.sh' % self.crosutils,
- '--remote=%s' % self.remote,
- '--results_dir_root=%s' % test_directory,
- self.verify_suite,
- ], error_ok=True, enter_chroot=False, redirect_stdout=True)
- return self.AssertEnoughTestsPassed(unittest, output,
- percent_required_to_pass)
-
diff --git a/bin/au_test_harness/update_exception.py b/bin/au_test_harness/update_exception.py
deleted file mode 100644
index 4c1e73ec64..0000000000
--- a/bin/au_test_harness/update_exception.py
+++ /dev/null
@@ -1,11 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing update exceptions."""
-
-class UpdateException(Exception):
- """Exception thrown when _UpdateImage or _UpdateUsingPayload fail"""
- def __init__(self, code, stdout):
- self.code = code
- self.stdout = stdout
diff --git a/bin/au_test_harness/vm_au_worker.py b/bin/au_test_harness/vm_au_worker.py
deleted file mode 100644
index 9788fddcb2..0000000000
--- a/bin/au_test_harness/vm_au_worker.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module containing implementation of an au_worker for virtual machines."""
-
-import os
-import threading
-import unittest
-
-import cros_build_lib as cros_lib
-
-import au_worker
-
-
-class VMAUWorker(au_worker.AUWorker):
- """Test harness for updating virtual machines."""
-
- # Class variables used to acquire individual VM variables per test.
- _vm_lock = threading.Lock()
- _next_port = 9222
-
- def __init__(self, options, test_results_root):
- """Processes vm-specific options."""
- au_worker.AUWorker.__init__(self, options, test_results_root)
- self.graphics_flag = ''
- if options.no_graphics: self.graphics_flag = '--no_graphics'
- if not self.board: cros_lib.Die('Need board to convert base image to vm.')
-
- self._AcquireUniquePortAndPidFile()
- self._KillExistingVM(self._kvm_pid_file)
-
- def _KillExistingVM(self, pid_file):
- """Kills an existing VM specified by the pid_file."""
- if os.path.exists(pid_file):
- cros_lib.Warning('Existing %s found. Deleting and killing process' %
- pid_file)
- cros_lib.RunCommand(['./cros_stop_vm', '--kvm_pid=%s' % pid_file],
- cwd=self.crosutilsbin)
-
- assert not os.path.exists(pid_file)
-
- def _AcquireUniquePortAndPidFile(self):
- """Acquires unique ssh port and pid file for VM."""
- with VMAUWorker._vm_lock:
- self._ssh_port = VMAUWorker._next_port
- self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port
- VMAUWorker._next_port += 1
-
- def CleanUp(self):
- """Stop the vm after a test."""
- self._KillExistingVM(self._kvm_pid_file)
-
- def PrepareBase(self, image_path):
- """Creates an update-able VM based on base image."""
- self.PrepareVMBase(image_path)
-
- def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port='', private_key_path=None):
- """Updates VM image with image_path."""
- log_directory = self.GetNextResultsPath('update')
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- if src_image_path and self._first_update:
- src_image_path = self.vm_image_path
- self._first_update = False
-
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--vm_image_path=%s' % self.vm_image_path,
- '--update_log=%s' % os.path.join(log_directory, 'update_engine.log'),
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- ]
- self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port,
- private_key_path)
- self.RunUpdateCmd(cmd, log_directory)
-
- def UpdateUsingPayload(self, update_path, stateful_change='old',
- proxy_port=None):
- """Updates a vm image using cros_run_vm_update."""
- log_directory = self.GetNextResultsPath('update')
- stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--payload=%s' % update_path,
- '--vm_image_path=%s' % self.vm_image_path,
- '--update_log=%s' % os.path.join(log_directory, 'update_engine.log'),
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- ]
- if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
- self.RunUpdateCmd(cmd, log_directory)
-
- def VerifyImage(self, unittest, percent_required_to_pass=100):
- """Runs vm smoke suite to verify image."""
- log_directory = self.GetNextResultsPath('verify')
- (_, _, log_directory_in_chroot) = log_directory.rpartition('chroot')
- # image_to_live already verifies lsb-release matching. This is just
- # for additional steps.
- commandWithArgs = ['%s/cros_run_vm_test' % self.crosutilsbin,
- '--image_path=%s' % self.vm_image_path,
- '--snapshot',
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- '--results_dir_root=%s' % log_directory_in_chroot,
- self.verify_suite,
- ]
- if self.graphics_flag: commandWithArgs.append(self.graphics_flag)
- output = cros_lib.RunCommand(commandWithArgs, error_ok=True,
- enter_chroot=False, redirect_stdout=True)
- return self.AssertEnoughTestsPassed(unittest, output,
- percent_required_to_pass)
-
diff --git a/bin/cros_au_test_harness b/bin/cros_au_test_harness
index 601ca3047f..daa10b912c 120000
--- a/bin/cros_au_test_harness
+++ b/bin/cros_au_test_harness
@@ -1 +1 @@
-au_test_harness/cros_au_test_harness.py
\ No newline at end of file
+../../platform/crostestutils/au_test_harness/cros_au_test_harness.py
\ No newline at end of file
diff --git a/bin/ctest b/bin/ctest
index 2a65c0d330..cb76d89795 120000
--- a/bin/ctest
+++ b/bin/ctest
@@ -1 +1 @@
-ctest.py
\ No newline at end of file
+../../platform/crostestutils/ctest/ctest.py
\ No newline at end of file
diff --git a/bin/ctest.py b/bin/ctest.py
deleted file mode 100755
index 3e424b7046..0000000000
--- a/bin/ctest.py
+++ /dev/null
@@ -1,320 +0,0 @@
-#!/usr/bin/python
-#
-# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Wrapper for tests that are run on builders."""
-
-import fileinput
-import optparse
-import os
-import re
-import sys
-import traceback
-import urllib
-import HTMLParser
-
-sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
-from cros_build_lib import Info
-from cros_build_lib import ReinterpretPathForChroot
-from cros_build_lib import RunCommand
-from cros_build_lib import Warning
-
-_IMAGE_TO_EXTRACT = 'chromiumos_test_image.bin'
-_NEW_STYLE_VERSION = '0.9.131.0'
-
-class HTMLDirectoryParser(HTMLParser.HTMLParser):
- """HTMLParser for parsing the default apache file index."""
-
- def __init__(self, regex):
- HTMLParser.HTMLParser.__init__(self)
- self.regex_object = re.compile(regex)
- self.link_list = []
-
- def handle_starttag(self, tag, attrs):
- """Overrides from HTMLParser and is called at the start of every tag.
-
- This implementation grabs attributes from links (i.e.
- and adds the target from href= if the matches the
- regex given at the start.
- """
- if not tag.lower() == 'a':
- return
-
- for attr in attrs:
- if not attr[0].lower() == 'href':
- continue
-
- match = self.regex_object.match(attr[1])
- if match:
- self.link_list.append(match.group(0).rstrip('/'))
-
-
-def ModifyBootDesc(download_folder, redirect_file=None):
- """Modifies the boot description of a downloaded image to work with path.
-
- The default boot.desc from another system is specific to the directory
- it was created in. This modifies the boot description to be compatiable
- with the download folder.
-
- Args:
- download_folder: Absoulte path to the download folder.
- redirect_file: For testing. Where to copy new boot desc.
- """
- boot_desc_path = os.path.join(download_folder, 'boot.desc')
- in_chroot_folder = ReinterpretPathForChroot(download_folder)
-
- for line in fileinput.input(boot_desc_path, inplace=1):
- # Has to be done here to get changes to sys.stdout from fileinput.input.
- if not redirect_file:
- redirect_file = sys.stdout
- split_line = line.split('=')
- if len(split_line) > 1:
- var_part = split_line[0]
- potential_path = split_line[1].replace('"', '').strip()
-
- if potential_path.startswith('/home') and not 'output_dir' in var_part:
- new_path = os.path.join(in_chroot_folder,
- os.path.basename(potential_path))
- new_line = '%s="%s"' % (var_part, new_path)
- Info('Replacing line %s with %s' % (line, new_line))
- redirect_file.write('%s\n' % new_line)
- continue
- elif 'output_dir' in var_part:
- # Special case for output_dir.
- new_line = '%s="%s"' % (var_part, in_chroot_folder)
- Info('Replacing line %s with %s' % (line, new_line))
- redirect_file.write('%s\n' % new_line)
- continue
-
- # Line does not need to be modified.
- redirect_file.write(line)
-
- fileinput.close()
-
-
-def _GreaterVersion(version_a, version_b):
- """Returns the higher version number of two version number strings."""
- version_regex = re.compile('.*(\d+)\.(\d+)\.(\d+)\.(\d+).*')
- version_a_tokens = version_regex.match(version_a).groups()
- version_b_tokens = version_regex.match(version_b).groups()
- for i in range(4):
- (a, b) = (int(version_a_tokens[i]), int(version_b_tokens[i]))
- if a != b:
- if a > b: return version_a
- return version_b
- return version_a
-
-
-def GetLatestLinkFromPage(url, regex):
- """Returns the latest link from the given url that matches regex.
-
- Args:
- url: Url to download and parse.
- regex: Regular expression to match links against.
- """
- url_file = urllib.urlopen(url)
- url_html = url_file.read()
-
- url_file.close()
-
- # Parses links with versions embedded.
- url_parser = HTMLDirectoryParser(regex=regex)
- url_parser.feed(url_html)
- return reduce(_GreaterVersion, url_parser.link_list)
-
-
-def GetNewestLinkFromZipBase(board, channel, zip_server_base):
- """Returns the url to the newest image from the zip server.
-
- Args:
- board: board for the image zip.
- channel: channel for the image zip.
- zip_server_base: base url for zipped images.
- """
- zip_base = os.path.join(zip_server_base, channel, board)
- latest_version = GetLatestLinkFromPage(zip_base, '\d+\.\d+\.\d+\.\d+/')
-
- zip_dir = os.path.join(zip_base, latest_version)
- zip_name = GetLatestLinkFromPage(zip_dir,
- 'ChromeOS-\d+\.\d+\.\d+\.\d+-.*\.zip')
- return os.path.join(zip_dir, zip_name)
-
-
-def GetLatestZipUrl(board, channel, latest_url_base, zip_server_base):
- """Returns the url of the latest image zip for the given arguments.
-
- Args:
- board: board for the image zip.
- channel: channel for the image zip.
- latest_url_base: base url for latest links.
- zip_server_base: base url for zipped images.
- """
- if latest_url_base:
- try:
- # Grab the latest image info.
- latest_file_url = os.path.join(latest_url_base, channel,
- 'LATEST-%s' % board)
- latest_image_file = urllib.urlopen(latest_file_url)
- latest_image = latest_image_file.read()
- latest_image_file.close()
- # Convert bin.gz into zip.
- latest_image = latest_image.replace('.bin.gz', '.zip')
- version = latest_image.split('-')[1]
- zip_base = os.path.join(zip_server_base, channel, board)
- return os.path.join(zip_base, version, latest_image)
- except IOError:
- Warning(('Could not use latest link provided, defaulting to parsing'
- ' latest from zip url base.'))
-
- try:
- return GetNewestLinkFromZipBase(board, channel, zip_server_base)
- except:
- Warning('Failed to get url from standard zip base. Trying rc.')
- return GetNewestLinkFromZipBase(board + '-rc', channel, zip_server_base)
-
-
-def GrabZipAndExtractImage(zip_url, download_folder, image_name) :
- """Downloads the zip and extracts the given image.
-
- Doesn't re-download if matching version found already in download folder.
- Args:
- zip_url - url for the image.
- download_folder - download folder to store zip file and extracted images.
- image_name - name of the image to extract from the zip file.
- """
- zip_path = os.path.join(download_folder, 'image.zip')
- versioned_url_path = os.path.join(download_folder, 'download_url')
- found_cached = False
-
- if os.path.exists(versioned_url_path):
- fh = open(versioned_url_path)
- version_url = fh.read()
- fh.close()
-
- if version_url == zip_url and os.path.exists(os.path.join(download_folder,
- image_name)):
- Info('Using cached %s' % image_name)
- found_cached = True
-
- if not found_cached:
- Info('Downloading %s' % zip_url)
- RunCommand(['rm', '-rf', download_folder], print_cmd=False)
- os.mkdir(download_folder)
- urllib.urlretrieve(zip_url, zip_path)
-
- # Using unzip because python implemented unzip in native python so
- # extraction is really slow.
- Info('Unzipping image %s' % image_name)
- RunCommand(['unzip', '-d', download_folder, zip_path],
- print_cmd=False, error_message='Failed to download %s' % zip_url)
-
- ModifyBootDesc(download_folder)
-
- # Put url in version file so we don't have to do this every time.
- fh = open(versioned_url_path, 'w+')
- fh.write(zip_url)
- fh.close()
-
- version = zip_url.split('/')[-2]
- if not _GreaterVersion(version, _NEW_STYLE_VERSION) == version:
- # If the version isn't ready for new style, touch file to use old style.
- old_style_touch_path = os.path.join(download_folder, '.use_e1000')
- fh = open(old_style_touch_path, 'w+')
- fh.close()
-
-
-def RunAUTestHarness(board, channel, latest_url_base, zip_server_base,
- no_graphics, type, remote, clean, test_results_root):
- """Runs the auto update test harness.
-
- The auto update test harness encapsulates testing the auto-update mechanism
- for the latest image against the latest official image from the channel. This
- also tests images with suite_Smoke (built-in as part of its verification
- process).
-
- Args:
- board: the board for the latest image.
- channel: the channel to run the au test harness against.
- latest_url_base: base url for getting latest links.
- zip_server_base: base url for zipped images.
- no_graphics: boolean - If True, disable graphics during vm test.
- type: which test harness to run. Possible values: real, vm.
- remote: ip address for real test harness run.
- clean: Clean the state of test harness before running.
- test_results_root: Root directory to store au_test_harness results.
- """
- crosutils_root = os.path.join(os.path.dirname(__file__), '..')
- download_folder = os.path.abspath('latest_download')
- zip_url = GetLatestZipUrl(board, channel, latest_url_base, zip_server_base)
- GrabZipAndExtractImage(zip_url, download_folder, _IMAGE_TO_EXTRACT)
-
- # Tests go here.
- latest_image = RunCommand(['./get_latest_image.sh', '--board=%s' % board],
- cwd=crosutils_root, redirect_stdout=True,
- print_cmd=True).strip()
-
- update_engine_path = os.path.join(crosutils_root, '..', 'platform',
- 'update_engine')
-
- cmd = ['bin/cros_au_test_harness',
- '--base_image=%s' % os.path.join(download_folder,
- _IMAGE_TO_EXTRACT),
- '--target_image=%s' % os.path.join(latest_image,
- _IMAGE_TO_EXTRACT),
- '--board=%s' % board,
- '--type=%s' % type,
- '--remote=%s' % remote,
- '--private_key=%s' % os.path.join(update_engine_path,
- 'unittest_key.pem'),
- '--public_key=%s' % os.path.join(update_engine_path,
- 'unittest_key.pub.pem'),
- ]
- if test_results_root: cmd.append('--test_results_root=%s' % test_results_root)
- if no_graphics: cmd.append('--no_graphics')
- if clean: cmd.append('--clean')
-
- RunCommand(cmd, cwd=crosutils_root)
-
-
-def main():
- parser = optparse.OptionParser()
- parser.add_option('-b', '--board',
- help='board for the image to compare against.')
- parser.add_option('-c', '--channel',
- help='channel for the image to compare against.')
- parser.add_option('--cache', default=False, action='store_true',
- help='Cache payloads')
- parser.add_option('-l', '--latestbase',
- help='Base url for latest links.')
- parser.add_option('-z', '--zipbase',
- help='Base url for hosted images.')
- parser.add_option('--no_graphics', action='store_true', default=False,
- help='Disable graphics for the vm test.')
- parser.add_option('--test_results_root', default=None,
- help='Root directory to store test results. Should '
- 'be defined relative to chroot root.')
- parser.add_option('--type', default='vm',
- help='type of test to run: [vm, real]. Default: vm.')
- parser.add_option('--remote', default='0.0.0.0',
- help='For real tests, ip address of the target machine.')
-
- # Set the usage to include flags.
- parser.set_usage(parser.format_help())
- (options, args) = parser.parse_args()
-
- if args: parser.error('Extra args found %s.' % args)
- if not options.board: parser.error('Need board for image to compare against.')
- if not options.channel: parser.error('Need channel e.g. dev-channel.')
- if not options.zipbase: parser.error('Need zip url base to get images.')
-
- RunAUTestHarness(options.board, options.channel, options.latestbase,
- options.zipbase, options.no_graphics, options.type,
- options.remote, not options.cache,
- options.test_results_root)
-
-
-if __name__ == '__main__':
- main()
-
diff --git a/bin/ctest_unittest.py b/bin/ctest_unittest.py
deleted file mode 100755
index cc00e329ce..0000000000
--- a/bin/ctest_unittest.py
+++ /dev/null
@@ -1,228 +0,0 @@
-#!/usr/bin/python
-#
-# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Unit tests for ctest."""
-
-import mox
-import os
-import unittest
-import urllib
-
-import ctest
-
-_TEST_BOOT_DESC = """
- --arch="x86"
- --output_dir="/home/chrome-bot/0.8.70.5-a1"
- --espfs_mountpoint="/home/chrome-bot/0.8.70.5-a1/esp"
- --enable_rootfs_verification
-"""
-
-class CrosTestTest(mox.MoxTestBase):
- """Test class for CTest."""
-
- def setUp(self):
- mox.MoxTestBase.setUp(self)
- self.board = 'test-board'
- self.channel = 'test-channel'
- self.version = '1.2.3.4.5'
- self.revision = '7ghfa9999-12345'
- self.image_name = 'TestOS-%s-%s' % (self.version, self.revision)
- self.download_folder = 'test_folder'
- self.latestbase = 'http://test-latest/TestOS'
- self.zipbase = 'http://test-zips/archive/TestOS'
- self.image_url = '%s/%s/%s/%s/%s.zip' % (self.zipbase, self.channel,
- self.board, self.version,
- self.image_name)
- self.test_regex = 'ChromeOS-\d+\.\d+\.\d+\.\d+-.*\.zip'
-
- def testModifyBootDesc(self):
- """Tests to make sure we correctly modify a boot desc."""
- in_chroot_path = ctest.ReinterpretPathForChroot(os.path.abspath(
- self.download_folder))
- self.mox.StubOutWithMock(__builtins__, 'open')
- self.mox.StubOutWithMock(ctest.fileinput, 'input')
- m_file = self.mox.CreateMock(file)
-
- mock_file = _TEST_BOOT_DESC.splitlines(True)
- ctest.fileinput.input('%s/%s' % (os.path.abspath(self.download_folder),
- 'boot.desc'),
- inplace=1).AndReturn(mock_file)
-
- m_file.write('\n')
- m_file.write(' --arch="x86"\n')
- m_file.write(' --output_dir="%s"\n' % in_chroot_path)
- m_file.write(' --espfs_mountpoint="%s/%s"\n' % (in_chroot_path, 'esp'))
- m_file.write(' --enable_rootfs_verification\n')
-
- self.mox.ReplayAll()
- ctest.ModifyBootDesc(os.path.abspath(self.download_folder), m_file)
- self.mox.VerifyAll()
-
-
- def testGetLatestZipUrl(self):
- """Test case that tests GetLatestZipUrl with test urls."""
- self.mox.StubOutWithMock(urllib, 'urlopen')
- m_file = self.mox.CreateMock(file)
-
- urllib.urlopen('%s/%s/LATEST-%s' % (self.latestbase, self.channel,
- self.board)).AndReturn(m_file)
- m_file.read().AndReturn('%s.bin.gz' % self.image_name)
- m_file.close()
-
- self.mox.ReplayAll()
- self.assertEquals(ctest.GetLatestZipUrl(self.board, self.channel,
- self.latestbase, self.zipbase),
- self.image_url)
- self.mox.VerifyAll()
-
- def testGetLatestZipFromBadUrl(self):
- """Tests whether GetLatestZipUrl returns correct url given bad link."""
- self.mox.StubOutWithMock(urllib, 'urlopen')
- self.mox.StubOutWithMock(ctest, 'GetNewestLinkFromZipBase')
- m_file = self.mox.CreateMock(file)
-
- urllib.urlopen('%s/%s/LATEST-%s' % (self.latestbase, self.channel,
- self.board)).AndRaise(IOError('Cannot open url.'))
- ctest.GetNewestLinkFromZipBase(self.board, self.channel,
- self.zipbase).AndReturn(self.image_url)
-
- self.mox.ReplayAll()
- self.assertEquals(ctest.GetLatestZipUrl(self.board, self.channel,
- self.latestbase, self.zipbase),
- self.image_url)
- self.mox.VerifyAll()
-
- def testGrabZipAndExtractImageUseCached(self):
- """Test case where cache holds our image."""
- self.mox.StubOutWithMock(os.path, 'exists')
- self.mox.StubOutWithMock(__builtins__, 'open')
- m_file = self.mox.CreateMock(file)
-
- os.path.exists('%s/%s' % (
- self.download_folder, 'download_url')).AndReturn(True)
-
- open('%s/%s' % (self.download_folder, 'download_url')).AndReturn(m_file)
- m_file.read().AndReturn(self.image_url)
- m_file.close()
-
- os.path.exists('%s/%s' % (
- self.download_folder, ctest._IMAGE_TO_EXTRACT)).AndReturn(True)
-
- self.mox.ReplayAll()
- ctest.GrabZipAndExtractImage(self.image_url, self.download_folder,
- ctest._IMAGE_TO_EXTRACT)
- self.mox.VerifyAll()
-
- def CommonDownloadAndExtractImage(self):
- """Common code to mock downloading image, unzipping it and setting url."""
- zip_path = os.path.join(self.download_folder, 'image.zip')
- m_file = self.mox.CreateMock(file)
-
- ctest.RunCommand(['rm', '-rf', self.download_folder], print_cmd=False)
- os.mkdir(self.download_folder)
- urllib.urlretrieve(self.image_url, zip_path)
- ctest.RunCommand(['unzip', '-d', self.download_folder, zip_path],
- print_cmd=False, error_message=mox.IgnoreArg())
-
- ctest.ModifyBootDesc(self.download_folder)
-
- open('%s/%s' % (self.download_folder, 'download_url'),
- 'w+').AndReturn(m_file)
- m_file.write(self.image_url)
- m_file.close()
-
- self.mox.ReplayAll()
- ctest.GrabZipAndExtractImage(self.image_url, self.download_folder,
- ctest._IMAGE_TO_EXTRACT)
- self.mox.VerifyAll()
-
- def testGrabZipAndExtractImageNoCache(self):
- """Test case where download_url doesn't exist."""
- self.mox.StubOutWithMock(os.path, 'exists')
- self.mox.StubOutWithMock(os, 'mkdir')
- self.mox.StubOutWithMock(__builtins__, 'open')
- self.mox.StubOutWithMock(ctest, 'RunCommand')
- self.mox.StubOutWithMock(urllib, 'urlretrieve')
- self.mox.StubOutWithMock(ctest, 'ModifyBootDesc')
-
- m_file = self.mox.CreateMock(file)
-
- os.path.exists('%s/%s' % (
- self.download_folder, 'download_url')).AndReturn(False)
-
- self.CommonDownloadAndExtractImage()
-
-
- def testGrabZipAndExtractImageWrongCache(self):
- """Test case where download_url exists but doesn't match our url."""
- self.mox.StubOutWithMock(os.path, 'exists')
- self.mox.StubOutWithMock(os, 'mkdir')
- self.mox.StubOutWithMock(__builtins__, 'open')
- self.mox.StubOutWithMock(ctest, 'RunCommand')
- self.mox.StubOutWithMock(urllib, 'urlretrieve')
- self.mox.StubOutWithMock(ctest, 'ModifyBootDesc')
-
- m_file = self.mox.CreateMock(file)
-
- os.path.exists('%s/%s' % (
- self.download_folder, 'download_url')).AndReturn(True)
-
- open('%s/%s' % (self.download_folder, 'download_url')).AndReturn(m_file)
- m_file.read().AndReturn(self.image_url)
- m_file.close()
-
- os.path.exists('%s/%s' % (
- self.download_folder, ctest._IMAGE_TO_EXTRACT)).AndReturn(False)
-
- self.CommonDownloadAndExtractImage()
-
- def testGetLatestLinkFromPage(self):
- """Tests whether we get the latest link from a url given a regex."""
- test_url = 'test_url'
- test_html = """
-
-
-
- Test Index
- Cruft
- Cruft
- testlink1/
- testlink2/
- testlink3/
-
- """
- self.mox.StubOutWithMock(urllib, 'urlopen')
- m_file = self.mox.CreateMock(file)
-
- urllib.urlopen(test_url).AndReturn(m_file)
- m_file.read().AndReturn(test_html)
- m_file.close()
-
- self.mox.ReplayAll()
- latest_link = ctest.GetLatestLinkFromPage(test_url, regex=self.test_regex)
- self.assertTrue(latest_link == 'ChromeOS-0.9.12.4-blahblah.zip')
- self.mox.VerifyAll()
-
-
-class HTMLDirectoryParserTest(unittest.TestCase):
- """Test class for HTMLDirectoryParser."""
-
- def setUp(self):
- self.test_regex = '\d+\.\d+\.\d+\.\d+/'
-
- def testHandleStarttagGood(self):
- parser = ctest.HTMLDirectoryParser(regex=self.test_regex)
- parser.handle_starttag('a', [('href', '0.9.74.1/')])
- self.assertTrue('0.9.74.1' in parser.link_list)
-
- def testHandleStarttagBad(self):
- parser = ctest.HTMLDirectoryParser(regex=self.test_regex)
- parser.handle_starttag('a', [('href', 'ZsomeCruft/')])
- self.assertTrue('ZsomeCruft' not in parser.link_list)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/lib/cros_build_lib.py b/lib/cros_build_lib.py
index 5cd39b1bd3..d846757677 100644
--- a/lib/cros_build_lib.py
+++ b/lib/cros_build_lib.py
@@ -11,8 +11,8 @@ import subprocess
import sys
_STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
-CROSUTILS_DIRECTORY = os.path.realpath(os.path.dirname(os.path.dirname(
- __file__)))
+CROSUTILS_DIRECTORY = os.path.dirname(os.path.dirname(
+ os.path.realpath(__file__)))
# TODO(sosa): Move logging to logging module.