Remove au_test_harness code and change symlinks to point to new location

Change-Id: I55a198e09ceac1e05b766864dbc920bfaedffb1f

BUG=chromium-os:11172
TEST=Tested ctest and cros_au_test_harness running from src/scripts
with buildbot params.

Review URL: http://codereview.chromium.org/6717011
This commit is contained in:
Chris Sosa 2011-03-22 13:17:39 -07:00
parent f9c49add44
commit 8d7caa720a
17 changed files with 4 additions and 2004 deletions

View File

@ -1,288 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing a test suite that is run to test auto updates."""
import os
import tempfile
import time
import unittest
import cros_build_lib as cros_lib
import cros_test_proxy
import dummy_au_worker
import real_au_worker
import vm_au_worker
class AUTest(unittest.TestCase):
"""Test harness that uses an au_worker to perform and validate updates.
Defines a test suite that is run using an au_worker. An au_worker can
be created to perform and validates updates on both virtual and real devices.
See documentation for au_worker for more information.
"""
test_results_root = None
public_key_managers = []
@classmethod
def ProcessOptions(cls, options, use_dummy_worker):
"""Processes options for the test suite and sets up the worker class.
Args:
options: options class to be parsed from main class.
use_dummy_worker: If True, use a dummy_worker_class rather than deriving
one from options.type.
"""
cls.base_image_path = options.base_image
cls.target_image_path = options.target_image
cls.clean = options.clean
assert options.type in ['real', 'vm'], 'Failed to specify either real|vm.'
if use_dummy_worker:
cls.worker_class = dummy_au_worker.DummyAUWorker
elif options.type == 'vm':
cls.worker_class = vm_au_worker.VMAUWorker
else:
cls.worker_class = real_au_worker.RealAUWorker
# Sanity checks.
if not cls.base_image_path:
cros_lib.Die('Need path to base image for vm.')
elif not os.path.exists(cls.base_image_path):
cros_lib.Die('%s does not exist' % cls.base_image_path)
if not cls.target_image_path:
cros_lib.Die('Need path to target image to update with.')
elif not os.path.exists(cls.target_image_path):
cros_lib.Die('%s does not exist' % cls.target_image_path)
# Initialize test root. Test root path must be in the chroot.
if not cls.test_results_root:
if options.test_results_root:
assert 'chroot/tmp' in options.test_results_root, \
'Must specify a test results root inside tmp in a chroot.'
cls.test_results_root = options.test_results_root
else:
cls.test_results_root = tempfile.mkdtemp(
prefix='au_test_harness',
dir=cros_lib.PrependChrootPath('/tmp'))
cros_lib.Info('Using %s as the test results root' % cls.test_results_root)
# Cache away options to instantiate workers later.
cls.options = options
def AttemptUpdateWithPayloadExpectedFailure(self, payload, expected_msg):
"""Attempt a payload update, expect it to fail with expected log"""
try:
self.worker.UpdateUsingPayload(payload)
except UpdateException as err:
# Will raise ValueError if expected is not found.
if re.search(re.escape(expected_msg), err.stdout, re.MULTILINE):
return
else:
cros_lib.Warning("Didn't find '%s' in:" % expected_msg)
cros_lib.Warning(err.stdout)
self.fail('We managed to update when failure was expected')
def AttemptUpdateWithFilter(self, filter, proxy_port=8081):
"""Update through a proxy, with a specified filter, and expect success."""
self.worker.PrepareBase(self.target_image_path)
# The devserver runs at port 8080 by default. We assume that here, and
# start our proxy at a different one. We then tell our update tools to
# have the client connect to our proxy_port instead of 8080.
proxy = cros_test_proxy.CrosTestProxy(port_in=proxy_port,
address_out='127.0.0.1',
port_out=8080,
filter=filter)
proxy.serve_forever_in_thread()
try:
self.worker.PerformUpdate(self.target_image_path, self.target_image_path,
proxy_port=proxy_port)
finally:
proxy.shutdown()
# --- UNITTEST SPECIFIC METHODS ---
def setUp(self):
"""Overrides unittest.TestCase.setUp and called before every test.
Sets instance specific variables and initializes worker.
"""
unittest.TestCase.setUp(self)
self.worker = self.worker_class(self.options, AUTest.test_results_root)
self.crosutils = os.path.join(os.path.dirname(__file__), '..', '..')
self.download_folder = os.path.join(self.crosutils, 'latest_download')
if not os.path.exists(self.download_folder):
os.makedirs(self.download_folder)
def tearDown(self):
"""Overrides unittest.TestCase.tearDown and called after every test."""
self.worker.CleanUp()
def testUpdateKeepStateful(self):
"""Tests if we can update normally.
This test checks that we can update by updating the stateful partition
rather than wiping it.
"""
self.worker.InitializeResultsDirectory()
# Just make sure some tests pass on original image. Some old images
# don't pass many tests.
self.worker.PrepareBase(self.base_image_path)
# TODO(sosa): move to 100% once we start testing using the autotest paired
# with the dev channel.
percent_passed = self.worker.VerifyImage(self, 10)
# Update to - all tests should pass on new image.
self.worker.PerformUpdate(self.target_image_path, self.base_image_path)
percent_passed = self.worker.VerifyImage(self)
# Update from - same percentage should pass that originally passed.
self.worker.PerformUpdate(self.base_image_path, self.target_image_path)
self.worker.VerifyImage(self, percent_passed)
def testUpdateWipeStateful(self):
"""Tests if we can update after cleaning the stateful partition.
This test checks that we can update successfully after wiping the
stateful partition.
"""
self.worker.InitializeResultsDirectory()
# Just make sure some tests pass on original image. Some old images
# don't pass many tests.
self.worker.PrepareBase(self.base_image_path)
percent_passed = self.worker.VerifyImage(self, 10)
# Update to - all tests should pass on new image.
self.worker.PerformUpdate(self.target_image_path, self.base_image_path,
'clean')
self.worker.VerifyImage(self)
# Update from - same percentage should pass that originally passed.
self.worker.PerformUpdate(self.base_image_path, self.target_image_path,
'clean')
self.worker.VerifyImage(self, percent_passed)
def testInterruptedUpdate(self):
"""Tests what happens if we interrupt payload delivery 3 times."""
class InterruptionFilter(cros_test_proxy.Filter):
"""This filter causes the proxy to interrupt the download 3 times
It does this by closing the first three connections to transfer
2M total in the outbound connection after they transfer the
2M.
"""
def __init__(self):
"""Defines variable shared across all connections"""
self.close_count = 0
def setup(self):
"""Called once at the start of each connection."""
self.data_size = 0
def OutBound(self, data):
"""Called once per packet for outgoing data.
The first three connections transferring more than 2M
outbound will be closed.
"""
if self.close_count < 3:
if self.data_size > (2 * 1024 * 1024):
self.close_count += 1
return None
self.data_size += len(data)
return data
self.worker.InitializeResultsDirectory()
self.AttemptUpdateWithFilter(InterruptionFilter(), proxy_port=8082)
def testDelayedUpdate(self):
"""Tests what happens if some data is delayed during update delivery"""
class DelayedFilter(cros_test_proxy.Filter):
"""Causes intermittent delays in data transmission.
It does this by inserting 3 20 second delays when transmitting
data after 2M has been sent.
"""
def setup(self):
"""Called once at the start of each connection."""
self.data_size = 0
self.delay_count = 0
def OutBound(self, data):
"""Called once per packet for outgoing data.
The first three packets after we reach 2M transferred
are delayed by 20 seconds.
"""
if self.delay_count < 3:
if self.data_size > (2 * 1024 * 1024):
self.delay_count += 1
time.sleep(20)
self.data_size += len(data)
return data
self.worker.InitializeResultsDirectory()
self.AttemptUpdateWithFilter(DelayedFilter(), proxy_port=8083)
def SimpleTest(self):
"""A simple update that updates once from a base image to a target.
We explicitly don't use test prefix so that isn't run by default. Can be
run using test_prefix option.
"""
self.worker.InitializeResultsDirectory()
self.worker.PrepareBase(self.base_image_path)
self.worker.PerformUpdate(self.target_image_path, self.base_image_path)
self.worker.VerifyImage(self)
# --- DISABLED TESTS ---
# TODO(sosa): Get test to work with verbose.
def NotestPartialUpdate(self):
"""Tests what happens if we attempt to update with a truncated payload."""
self.worker.InitializeResultsDirectory()
# Preload with the version we are trying to test.
self.worker.PrepareBase(self.target_image_path)
# Image can be updated at:
# ~chrome-eng/chromeos/localmirror/autest-images
url = 'http://gsdview.appspot.com/chromeos-localmirror/' \
'autest-images/truncated_image.gz'
payload = os.path.join(self.download_folder, 'truncated_image.gz')
# Read from the URL and write to the local file
urllib.urlretrieve(url, payload)
expected_msg = 'download_hash_data == update_check_response_hash failed'
self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
# TODO(sosa): Get test to work with verbose.
def NotestCorruptedUpdate(self):
"""Tests what happens if we attempt to update with a corrupted payload."""
self.worker.InitializeResultsDirectory()
# Preload with the version we are trying to test.
self.worker.PrepareBase(self.target_image_path)
# Image can be updated at:
# ~chrome-eng/chromeos/localmirror/autest-images
url = 'http://gsdview.appspot.com/chromeos-localmirror/' \
'autest-images/corrupted_image.gz'
payload = os.path.join(self.download_folder, 'corrupted.gz')
# Read from the URL and write to the local file
urllib.urlretrieve(url, payload)
# This update is expected to fail...
expected_msg = 'zlib inflate() error:-3'
self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)

View File

@ -1,272 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module that contains the interface for au_test_harness workers.
An au test harnss worker is a class that contains the logic for performing
and validating updates on a target. This should be subclassed to handle
various types of target. Types of targets include VM's, real devices, etc.
"""
import inspect
import threading
import os
import sys
import cros_build_lib as cros_lib
import dev_server_wrapper
import update_exception
class AUWorker(object):
"""Interface for a worker that updates and verifies images."""
# Mapping between cached payloads to directory locations.
update_cache = None
# --- INTERFACE ---
def __init__(self, options, test_results_root):
"""Processes options for the specific-type of worker."""
self.board = options.board
self.private_key = options.private_key
self.test_results_root = test_results_root
self.use_delta_updates = options.delta
self.verbose = options.verbose
self.vm_image_path = None
if options.quick_test:
self.verify_suite = 'build_RootFilesystemSize'
else:
self.verify_suite = 'suite_Smoke'
# Set these up as they are used often.
self.crosutils = os.path.join(os.path.dirname(__file__), '..', '..')
self.crosutilsbin = os.path.join(os.path.dirname(__file__), '..')
def CleanUp(self):
"""Called at the end of every test."""
pass
def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, private_key_path=None):
"""Implementation of an actual update.
See PerformUpdate for description of args. Subclasses must override this
method with the correct update procedure for the class.
"""
pass
def UpdateUsingPayload(self, update_path, stateful_change='old',
proxy_port=None):
"""Updates target with the pre-generated update stored in update_path.
Subclasses must override this method with the correct update procedure for
the class.
Args:
update_path: Path to the image to update with. This directory should
contain both update.gz, and stateful.image.gz
proxy_port: Port to have the client connect to. For use with
CrosTestProxy.
"""
pass
def VerifyImage(self, unittest, percent_required_to_pass=100):
"""Verifies the image with tests.
Verifies that the test images passes the percent required. Subclasses must
override this method with the correct update procedure for the class.
Args:
unittest: pointer to a unittest to fail if we cannot verify the image.
percent_required_to_pass: percentage required to pass. This should be
fall between 0-100.
Returns:
Returns the percent that passed.
"""
pass
# --- INTERFACE TO AU_TEST ---
def PerformUpdate(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, private_key_path=None):
"""Performs an update using _UpdateImage and reports any error.
Subclasses should not override this method but override _UpdateImage
instead.
Args:
image_path: Path to the image to update with. This image must be a test
image.
src_image_path: Optional. If set, perform a delta update using the
image specified by the path as the source image.
stateful_change: How to modify the stateful partition. Values are:
'old': Don't modify stateful partition. Just update normally.
'clean': Uses clobber-state to wipe the stateful partition with the
exception of code needed for ssh.
proxy_port: Port to have the client connect to. For use with
CrosTestProxy.
private_key_path: Path to a private key to use with update payload.
Raises an update_exception.UpdateException if _UpdateImage returns an error.
"""
try:
if not self.use_delta_updates: src_image_path = ''
if private_key_path:
key_to_use = private_key_path
else:
key_to_use = self.private_key
self.UpdateImage(image_path, src_image_path, stateful_change,
proxy_port, key_to_use)
except update_exception.UpdateException as err:
# If the update fails, print it out
Warning(err.stdout)
raise
@classmethod
def SetUpdateCache(cls, update_cache):
"""Sets the global update cache for getting paths to devserver payloads."""
cls.update_cache = update_cache
# --- METHODS FOR SUB CLASS USE ---
def PrepareRealBase(self, image_path):
"""Prepares a remote device for worker test by updating it to the image."""
self.UpdateImage(image_path)
def PrepareVMBase(self, image_path):
"""Prepares a VM image for worker test by creating the VM file from the img.
"""
# VM Constants.
FULL_VDISK_SIZE = 6072
FULL_STATEFULFS_SIZE = 3074
# Needed for VM delta updates. We need to use the qemu image rather
# than the base image on a first update. By tracking the first_update
# we can set src_image to the qemu form of the base image when
# performing generating the delta payload.
self._first_update = True
self.vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
image_path)
if not os.path.exists(self.vm_image_path):
cros_lib.Info('Creating %s' % self.vm_image_path)
cros_lib.RunCommand(['./image_to_vm.sh',
'--full',
'--from=%s' % cros_lib.ReinterpretPathForChroot(
os.path.dirname(image_path)),
'--vdisk_size=%s' % FULL_VDISK_SIZE,
'--statefulfs_size=%s' % FULL_STATEFULFS_SIZE,
'--board=%s' % self.board,
'--test_image'
], enter_chroot=True, cwd=self.crosutils)
cros_lib.Info('Using %s as base' % self.vm_image_path)
assert os.path.exists(self.vm_image_path)
def GetStatefulChangeFlag(self, stateful_change):
"""Returns the flag to pass to image_to_vm for the stateful change."""
stateful_change_flag = ''
if stateful_change:
stateful_change_flag = '--stateful_update_flag=%s' % stateful_change
return stateful_change_flag
def AppendUpdateFlags(self, cmd, image_path, src_image_path, proxy_port,
private_key_path):
"""Appends common args to an update cmd defined by an array.
Modifies cmd in places by appending appropriate items given args.
"""
if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
# Get pregenerated update if we have one.
update_id = dev_server_wrapper.GenerateUpdateId(image_path, src_image_path,
private_key_path)
cache_path = self.update_cache[update_id]
if cache_path:
update_url = dev_server_wrapper.DevServerWrapper.GetDevServerURL(
proxy_port, cache_path)
cmd.append('--update_url=%s' % update_url)
else:
cmd.append('--image=%s' % image_path)
if src_image_path: cmd.append('--src_image=%s' % src_image_path)
def RunUpdateCmd(self, cmd, log_directory=None):
"""Runs the given update cmd given verbose options.
Raises an update_exception.UpdateException if the update fails.
"""
if self.verbose:
try:
if log_directory:
cros_lib.RunCommand(cmd, log_to_file=os.path.join(log_directory,
'update.log'))
else:
cros_lib.RunCommand(cmd)
except Exception as e:
Warning(str(e))
raise update_exception.UpdateException(1, str(e))
else:
(code, stdout, stderr) = cros_lib.RunCommandCaptureOutput(cmd)
if code != 0:
Warning(stdout)
raise update_exception.UpdateException(code, stdout)
def AssertEnoughTestsPassed(self, unittest, output, percent_required_to_pass):
"""Helper function that asserts a sufficient number of tests passed.
Args:
output: stdout from a test run.
percent_required_to_pass: percentage required to pass. This should be
fall between 0-100.
Returns:
percent that passed.
"""
cros_lib.Info('Output from VerifyImage():')
print >> sys.stderr, output
sys.stderr.flush()
percent_passed = self._ParseGenerateTestReportOutput(output)
cros_lib.Info('Percent passed: %d vs. Percent required: %d' % (
percent_passed, percent_required_to_pass))
unittest.assertTrue(percent_passed >= percent_required_to_pass)
return percent_passed
def InitializeResultsDirectory(self):
"""Called by a test to initialize a results directory for this worker."""
# Use the name of the test.
test_name = inspect.stack()[1][3]
self.results_directory = os.path.join(self.test_results_root, test_name)
self.results_count = 0
def GetNextResultsPath(self, label):
"""Returns a path for the results directory for this label.
Prefixes directory returned for worker with time called i.e. 1_label,
2_label, etc. The directory returned is outside the chroot so if passing
to an script that is called with enther_chroot, make sure to use
ReinterpretPathForChroot.
"""
self.results_count += 1
dir = os.path.join(self.results_directory, '%s_%s' % (self.results_count,
label))
if not os.path.exists(dir):
os.makedirs(dir)
return dir
# --- PRIVATE HELPER FUNCTIONS ---
def _ParseGenerateTestReportOutput(self, output):
"""Returns the percentage of tests that passed based on output."""
percent_passed = 0
lines = output.split('\n')
for line in lines:
if line.startswith("Total PASS:"):
# FORMAT: ^TOTAL PASS: num_passed/num_total (percent%)$
percent_passed = line.split()[3].strip('()%')
cros_lib.Info('Percent of tests passed %s' % percent_passed)
break
return int(percent_passed)

View File

@ -1,278 +0,0 @@
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module runs a suite of Auto Update tests.
The tests can be run on either a virtual machine or actual device depending
on parameters given. Specific tests can be run by invoking --test_prefix.
Verbose is useful for many of the tests if you want to see individual commands
being run during the update process.
"""
import optparse
import os
import re
import sys
import unittest
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
import cros_build_lib as cros_lib
import au_test
import au_worker
import dummy_au_worker
import dev_server_wrapper
import parallel_test_job
import public_key_manager
import update_exception
def _PrepareTestSuite(options, use_dummy_worker=False):
"""Returns a prepared test suite given by the options and test class."""
au_test.AUTest.ProcessOptions(options, use_dummy_worker)
test_loader = unittest.TestLoader()
test_loader.testMethodPrefix = options.test_prefix
return test_loader.loadTestsFromTestCase(au_test.AUTest)
def _PregenerateUpdates(options):
"""Determines all deltas that will be generated and generates them.
This method effectively pre-generates the dev server cache for all tests.
Args:
options: options from parsed parser.
Returns:
Dictionary of Update Identifiers->Relative cache locations.
Raises:
update_exception.UpdateException if we fail to generate an update.
"""
def _GenerateVMUpdate(target, src, private_key_path):
"""Generates an update using the devserver."""
command = ['./enter_chroot.sh',
'--',
'sudo',
'start_devserver',
'--pregenerate_update',
'--exit',
]
# Add actual args to command.
command.append('--image=%s' % cros_lib.ReinterpretPathForChroot(target))
if src: command.append('--src_image=%s' %
cros_lib.ReinterpretPathForChroot(src))
if options.type == 'vm': command.append('--for_vm')
if private_key_path:
command.append('--private_key=%s' %
cros_lib.ReinterpretPathForChroot(private_key_path))
return cros_lib.RunCommandCaptureOutput(command, combine_stdout_stderr=True,
print_cmd=True)
# Use dummy class to mock out updates that would be run as part of a test.
test_suite = _PrepareTestSuite(options, use_dummy_worker=True)
test_result = unittest.TextTestRunner(verbosity=0).run(test_suite)
if not test_result.wasSuccessful():
raise update_exception.UpdateException(1,
'Error finding updates to generate.')
cros_lib.Info('The following delta updates are required.')
update_ids = []
jobs = []
args = []
modified_images = set()
for target, srcs in dummy_au_worker.DummyAUWorker.delta_list.items():
modified_images.add(target)
for src_key in srcs:
(src, _ , key) = src_key.partition('+')
if src: modified_images.add(src)
# TODO(sosa): Add private key as part of caching name once devserver can
# handle it its own cache.
update_id = dev_server_wrapper.GenerateUpdateId(target, src, key)
print >> sys.stderr, 'AU: %s' % update_id
update_ids.append(update_id)
jobs.append(_GenerateVMUpdate)
args.append((target, src, key))
# Always add the base image path. This is only useful for non-delta updates.
modified_images.add(options.base_image)
# Add public key to all images we are using.
if options.public_key:
cros_lib.Info('Adding public keys to images for testing.')
for image in modified_images:
manager = public_key_manager.PublicKeyManager(image, options.public_key)
manager.AddKeyToImage()
au_test.AUTest.public_key_managers.append(manager)
raw_results = parallel_test_job.RunParallelJobs(options.jobs, jobs, args,
print_status=True)
results = []
# Looking for this line in the output.
key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
for result in raw_results:
(return_code, output, _) = result
if return_code != 0:
cros_lib.Warning(output)
raise update_exception.UpdateException(return_code,
'Failed to generate all updates.')
else:
for line in output.splitlines():
match = key_line_re.search(line)
if match:
# Convert blah/blah/update.gz -> update/blah/blah.
path_to_update_gz = match.group(1).rstrip()
(path_to_update_dir, _, _) = path_to_update_gz.rpartition(
'/update.gz')
results.append('/'.join(['update', path_to_update_dir]))
break
# Make sure all generation of updates returned cached locations.
if len(raw_results) != len(results):
raise update_exception.UpdateException(
1, 'Insufficient number cache directories returned.')
# Build the dictionary from our id's and returned cache paths.
cache_dictionary = {}
for index, id in enumerate(update_ids):
cache_dictionary[id] = results[index]
return cache_dictionary
def _RunTestsInParallel(options):
"""Runs the tests given by the options in parallel."""
threads = []
args = []
test_suite = _PrepareTestSuite(options)
for test in test_suite:
test_name = test.id()
test_case = unittest.TestLoader().loadTestsFromName(test_name)
threads.append(unittest.TextTestRunner(verbosity=2).run)
args.append(test_case)
results = parallel_test_job.RunParallelJobs(options.jobs, threads, args,
print_status=False)
for test_result in results:
if not test_result.wasSuccessful():
cros_lib.Die('Test harness was not successful')
def _CleanPreviousWork(options):
"""Cleans up previous work from the devserver cache and local image cache."""
cros_lib.Info('Cleaning up previous work.')
# Wipe devserver cache.
cros_lib.RunCommandCaptureOutput(
['sudo', 'start_devserver', '--clear_cache', '--exit', ],
enter_chroot=True, print_cmd=False, combine_stdout_stderr=True)
# Clean previous vm images if they exist.
if options.type == 'vm':
target_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
options.target_image)
base_vm_image_path = '%s/chromiumos_qemu_image.bin' % os.path.dirname(
options.base_image)
if os.path.exists(target_vm_image_path): os.remove(target_vm_image_path)
if os.path.exists(base_vm_image_path): os.remove(base_vm_image_path)
def main():
parser = optparse.OptionParser()
parser.add_option('-b', '--base_image',
help='path to the base image.')
parser.add_option('-r', '--board',
help='board for the images.')
parser.add_option('--clean', default=False, dest='clean', action='store_true',
help='Clean all previous state')
parser.add_option('--no_delta', action='store_false', default=True,
dest='delta',
help='Disable using delta updates.')
parser.add_option('--no_graphics', action='store_true',
help='Disable graphics for the vm test.')
parser.add_option('-j', '--jobs', default=8, type=int,
help='Number of simultaneous jobs')
parser.add_option('--public_key', default=None,
help='Public key to use on images and updates.')
parser.add_option('--private_key', default=None,
help='Private key to use on images and updates.')
parser.add_option('-q', '--quick_test', default=False, action='store_true',
help='Use a basic test to verify image.')
parser.add_option('-m', '--remote',
help='Remote address for real test.')
parser.add_option('-t', '--target_image',
help='path to the target image.')
parser.add_option('--test_results_root', default=None,
help='Root directory to store test results. Should '
'be defined relative to chroot root.')
parser.add_option('--test_prefix', default='test',
help='Only runs tests with specific prefix i.e. '
'testFullUpdateWipeStateful.')
parser.add_option('-p', '--type', default='vm',
help='type of test to run: [vm, real]. Default: vm.')
parser.add_option('--verbose', default=True, action='store_true',
help='Print out rather than capture output as much as '
'possible.')
(options, leftover_args) = parser.parse_args()
if leftover_args: parser.error('Found unsupported flags: %s' % leftover_args)
assert options.target_image and os.path.exists(options.target_image), \
'Target image path does not exist'
if not options.base_image:
cros_lib.Info('Base image not specified. Using target as base image.')
options.base_image = options.target_image
if options.private_key or options.public_key:
error_msg = ('Could not find %s key. Both private and public keys must be '
'specified if either is specified.')
assert options.private_key and os.path.exists(options.private_key), \
error_msg % 'private'
assert options.public_key and os.path.exists(options.public_key), \
error_msg % 'public'
# Clean up previous work if requested.
if options.clean: _CleanPreviousWork(options)
# Make sure we have a log directory.
if options.test_results_root and not os.path.exists(
options.test_results_root):
os.makedirs(options.test_results_root)
# Pre-generate update modifies images by adding public keys to them.
# Wrap try to make sure we clean this up before we're done.
try:
# Generate cache of updates to use during test harness.
update_cache = _PregenerateUpdates(options)
au_worker.AUWorker.SetUpdateCache(update_cache)
my_server = dev_server_wrapper.DevServerWrapper(
au_test.AUTest.test_results_root)
my_server.start()
try:
if options.type == 'vm':
_RunTestsInParallel(options)
else:
# TODO(sosa) - Take in a machine pool for a real test.
# Can't run in parallel with only one remote device.
test_suite = _PrepareTestSuite(options)
test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
if not test_result.wasSuccessful(): cros_lib.Die('Test harness failed.')
finally:
my_server.Stop()
finally:
# Un-modify any target images we modified. We don't need to un-modify
# non-targets because they aren't important for archival steps.
if options.public_key:
cros_lib.Info('Cleaning up. Removing keys added as part of testing.')
target_directory = os.path.dirname(options.target_image)
for key_manager in au_test.AUTest.public_key_managers:
if key_manager.image_path.startswith(target_directory):
key_manager.RemoveKeyFromImage()
if __name__ == '__main__':
main()

View File

@ -1,121 +0,0 @@
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing various classes pertaining to inserting a proxy in a test.
"""
import os
import select
import socket
import SocketServer
import threading
class Filter(object):
"""Base class for data filters.
Pass subclass of this to CrosTestProxy which will perform whatever
connection manipulation you prefer.
"""
def setup(self):
"""This setup method is called once per connection."""
pass
def InBound(self, data):
"""This method is called once per packet of incoming data.
The value returned is what is sent through the proxy. If
None is returned, the connection will be closed.
"""
return data
def OutBound(self, data):
"""This method is called once per packet of outgoing data.
The value returned is what is sent through the proxy. If
None is returned, the connection will be closed.
"""
return data
class CrosTestProxy(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
"""A transparent proxy for simulating network errors"""
class _Handler(SocketServer.BaseRequestHandler):
"""Proxy connection handler that passes data though a filter"""
def setup(self):
"""Setup is called once for each connection proxied."""
self.server.filter.setup()
def handle(self):
"""Handles each incoming connection.
Opens a new connection to the port we are proxing to, then
passes each packet along in both directions after passing
them through the filter object passed in.
"""
# Open outgoing socket
s_in = self.request
s_out = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s_out.connect((self.server.address_out, self.server.port_out))
while True:
rlist, wlist, xlist = select.select([s_in, s_out], [], [])
if s_in in rlist:
data = s_in.recv(1024)
data = self.server.filter.InBound(data)
if not data: break
try:
# If there is any error sending data, close both connections.
s_out.sendall(data)
except socket.error:
break
if s_out in rlist:
data = s_out.recv(1024)
data = self.server.filter.OutBound(data)
if not data: break
try:
# If there is any error sending data, close both connections.
s_in.sendall(data)
except socket.error:
break
s_in.close()
s_out.close()
def __init__(self,
filter,
port_in=8081,
address_out='127.0.0.1', port_out=8080):
"""Configures the proxy object.
Args:
filter: An instance of a subclass of Filter.
port_in: Port on which to listen for incoming connections.
address_out: Address to which outgoing connections will go.
address_port: Port to which outgoing connections will go.
"""
self.port_in = port_in
self.address_out = address_out
self.port_out = port_out
self.filter = filter
try:
SocketServer.TCPServer.__init__(self,
('', port_in),
self._Handler)
except socket.error:
os.system('sudo netstat -l --tcp -n -p')
raise
def serve_forever_in_thread(self):
"""Helper method to start the server in a new background thread."""
server_thread = threading.Thread(target=self.serve_forever)
server_thread.setDaemon(True)
server_thread.start()
return server_thread

View File

@ -1,54 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing methods and classes to interact with a devserver instance.
"""
import os
import threading
import cros_build_lib as cros_lib
def GenerateUpdateId(target, src, key):
"""Returns a simple representation id of target and src paths."""
update_id = target
if src: update_id = '->'.join([src, update_id])
if key: update_id = '+'.join([update_id, key])
return update_id
class DevServerWrapper(threading.Thread):
"""A Simple wrapper around a dev server instance."""
def __init__(self, test_root):
self.proc = None
self.test_root = test_root
threading.Thread.__init__(self)
def run(self):
# Kill previous running instance of devserver if it exists.
cros_lib.RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True,
print_cmd=False)
cros_lib.RunCommand(['sudo',
'start_devserver',
'--archive_dir=./static',
'--client_prefix=ChromeOSUpdateEngine',
'--production',
], enter_chroot=True, print_cmd=False,
log_to_file=os.path.join(self.test_root,
'dev_server.log'))
def Stop(self):
"""Kills the devserver instance."""
cros_lib.RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True,
print_cmd=False)
@classmethod
def GetDevServerURL(cls, port, sub_dir):
"""Returns the dev server url for a given port and sub directory."""
ip_addr = cros_lib.GetIPAddress()
if not port: port = 8080
url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr,
'port': str(port),
'dir': sub_dir}
return url

View File

@ -1,45 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing a fake au worker class."""
import unittest
import au_worker
class DummyAUWorker(au_worker.AUWorker):
"""AU worker that emulates work for an au_worker without actually doing work.
Collects different updates that would be generated that can be obtained
from the class object delta_list.
"""
# Class variable that stores the list of payloads that would be needed.
delta_list = {}
def __init__(self, options, test_results_root):
au_worker.AUWorker.__init__(self, options, test_results_root)
self.au_type = options.type
def PrepareBase(self, image_path):
"""Copy how the actual worker would prepare the base image."""
if self.au_type == 'vm':
self.PrepareVMBase(image_path)
else:
self.PrepareRealBase(image_path)
def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, private_key_path=None):
"""Emulate Update and record the update payload in delta_list."""
if self.au_type == 'vm' and src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
# Generate a value that combines delta with private key path.
val = src_image_path
if private_key_path: val = '%s+%s' % (val, private_key_path)
if not self.delta_list.has_key(image_path):
self.delta_list[image_path] = set([val])
else:
self.delta_list[image_path].add(val)

View File

@ -1,109 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing methods/classes related to running parallel test jobs."""
import sys
import threading
import time
import cros_build_lib as cros_lib
class ParallelJob(threading.Thread):
"""Small wrapper for threading. Thread that releases a semaphores on exit."""
def __init__(self, starting_semaphore, ending_semaphore, target, args):
"""Initializes an instance of a job.
Args:
starting_semaphore: Semaphore used by caller to wait on such that
there isn't more than a certain number of threads running. Should
be initialized to a value for the number of threads wanting to be run
at a time.
ending_semaphore: Semaphore is released every time a job ends. Should be
initialized to 0 before starting first job. Should be acquired once for
each job. Threading.Thread.join() has a bug where if the run function
terminates too quickly join() will hang forever.
target: The func to run.
args: Args to pass to the fun.
"""
threading.Thread.__init__(self, target=target, args=args)
self._target = target
self._args = args
self._starting_semaphore = starting_semaphore
self._ending_semaphore = ending_semaphore
self._output = None
self._completed = False
def run(self):
"""Thread override. Runs the method specified and sets output."""
try:
self._output = self._target(*self._args)
finally:
# Our own clean up.
self._Cleanup()
self._completed = True
# From threading.py to avoid a refcycle.
del self._target, self._args
def GetOutput(self):
"""Returns the output of the method run."""
assert self._completed, 'GetOutput called before thread was run.'
return self._output
def _Cleanup(self):
"""Releases semaphores for a waiting caller."""
self._starting_semaphore.release()
self._ending_semaphore.release()
def __str__(self):
return '%s(%s)' % (self._target, self._args)
def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args,
print_status):
"""Runs set number of specified jobs in parallel.
Args:
number_of_simultaneous_jobs: Max number of threads to be run in parallel.
jobs: Array of methods to run.
jobs_args: Array of args associated with method calls.
print_status: True if you'd like this to print out .'s as it runs jobs.
Returns:
Returns an array of results corresponding to each thread.
"""
def _TwoTupleize(x, y):
return (x, y)
threads = []
job_start_semaphore = threading.Semaphore(number_of_simultaneous_jobs)
join_semaphore = threading.Semaphore(0)
assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
# Create the parallel jobs.
for job, args in map(_TwoTupleize, jobs, jobs_args):
thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
args=args)
threads.append(thread)
# Cache sudo access.
cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'],
print_cmd=False, redirect_stdout=True,
redirect_stderr=True)
# We use a semaphore to ensure we don't run more jobs than required.
# After each thread finishes, it releases (increments semaphore).
# Acquire blocks of num jobs reached and continues when a thread finishes.
for next_thread in threads:
job_start_semaphore.acquire(blocking=True)
next_thread.start()
# Wait on the rest of the threads to finish.
for thread in threads:
while not join_semaphore.acquire(blocking=False):
time.sleep(5)
if print_status:
print >> sys.stderr, '.',
return [thread.GetOutput() for thread in threads]

View File

@ -1,91 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module manages interactions between an image and a public key."""
import os
import tempfile
import cros_build_lib as cros_lib
class PublicKeyManager(object):
"""Class wrapping interactions with a public key on an image."""
TARGET_KEY_PATH = 'usr/share/update_engine/update-payload-key.pub.pem'
def __init__(self, image_path, key_path):
"""Initializes a manager with image_path and key_path we plan to insert."""
self.image_path = image_path
self.key_path = key_path
self._rootfs_dir = tempfile.mkdtemp(suffix='rootfs', prefix='tmp')
self._stateful_dir = tempfile.mkdtemp(suffix='stateful', prefix='tmp')
# Gather some extra information about the image.
try:
cros_lib.MountImage(image_path, self._rootfs_dir, self._stateful_dir,
read_only=True)
self._full_target_key_path = os.path.join(
self._rootfs_dir, PublicKeyManager.TARGET_KEY_PATH)
self._is_key_new = True
if os.path.exists(self._full_target_key_path):
diff_output = cros_lib.RunCommand(['diff',
self.key_path,
self._full_target_key_path],
print_cmd=False, redirect_stdout=True,
redirect_stderr=True, error_ok=True)
if not diff_output: self._is_key_new = False
finally:
cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
def __del__(self):
"""Remove our temporary directories we created in init."""
os.rmdir(self._rootfs_dir)
os.rmdir(self._stateful_dir)
def AddKeyToImage(self):
"""Adds the key specified in init to the image."""
if not self._is_key_new:
cros_lib.Info('Public key already on image %s. No work to do.' %
self.image_path)
return
cros_lib.Info('Copying %s into %s' % (self.key_path, self.image_path))
try:
cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir,
read_only=False)
dir_path = os.path.dirname(self._full_target_key_path)
cros_lib.RunCommand(['sudo', 'mkdir', '--parents', dir_path],
print_cmd=False)
cros_lib.RunCommand(['sudo', 'cp', '--force', '-p', self.key_path,
self._full_target_key_path], print_cmd=False)
finally:
cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
self._MakeImageBootable()
def RemoveKeyFromImage(self):
"""Removes the key specified in init from the image."""
cros_lib.Info('Removing public key from image %s.' % self.image_path)
try:
cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir,
read_only=False)
cros_lib.RunCommand(['sudo', 'rm', '--force', self._full_target_key_path],
print_cmd=False)
finally:
cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
self._MakeImageBootable()
def _MakeImageBootable(self):
"""Makes the image bootable. Note, it is only useful for non-vm images."""
image = os.path.basename(self.image_path)
if 'qemu' in image:
return
from_dir = os.path.dirname(self.image_path)
cros_lib.RunCommand(['bin/cros_make_image_bootable',
cros_lib.ReinterpretPathForChroot(from_dir),
image], print_cmd=False, redirect_stdout=True,
redirect_stderr=True, enter_chroot=True,
cwd=cros_lib.CROSUTILS_DIRECTORY)

View File

@ -1,63 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing class that implements an au_worker for a test device."""
import unittest
import cros_build_lib as cros_lib
import au_worker
class RealAUWorker(au_worker.AUWorker):
"""Test harness for updating real images."""
def __init__(self, options, test_results_root):
"""Processes non-vm-specific options."""
au_worker.AUWorker.__init__(self, options, test_results_root)
self.remote = options.remote
if not self.remote: cros_lib.Die('We require a remote address for tests.')
def PrepareBase(self, image_path):
"""Auto-update to base image to prepare for test."""
self.PrepareRealBase(image_path)
def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, private_key_path=None):
"""Updates a remote image using image_to_live.sh."""
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
cmd = ['%s/image_to_live.sh' % self.crosutils,
'--remote=%s' % self.remote,
stateful_change_flag,
'--verify',
]
self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port,
private_key_path)
self.RunUpdateCmd(cmd)
def UpdateUsingPayload(self, update_path, stateful_change='old',
proxy_port=None):
"""Updates a remote image using image_to_live.sh."""
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
cmd = ['%s/image_to_live.sh' % self.crosutils,
'--payload=%s' % update_path,
'--remote=%s' % self.remote,
stateful_change_flag,
'--verify',
]
if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
self.RunUpdateCmd(cmd)
def VerifyImage(self, unittest, percent_required_to_pass=100):
"""Verifies an image using run_remote_tests.sh with verification suite."""
test_directory = self.GetNextResultsPath('verify')
output = cros_lib.RunCommand(
['%s/run_remote_tests.sh' % self.crosutils,
'--remote=%s' % self.remote,
'--results_dir_root=%s' % test_directory,
self.verify_suite,
], error_ok=True, enter_chroot=False, redirect_stdout=True)
return self.AssertEnoughTestsPassed(unittest, output,
percent_required_to_pass)

View File

@ -1,11 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing update exceptions."""
class UpdateException(Exception):
"""Exception thrown when _UpdateImage or _UpdateUsingPayload fail"""
def __init__(self, code, stdout):
self.code = code
self.stdout = stdout

View File

@ -1,120 +0,0 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing implementation of an au_worker for virtual machines."""
import os
import threading
import unittest
import cros_build_lib as cros_lib
import au_worker
class VMAUWorker(au_worker.AUWorker):
"""Test harness for updating virtual machines."""
# Class variables used to acquire individual VM variables per test.
_vm_lock = threading.Lock()
_next_port = 9222
def __init__(self, options, test_results_root):
"""Processes vm-specific options."""
au_worker.AUWorker.__init__(self, options, test_results_root)
self.graphics_flag = ''
if options.no_graphics: self.graphics_flag = '--no_graphics'
if not self.board: cros_lib.Die('Need board to convert base image to vm.')
self._AcquireUniquePortAndPidFile()
self._KillExistingVM(self._kvm_pid_file)
def _KillExistingVM(self, pid_file):
"""Kills an existing VM specified by the pid_file."""
if os.path.exists(pid_file):
cros_lib.Warning('Existing %s found. Deleting and killing process' %
pid_file)
cros_lib.RunCommand(['./cros_stop_vm', '--kvm_pid=%s' % pid_file],
cwd=self.crosutilsbin)
assert not os.path.exists(pid_file)
def _AcquireUniquePortAndPidFile(self):
"""Acquires unique ssh port and pid file for VM."""
with VMAUWorker._vm_lock:
self._ssh_port = VMAUWorker._next_port
self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port
VMAUWorker._next_port += 1
def CleanUp(self):
"""Stop the vm after a test."""
self._KillExistingVM(self._kvm_pid_file)
def PrepareBase(self, image_path):
"""Creates an update-able VM based on base image."""
self.PrepareVMBase(image_path)
def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port='', private_key_path=None):
"""Updates VM image with image_path."""
log_directory = self.GetNextResultsPath('update')
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
if src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
'--vm_image_path=%s' % self.vm_image_path,
'--update_log=%s' % os.path.join(log_directory, 'update_engine.log'),
'--snapshot',
self.graphics_flag,
'--persist',
'--kvm_pid=%s' % self._kvm_pid_file,
'--ssh_port=%s' % self._ssh_port,
stateful_change_flag,
]
self.AppendUpdateFlags(cmd, image_path, src_image_path, proxy_port,
private_key_path)
self.RunUpdateCmd(cmd, log_directory)
def UpdateUsingPayload(self, update_path, stateful_change='old',
proxy_port=None):
"""Updates a vm image using cros_run_vm_update."""
log_directory = self.GetNextResultsPath('update')
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
'--payload=%s' % update_path,
'--vm_image_path=%s' % self.vm_image_path,
'--update_log=%s' % os.path.join(log_directory, 'update_engine.log'),
'--snapshot',
self.graphics_flag,
'--persist',
'--kvm_pid=%s' % self._kvm_pid_file,
'--ssh_port=%s' % self._ssh_port,
stateful_change_flag,
]
if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
self.RunUpdateCmd(cmd, log_directory)
def VerifyImage(self, unittest, percent_required_to_pass=100):
"""Runs vm smoke suite to verify image."""
log_directory = self.GetNextResultsPath('verify')
(_, _, log_directory_in_chroot) = log_directory.rpartition('chroot')
# image_to_live already verifies lsb-release matching. This is just
# for additional steps.
commandWithArgs = ['%s/cros_run_vm_test' % self.crosutilsbin,
'--image_path=%s' % self.vm_image_path,
'--snapshot',
'--persist',
'--kvm_pid=%s' % self._kvm_pid_file,
'--ssh_port=%s' % self._ssh_port,
'--results_dir_root=%s' % log_directory_in_chroot,
self.verify_suite,
]
if self.graphics_flag: commandWithArgs.append(self.graphics_flag)
output = cros_lib.RunCommand(commandWithArgs, error_ok=True,
enter_chroot=False, redirect_stdout=True)
return self.AssertEnoughTestsPassed(unittest, output,
percent_required_to_pass)

View File

@ -1 +1 @@
au_test_harness/cros_au_test_harness.py
../../platform/crostestutils/au_test_harness/cros_au_test_harness.py

View File

@ -1 +1 @@
ctest.py
../../platform/crostestutils/ctest/ctest.py

View File

@ -1,320 +0,0 @@
#!/usr/bin/python
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper for tests that are run on builders."""
import fileinput
import optparse
import os
import re
import sys
import traceback
import urllib
import HTMLParser
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
from cros_build_lib import Info
from cros_build_lib import ReinterpretPathForChroot
from cros_build_lib import RunCommand
from cros_build_lib import Warning
_IMAGE_TO_EXTRACT = 'chromiumos_test_image.bin'
_NEW_STYLE_VERSION = '0.9.131.0'
class HTMLDirectoryParser(HTMLParser.HTMLParser):
"""HTMLParser for parsing the default apache file index."""
def __init__(self, regex):
HTMLParser.HTMLParser.__init__(self)
self.regex_object = re.compile(regex)
self.link_list = []
def handle_starttag(self, tag, attrs):
"""Overrides from HTMLParser and is called at the start of every tag.
This implementation grabs attributes from links (i.e. <a ... > </a>
and adds the target from href=<target> if the <target> matches the
regex given at the start.
"""
if not tag.lower() == 'a':
return
for attr in attrs:
if not attr[0].lower() == 'href':
continue
match = self.regex_object.match(attr[1])
if match:
self.link_list.append(match.group(0).rstrip('/'))
def ModifyBootDesc(download_folder, redirect_file=None):
"""Modifies the boot description of a downloaded image to work with path.
The default boot.desc from another system is specific to the directory
it was created in. This modifies the boot description to be compatiable
with the download folder.
Args:
download_folder: Absoulte path to the download folder.
redirect_file: For testing. Where to copy new boot desc.
"""
boot_desc_path = os.path.join(download_folder, 'boot.desc')
in_chroot_folder = ReinterpretPathForChroot(download_folder)
for line in fileinput.input(boot_desc_path, inplace=1):
# Has to be done here to get changes to sys.stdout from fileinput.input.
if not redirect_file:
redirect_file = sys.stdout
split_line = line.split('=')
if len(split_line) > 1:
var_part = split_line[0]
potential_path = split_line[1].replace('"', '').strip()
if potential_path.startswith('/home') and not 'output_dir' in var_part:
new_path = os.path.join(in_chroot_folder,
os.path.basename(potential_path))
new_line = '%s="%s"' % (var_part, new_path)
Info('Replacing line %s with %s' % (line, new_line))
redirect_file.write('%s\n' % new_line)
continue
elif 'output_dir' in var_part:
# Special case for output_dir.
new_line = '%s="%s"' % (var_part, in_chroot_folder)
Info('Replacing line %s with %s' % (line, new_line))
redirect_file.write('%s\n' % new_line)
continue
# Line does not need to be modified.
redirect_file.write(line)
fileinput.close()
def _GreaterVersion(version_a, version_b):
"""Returns the higher version number of two version number strings."""
version_regex = re.compile('.*(\d+)\.(\d+)\.(\d+)\.(\d+).*')
version_a_tokens = version_regex.match(version_a).groups()
version_b_tokens = version_regex.match(version_b).groups()
for i in range(4):
(a, b) = (int(version_a_tokens[i]), int(version_b_tokens[i]))
if a != b:
if a > b: return version_a
return version_b
return version_a
def GetLatestLinkFromPage(url, regex):
"""Returns the latest link from the given url that matches regex.
Args:
url: Url to download and parse.
regex: Regular expression to match links against.
"""
url_file = urllib.urlopen(url)
url_html = url_file.read()
url_file.close()
# Parses links with versions embedded.
url_parser = HTMLDirectoryParser(regex=regex)
url_parser.feed(url_html)
return reduce(_GreaterVersion, url_parser.link_list)
def GetNewestLinkFromZipBase(board, channel, zip_server_base):
"""Returns the url to the newest image from the zip server.
Args:
board: board for the image zip.
channel: channel for the image zip.
zip_server_base: base url for zipped images.
"""
zip_base = os.path.join(zip_server_base, channel, board)
latest_version = GetLatestLinkFromPage(zip_base, '\d+\.\d+\.\d+\.\d+/')
zip_dir = os.path.join(zip_base, latest_version)
zip_name = GetLatestLinkFromPage(zip_dir,
'ChromeOS-\d+\.\d+\.\d+\.\d+-.*\.zip')
return os.path.join(zip_dir, zip_name)
def GetLatestZipUrl(board, channel, latest_url_base, zip_server_base):
"""Returns the url of the latest image zip for the given arguments.
Args:
board: board for the image zip.
channel: channel for the image zip.
latest_url_base: base url for latest links.
zip_server_base: base url for zipped images.
"""
if latest_url_base:
try:
# Grab the latest image info.
latest_file_url = os.path.join(latest_url_base, channel,
'LATEST-%s' % board)
latest_image_file = urllib.urlopen(latest_file_url)
latest_image = latest_image_file.read()
latest_image_file.close()
# Convert bin.gz into zip.
latest_image = latest_image.replace('.bin.gz', '.zip')
version = latest_image.split('-')[1]
zip_base = os.path.join(zip_server_base, channel, board)
return os.path.join(zip_base, version, latest_image)
except IOError:
Warning(('Could not use latest link provided, defaulting to parsing'
' latest from zip url base.'))
try:
return GetNewestLinkFromZipBase(board, channel, zip_server_base)
except:
Warning('Failed to get url from standard zip base. Trying rc.')
return GetNewestLinkFromZipBase(board + '-rc', channel, zip_server_base)
def GrabZipAndExtractImage(zip_url, download_folder, image_name) :
"""Downloads the zip and extracts the given image.
Doesn't re-download if matching version found already in download folder.
Args:
zip_url - url for the image.
download_folder - download folder to store zip file and extracted images.
image_name - name of the image to extract from the zip file.
"""
zip_path = os.path.join(download_folder, 'image.zip')
versioned_url_path = os.path.join(download_folder, 'download_url')
found_cached = False
if os.path.exists(versioned_url_path):
fh = open(versioned_url_path)
version_url = fh.read()
fh.close()
if version_url == zip_url and os.path.exists(os.path.join(download_folder,
image_name)):
Info('Using cached %s' % image_name)
found_cached = True
if not found_cached:
Info('Downloading %s' % zip_url)
RunCommand(['rm', '-rf', download_folder], print_cmd=False)
os.mkdir(download_folder)
urllib.urlretrieve(zip_url, zip_path)
# Using unzip because python implemented unzip in native python so
# extraction is really slow.
Info('Unzipping image %s' % image_name)
RunCommand(['unzip', '-d', download_folder, zip_path],
print_cmd=False, error_message='Failed to download %s' % zip_url)
ModifyBootDesc(download_folder)
# Put url in version file so we don't have to do this every time.
fh = open(versioned_url_path, 'w+')
fh.write(zip_url)
fh.close()
version = zip_url.split('/')[-2]
if not _GreaterVersion(version, _NEW_STYLE_VERSION) == version:
# If the version isn't ready for new style, touch file to use old style.
old_style_touch_path = os.path.join(download_folder, '.use_e1000')
fh = open(old_style_touch_path, 'w+')
fh.close()
def RunAUTestHarness(board, channel, latest_url_base, zip_server_base,
no_graphics, type, remote, clean, test_results_root):
"""Runs the auto update test harness.
The auto update test harness encapsulates testing the auto-update mechanism
for the latest image against the latest official image from the channel. This
also tests images with suite_Smoke (built-in as part of its verification
process).
Args:
board: the board for the latest image.
channel: the channel to run the au test harness against.
latest_url_base: base url for getting latest links.
zip_server_base: base url for zipped images.
no_graphics: boolean - If True, disable graphics during vm test.
type: which test harness to run. Possible values: real, vm.
remote: ip address for real test harness run.
clean: Clean the state of test harness before running.
test_results_root: Root directory to store au_test_harness results.
"""
crosutils_root = os.path.join(os.path.dirname(__file__), '..')
download_folder = os.path.abspath('latest_download')
zip_url = GetLatestZipUrl(board, channel, latest_url_base, zip_server_base)
GrabZipAndExtractImage(zip_url, download_folder, _IMAGE_TO_EXTRACT)
# Tests go here.
latest_image = RunCommand(['./get_latest_image.sh', '--board=%s' % board],
cwd=crosutils_root, redirect_stdout=True,
print_cmd=True).strip()
update_engine_path = os.path.join(crosutils_root, '..', 'platform',
'update_engine')
cmd = ['bin/cros_au_test_harness',
'--base_image=%s' % os.path.join(download_folder,
_IMAGE_TO_EXTRACT),
'--target_image=%s' % os.path.join(latest_image,
_IMAGE_TO_EXTRACT),
'--board=%s' % board,
'--type=%s' % type,
'--remote=%s' % remote,
'--private_key=%s' % os.path.join(update_engine_path,
'unittest_key.pem'),
'--public_key=%s' % os.path.join(update_engine_path,
'unittest_key.pub.pem'),
]
if test_results_root: cmd.append('--test_results_root=%s' % test_results_root)
if no_graphics: cmd.append('--no_graphics')
if clean: cmd.append('--clean')
RunCommand(cmd, cwd=crosutils_root)
def main():
parser = optparse.OptionParser()
parser.add_option('-b', '--board',
help='board for the image to compare against.')
parser.add_option('-c', '--channel',
help='channel for the image to compare against.')
parser.add_option('--cache', default=False, action='store_true',
help='Cache payloads')
parser.add_option('-l', '--latestbase',
help='Base url for latest links.')
parser.add_option('-z', '--zipbase',
help='Base url for hosted images.')
parser.add_option('--no_graphics', action='store_true', default=False,
help='Disable graphics for the vm test.')
parser.add_option('--test_results_root', default=None,
help='Root directory to store test results. Should '
'be defined relative to chroot root.')
parser.add_option('--type', default='vm',
help='type of test to run: [vm, real]. Default: vm.')
parser.add_option('--remote', default='0.0.0.0',
help='For real tests, ip address of the target machine.')
# Set the usage to include flags.
parser.set_usage(parser.format_help())
(options, args) = parser.parse_args()
if args: parser.error('Extra args found %s.' % args)
if not options.board: parser.error('Need board for image to compare against.')
if not options.channel: parser.error('Need channel e.g. dev-channel.')
if not options.zipbase: parser.error('Need zip url base to get images.')
RunAUTestHarness(options.board, options.channel, options.latestbase,
options.zipbase, options.no_graphics, options.type,
options.remote, not options.cache,
options.test_results_root)
if __name__ == '__main__':
main()

View File

@ -1,228 +0,0 @@
#!/usr/bin/python
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for ctest."""
import mox
import os
import unittest
import urllib
import ctest
_TEST_BOOT_DESC = """
--arch="x86"
--output_dir="/home/chrome-bot/0.8.70.5-a1"
--espfs_mountpoint="/home/chrome-bot/0.8.70.5-a1/esp"
--enable_rootfs_verification
"""
class CrosTestTest(mox.MoxTestBase):
"""Test class for CTest."""
def setUp(self):
mox.MoxTestBase.setUp(self)
self.board = 'test-board'
self.channel = 'test-channel'
self.version = '1.2.3.4.5'
self.revision = '7ghfa9999-12345'
self.image_name = 'TestOS-%s-%s' % (self.version, self.revision)
self.download_folder = 'test_folder'
self.latestbase = 'http://test-latest/TestOS'
self.zipbase = 'http://test-zips/archive/TestOS'
self.image_url = '%s/%s/%s/%s/%s.zip' % (self.zipbase, self.channel,
self.board, self.version,
self.image_name)
self.test_regex = 'ChromeOS-\d+\.\d+\.\d+\.\d+-.*\.zip'
def testModifyBootDesc(self):
"""Tests to make sure we correctly modify a boot desc."""
in_chroot_path = ctest.ReinterpretPathForChroot(os.path.abspath(
self.download_folder))
self.mox.StubOutWithMock(__builtins__, 'open')
self.mox.StubOutWithMock(ctest.fileinput, 'input')
m_file = self.mox.CreateMock(file)
mock_file = _TEST_BOOT_DESC.splitlines(True)
ctest.fileinput.input('%s/%s' % (os.path.abspath(self.download_folder),
'boot.desc'),
inplace=1).AndReturn(mock_file)
m_file.write('\n')
m_file.write(' --arch="x86"\n')
m_file.write(' --output_dir="%s"\n' % in_chroot_path)
m_file.write(' --espfs_mountpoint="%s/%s"\n' % (in_chroot_path, 'esp'))
m_file.write(' --enable_rootfs_verification\n')
self.mox.ReplayAll()
ctest.ModifyBootDesc(os.path.abspath(self.download_folder), m_file)
self.mox.VerifyAll()
def testGetLatestZipUrl(self):
"""Test case that tests GetLatestZipUrl with test urls."""
self.mox.StubOutWithMock(urllib, 'urlopen')
m_file = self.mox.CreateMock(file)
urllib.urlopen('%s/%s/LATEST-%s' % (self.latestbase, self.channel,
self.board)).AndReturn(m_file)
m_file.read().AndReturn('%s.bin.gz' % self.image_name)
m_file.close()
self.mox.ReplayAll()
self.assertEquals(ctest.GetLatestZipUrl(self.board, self.channel,
self.latestbase, self.zipbase),
self.image_url)
self.mox.VerifyAll()
def testGetLatestZipFromBadUrl(self):
"""Tests whether GetLatestZipUrl returns correct url given bad link."""
self.mox.StubOutWithMock(urllib, 'urlopen')
self.mox.StubOutWithMock(ctest, 'GetNewestLinkFromZipBase')
m_file = self.mox.CreateMock(file)
urllib.urlopen('%s/%s/LATEST-%s' % (self.latestbase, self.channel,
self.board)).AndRaise(IOError('Cannot open url.'))
ctest.GetNewestLinkFromZipBase(self.board, self.channel,
self.zipbase).AndReturn(self.image_url)
self.mox.ReplayAll()
self.assertEquals(ctest.GetLatestZipUrl(self.board, self.channel,
self.latestbase, self.zipbase),
self.image_url)
self.mox.VerifyAll()
def testGrabZipAndExtractImageUseCached(self):
"""Test case where cache holds our image."""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(__builtins__, 'open')
m_file = self.mox.CreateMock(file)
os.path.exists('%s/%s' % (
self.download_folder, 'download_url')).AndReturn(True)
open('%s/%s' % (self.download_folder, 'download_url')).AndReturn(m_file)
m_file.read().AndReturn(self.image_url)
m_file.close()
os.path.exists('%s/%s' % (
self.download_folder, ctest._IMAGE_TO_EXTRACT)).AndReturn(True)
self.mox.ReplayAll()
ctest.GrabZipAndExtractImage(self.image_url, self.download_folder,
ctest._IMAGE_TO_EXTRACT)
self.mox.VerifyAll()
def CommonDownloadAndExtractImage(self):
"""Common code to mock downloading image, unzipping it and setting url."""
zip_path = os.path.join(self.download_folder, 'image.zip')
m_file = self.mox.CreateMock(file)
ctest.RunCommand(['rm', '-rf', self.download_folder], print_cmd=False)
os.mkdir(self.download_folder)
urllib.urlretrieve(self.image_url, zip_path)
ctest.RunCommand(['unzip', '-d', self.download_folder, zip_path],
print_cmd=False, error_message=mox.IgnoreArg())
ctest.ModifyBootDesc(self.download_folder)
open('%s/%s' % (self.download_folder, 'download_url'),
'w+').AndReturn(m_file)
m_file.write(self.image_url)
m_file.close()
self.mox.ReplayAll()
ctest.GrabZipAndExtractImage(self.image_url, self.download_folder,
ctest._IMAGE_TO_EXTRACT)
self.mox.VerifyAll()
def testGrabZipAndExtractImageNoCache(self):
"""Test case where download_url doesn't exist."""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(os, 'mkdir')
self.mox.StubOutWithMock(__builtins__, 'open')
self.mox.StubOutWithMock(ctest, 'RunCommand')
self.mox.StubOutWithMock(urllib, 'urlretrieve')
self.mox.StubOutWithMock(ctest, 'ModifyBootDesc')
m_file = self.mox.CreateMock(file)
os.path.exists('%s/%s' % (
self.download_folder, 'download_url')).AndReturn(False)
self.CommonDownloadAndExtractImage()
def testGrabZipAndExtractImageWrongCache(self):
"""Test case where download_url exists but doesn't match our url."""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(os, 'mkdir')
self.mox.StubOutWithMock(__builtins__, 'open')
self.mox.StubOutWithMock(ctest, 'RunCommand')
self.mox.StubOutWithMock(urllib, 'urlretrieve')
self.mox.StubOutWithMock(ctest, 'ModifyBootDesc')
m_file = self.mox.CreateMock(file)
os.path.exists('%s/%s' % (
self.download_folder, 'download_url')).AndReturn(True)
open('%s/%s' % (self.download_folder, 'download_url')).AndReturn(m_file)
m_file.read().AndReturn(self.image_url)
m_file.close()
os.path.exists('%s/%s' % (
self.download_folder, ctest._IMAGE_TO_EXTRACT)).AndReturn(False)
self.CommonDownloadAndExtractImage()
def testGetLatestLinkFromPage(self):
"""Tests whether we get the latest link from a url given a regex."""
test_url = 'test_url'
test_html = """
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<body>
<h1>Test Index</h1>
<a href="ZsomeCruft">Cruft</a>
<a href="YotherCruft">Cruft</a>
<a href="ChromeOS-0.9.12.4-blahblah.zip">testlink1/</a>
<a href="ChromeOS-0.9.12.4-blahblah.zip.other/">testlink2/</a>
<a href="ChromeOS-Factory-0.9.12.4-blahblah.zip/">testlink3/</a>
</body></html>
"""
self.mox.StubOutWithMock(urllib, 'urlopen')
m_file = self.mox.CreateMock(file)
urllib.urlopen(test_url).AndReturn(m_file)
m_file.read().AndReturn(test_html)
m_file.close()
self.mox.ReplayAll()
latest_link = ctest.GetLatestLinkFromPage(test_url, regex=self.test_regex)
self.assertTrue(latest_link == 'ChromeOS-0.9.12.4-blahblah.zip')
self.mox.VerifyAll()
class HTMLDirectoryParserTest(unittest.TestCase):
"""Test class for HTMLDirectoryParser."""
def setUp(self):
self.test_regex = '\d+\.\d+\.\d+\.\d+/'
def testHandleStarttagGood(self):
parser = ctest.HTMLDirectoryParser(regex=self.test_regex)
parser.handle_starttag('a', [('href', '0.9.74.1/')])
self.assertTrue('0.9.74.1' in parser.link_list)
def testHandleStarttagBad(self):
parser = ctest.HTMLDirectoryParser(regex=self.test_regex)
parser.handle_starttag('a', [('href', 'ZsomeCruft/')])
self.assertTrue('ZsomeCruft' not in parser.link_list)
if __name__ == '__main__':
unittest.main()

View File

@ -11,8 +11,8 @@ import subprocess
import sys
_STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
CROSUTILS_DIRECTORY = os.path.realpath(os.path.dirname(os.path.dirname(
__file__)))
CROSUTILS_DIRECTORY = os.path.dirname(os.path.dirname(
os.path.realpath(__file__)))
# TODO(sosa): Move logging to logging module.