Revert "Passes cache location to tests and runs the tests in parallel."

This reverts commit be787f3525a47fb5baac5cb50c0f13972bc7cfb6.

Revert "Fix sudo again."

This reverts commit 0f411ec7d9bf09ee008544fdc52751b42970a611.

TBR dgarrett

BUG=
TEST=

Review URL: http://codereview.chromium.org/6348022

Change-Id: I2fb9ca4967ff173c17840b96eb68b02ff2b7774a
This commit is contained in:
Chris Sosa 2011-01-25 18:16:55 -08:00
parent 0f411ec7d9
commit 404bfdf073
5 changed files with 68 additions and 241 deletions

View File

@ -15,7 +15,6 @@
import optparse import optparse
import os import os
import re import re
import subprocess
import sys import sys
import threading import threading
import time import time
@ -24,7 +23,6 @@ import urllib
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
from cros_build_lib import Die from cros_build_lib import Die
from cros_build_lib import GetIPAddress
from cros_build_lib import Info from cros_build_lib import Info
from cros_build_lib import ReinterpretPathForChroot from cros_build_lib import ReinterpretPathForChroot
from cros_build_lib import RunCommand from cros_build_lib import RunCommand
@ -33,8 +31,6 @@ from cros_build_lib import Warning
import cros_test_proxy import cros_test_proxy
global dev_server_cache
class UpdateException(Exception): class UpdateException(Exception):
"""Exception thrown when _UpdateImage or _UpdateUsingPayload fail""" """Exception thrown when _UpdateImage or _UpdateUsingPayload fail"""
@ -473,14 +469,12 @@ class RealAUTest(unittest.TestCase, AUTest):
class VirtualAUTest(unittest.TestCase, AUTest): class VirtualAUTest(unittest.TestCase, AUTest):
"""Test harness for updating virtual machines.""" """Test harness for updating virtual machines."""
vm_image_path = None
# VM Constants. # VM Constants.
_FULL_VDISK_SIZE = 6072 _FULL_VDISK_SIZE = 6072
_FULL_STATEFULFS_SIZE = 3074 _FULL_STATEFULFS_SIZE = 3074
_KVM_PID_FILE = '/tmp/harness_pid'
# Class variables used to acquire individual VM variables per test.
_vm_lock = threading.Lock()
_next_port = 9222
def _KillExistingVM(self, pid_file): def _KillExistingVM(self, pid_file):
if os.path.exists(pid_file): if os.path.exists(pid_file):
@ -491,22 +485,10 @@ class VirtualAUTest(unittest.TestCase, AUTest):
assert not os.path.exists(pid_file) assert not os.path.exists(pid_file)
def _AcquireUniquePortAndPidFile(self):
"""Acquires unique ssh port and pid file for VM."""
with VirtualAUTest._vm_lock:
self._ssh_port = VirtualAUTest._next_port
self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port
VirtualAUTest._next_port += 1
def setUp(self): def setUp(self):
"""Unit test overriden method. Is called before every test.""" """Unit test overriden method. Is called before every test."""
AUTest.setUp(self) AUTest.setUp(self)
self.vm_image_path = None self._KillExistingVM(self._KVM_PID_FILE)
self._AcquireUniquePortAndPidFile()
self._KillExistingVM(self._kvm_pid_file)
def tearDown(self):
self._KillExistingVM(self._kvm_pid_file)
@classmethod @classmethod
def ProcessOptions(cls, parser, options): def ProcessOptions(cls, parser, options):
@ -545,42 +527,26 @@ class VirtualAUTest(unittest.TestCase, AUTest):
self.assertTrue(os.path.exists(self.vm_image_path)) self.assertTrue(os.path.exists(self.vm_image_path))
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=''): proxy_port=None):
"""Updates VM image with image_path.""" """Updates VM image with image_path."""
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
if src_image_path and self._first_update: if src_image_path and self._first_update:
src_image_path = self.vm_image_path src_image_path = self.vm_image_path
self._first_update = False self._first_update = False
# Check image payload cache first. cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
update_id = _GenerateUpdateId(target=image_path, src=src_image_path) '--update_image_path=%s' % image_path,
cache_path = dev_server_cache[update_id] '--vm_image_path=%s' % self.vm_image_path,
if cache_path: '--snapshot',
Info('Using cache %s' % cache_path) self.graphics_flag,
update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path) '--persist',
cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, '--kvm_pid=%s' % self._KVM_PID_FILE,
'--vm_image_path=%s' % self.vm_image_path, stateful_change_flag,
'--snapshot', '--src_image=%s' % src_image_path,
self.graphics_flag, ]
'--persist',
'--kvm_pid=%s' % self._kvm_pid_file, if proxy_port:
'--ssh_port=%s' % self._ssh_port, cmd.append('--proxy_port=%s' % proxy_port)
stateful_change_flag,
'--update_url=%s' % update_url,
]
else:
cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
'--update_image_path=%s' % image_path,
'--vm_image_path=%s' % self.vm_image_path,
'--snapshot',
self.graphics_flag,
'--persist',
'--kvm_pid=%s' % self._kvm_pid_file,
'--ssh_port=%s' % self._ssh_port,
stateful_change_flag,
'--src_image=%s' % src_image_path,
'--proxy_port=%s' % proxy_port
]
if self.verbose: if self.verbose:
try: try:
@ -602,8 +568,7 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--snapshot', '--snapshot',
self.graphics_flag, self.graphics_flag,
'--persist', '--persist',
'--kvm_pid=%s' % self._kvm_pid_file, '--kvm_pid=%s' % self._KVM_PID_FILE,
'--ssh_port=%s' % self._ssh_port,
stateful_change_flag, stateful_change_flag,
] ]
@ -629,8 +594,7 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--image_path=%s' % self.vm_image_path, '--image_path=%s' % self.vm_image_path,
'--snapshot', '--snapshot',
'--persist', '--persist',
'--kvm_pid=%s' % self._kvm_pid_file, '--kvm_pid=%s' % self._KVM_PID_FILE,
'--ssh_port=%s' % self._ssh_port,
self.verify_suite, self.verify_suite,
] ]
@ -649,15 +613,15 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
def setUp(self): def setUp(self):
AUTest.setUp(self) AUTest.setUp(self)
def tearDown(self):
pass
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None): proxy_port=None):
if src_image_path and self._first_update: if src_image_path and self._first_update:
src_image_path = self.vm_image_path src_image_path = self.vm_image_path
self._first_update = False self._first_update = False
image_path = ReinterpretPathForChroot(image_path)
if src_image_path:
src_image_path = ReinterpretPathForChroot(src_image_path)
if not self.delta_list.has_key(image_path): if not self.delta_list.has_key(image_path):
self.delta_list[image_path] = set([src_image_path]) self.delta_list[image_path] = set([src_image_path])
else: else:
@ -671,98 +635,33 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
class ParallelJob(threading.Thread): class ParallelJob(threading.Thread):
"""Small wrapper for threading. Thread that releases a semaphores on exit.""" """Small wrapper for threading.Thread that releases a semaphore on exit."""
def __init__(self, semaphore, target, args):
def __init__(self, starting_semaphore, ending_semaphore, target, args):
"""Initializes an instance of a job.
Args:
starting_semaphore: Semaphore used by caller to wait on such that
there isn't more than a certain number of threads running. Should
be initialized to a value for the number of threads wanting to be run
at a time.
ending_semaphore: Semaphore is released every time a job ends. Should be
initialized to 0 before starting first job. Should be acquired once for
each job. Threading.Thread.join() has a bug where if the run function
terminates too quickly join() will hang forever.
target: The func to run.
args: Args to pass to the fun.
"""
threading.Thread.__init__(self, target=target, args=args) threading.Thread.__init__(self, target=target, args=args)
self._target = target self._target = target
self._args = args self._args = args
self._starting_semaphore = starting_semaphore self._semaphore = semaphore
self._ending_semaphore = ending_semaphore
self._output = None self._output = None
self._completed = False self._completed = False
def run(self): def run(self):
"""Thread override. Runs the method specified and sets output."""
try: try:
self._output = self._target(*self._args) threading.Thread.run(self)
finally: finally:
# From threading.py to avoid a refcycle.
del self._target, self._args
# Our own clean up.
self._Cleanup() self._Cleanup()
self._completed = True self._completed = True
def GetOutput(self): def GetOutput(self):
"""Returns the output of the method run."""
assert self._completed, 'GetOutput called before thread was run.' assert self._completed, 'GetOutput called before thread was run.'
return self._output return self._output
def _Cleanup(self): def _Cleanup(self):
"""Releases semaphores for a waiting caller.""" self._semaphore.release()
self._starting_semaphore.release()
self._ending_semaphore.release()
def __str__(self): def __str__(self):
return '%s(%s)' % (self._target, self._args) return '%s(%s)' % (self._target, self._args)
class DevServerWrapper(threading.Thread):
"""A Simple wrapper around a devserver instance."""
def __init__(self):
self.proc = None
threading.Thread.__init__(self)
def run(self):
# Kill previous running instance of devserver if it exists.
RunCommand(['sudo', 'pkill', '-f', 'devserver.py', ], error_ok=True,
print_cmd=False)
self.proc = subprocess.Popen(['sudo',
'./start_devserver',
'--archive_dir=./static',
'--client_prefix=ChromeOSUpdateEngine',
'--production',
])
self.proc.communicate()
def Stop(self):
"""Kills the devserver instance."""
self.proc.kill()
@classmethod
def GetDevServerURL(cls, port, sub_dir):
"""Returns the dev server url for a given port and sub directory."""
ip_addr = GetIPAddress()
if not port: port = 8080
url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr,
'port': str(port),
'dir': sub_dir}
return url
def _GenerateUpdateId(target, src):
"""Returns a simple representation id of target and src paths."""
if src:
return '%s->%s' % (target, src)
else:
return target
def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
"""Runs set number of specified jobs in parallel. """Runs set number of specified jobs in parallel.
@ -777,39 +676,29 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
return (x, y) return (x, y)
threads = [] threads = []
job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
join_semaphore = threading.Semaphore(0)
assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
# Create the parallel jobs. # Create the parallel jobs.
for job, args in map(_TwoTupleize, jobs, jobs_args): for job, args in map(_TwoTupleize, jobs, jobs_args):
thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, thread = ParallelJob(job_pool_semaphore, target=job, args=args)
args=args)
threads.append(thread) threads.append(thread)
# We use a semaphore to ensure we don't run more jobs that required. # We use a semaphore to ensure we don't run more jobs that required.
# After each thread finishes, it releases (increments semaphore). # After each thread finishes, it releases (increments semaphore).
# Acquire blocks of num jobs reached and continues when a thread finishes. # Acquire blocks of num jobs reached and continues when a thread finishes.
for next_thread in threads: for next_thread in threads:
job_start_semaphore.acquire(blocking=True) job_pool_semaphore.acquire(blocking=True)
Info('Starting job %s' % next_thread) Info('Starting %s' % next_thread)
next_thread.start() next_thread.start()
# Wait on the rest of the threads to finish. # Wait on the rest of the threads to finish.
for thread in threads: for thread in threads:
join_semaphore.acquire(blocking=True) thread.join()
return [thread.GetOutput() for thread in threads] return [thread.GetOutput() for thread in threads]
def _PrepareTestSuite(parser, options, test_class):
"""Returns a prepared test suite given by the options and test class."""
test_class.ProcessOptions(parser, options)
test_loader = unittest.TestLoader()
test_loader.testMethodPrefix = options.test_prefix
return test_loader.loadTestsFromTestCase(test_class)
def _PregenerateUpdates(parser, options): def _PregenerateUpdates(parser, options):
"""Determines all deltas that will be generated and generates them. """Determines all deltas that will be generated and generates them.
@ -819,80 +708,41 @@ def _PregenerateUpdates(parser, options):
parser: parser from main. parser: parser from main.
options: options from parsed parser. options: options from parsed parser.
Returns: Returns:
Dictionary of Update Identifiers->Relative cache locations. Array of output from generating updates.
""" """
def _GenerateVMUpdate(target, src): def _GenerateVMUpdate(target, src):
"""Generates an update using the devserver.""" """Generates an update using the devserver."""
target = ReinterpretPathForChroot(target) RunCommand(['sudo',
if src: './start_devserver',
src = ReinterpretPathForChroot(src) '--pregenerate_update',
'--exit',
return RunCommand(['sudo', '--image=%s' % target,
'./start_devserver', '--src_image=%s' % src,
'--pregenerate_update', '--for_vm'
'--exit', ], enter_chroot=True)
'--image=%s' % target,
'--src_image=%s' % src,
'--for_vm',
], redirect_stdout=True, enter_chroot=True,
print_cmd=False)
# Get the list of deltas by mocking out update method in test class. # Get the list of deltas by mocking out update method in test class.
test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest) GenerateVirtualAUDeltasTest.ProcessOptions(parser, options)
test_loader = unittest.TestLoader()
test_loader.testMethodPrefix = options.test_prefix
test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest)
test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) test_result = unittest.TextTestRunner(verbosity=0).run(test_suite)
Info('The following delta updates are required.') Info('The following delta updates are required.')
update_ids = []
jobs = [] jobs = []
args = [] args = []
for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items(): for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items():
for src in srcs: for src in srcs:
update_id = _GenerateUpdateId(target=target, src=src) if src:
print >> sys.stderr, 'AU: %s' % update_id print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target)
update_ids.append(update_id) else:
print >> sys.stderr, 'FULL AU %s' % target
jobs.append(_GenerateVMUpdate) jobs.append(_GenerateVMUpdate)
args.append((target, src)) args.append((target, src))
raw_results = _RunParallelJobs(options.jobs, jobs, args) results = _RunParallelJobs(options.jobs, jobs, args)
results = [] return results
# Parse the output.
key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
for result in raw_results:
for line in result.splitlines():
match = key_line_re.search(line)
if match:
# Convert blah/blah/update.gz -> update/blah/blah.
path_to_update_gz = match.group(1).rstrip()
(path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz')
results.append('/'.join(['update', path_to_update_dir]))
break
assert len(raw_results) == len(results), \
'Insufficient number cache directories returned.'
# Build the dictionary from our id's and returned cache paths.
cache_dictionary = {}
for index, id in enumerate(update_ids):
cache_dictionary[id] = results[index]
return cache_dictionary
def _RunTestsInParallel(parser, options, test_class):
"""Runs the tests given by the options and test_class in parallel."""
threads = []
args = []
test_suite = _PrepareTestSuite(parser, options, test_class)
for test in test_suite:
test_name = test.id()
test_case = unittest.TestLoader().loadTestsFromName(test_name)
threads.append(unittest.TextTestRunner().run)
args.append(test_case)
results = _RunParallelJobs(options.jobs, threads, args)
if not (test_result.wasSuccessful() for test_result in results):
Die('Test harness was not successful')
def main(): def main():
@ -927,28 +777,22 @@ def main():
if leftover_args: if leftover_args:
parser.error('Found extra options we do not support: %s' % leftover_args) parser.error('Found extra options we do not support: %s' % leftover_args)
# Figure out the test_class.
if options.type == 'vm': test_class = VirtualAUTest if options.type == 'vm': test_class = VirtualAUTest
elif options.type == 'real': test_class = RealAUTest elif options.type == 'real': test_class = RealAUTest
else: parser.error('Could not parse harness type %s.' % options.type) else: parser.error('Could not parse harness type %s.' % options.type)
# TODO(sosa): Caching doesn't really make sense on non-vm images (yet). # TODO(sosa): Caching doesn't really make sense on non-vm images (yet).
global dev_server_cache if options.type == 'vm':
if options.type == 'vm' and options.jobs > 1: _PregenerateUpdates(parser, options)
dev_server_cache = _PregenerateUpdates(parser, options)
my_server = DevServerWrapper()
my_server.start()
try:
_RunTestsInParallel(parser, options, test_class)
finally:
my_server.Stop()
else: # Run the test suite.
dev_server_cache = None test_class.ProcessOptions(parser, options)
test_suite = _PrepareTestSuite(parser, options, test_class) test_loader = unittest.TestLoader()
test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) test_loader.testMethodPrefix = options.test_prefix
if not test_result.wasSuccessful(): test_suite = test_loader.loadTestsFromTestCase(test_class)
Die('Test harness was not successful.') test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
if not test_result.wasSuccessful():
Die('Test harness was not successful.')
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -16,7 +16,6 @@ DEFINE_string src_image "" \
"Create a delta update by passing in the image on the remote machine." "Create a delta update by passing in the image on the remote machine."
DEFINE_string stateful_update_flag "" "Flags to pass to stateful update." s DEFINE_string stateful_update_flag "" "Flags to pass to stateful update." s
DEFINE_string update_image_path "" "Path of the image to update to." u DEFINE_string update_image_path "" "Path of the image to update to." u
DEFINE_string update_url "" "Full url of an update image."
DEFINE_string vm_image_path "" "Path of the VM image to update from." v DEFINE_string vm_image_path "" "Path of the VM image to update from." v
set -e set -e
@ -25,6 +24,8 @@ set -e
FLAGS "$@" || exit 1 FLAGS "$@" || exit 1
eval set -- "${FLAGS_ARGV}" eval set -- "${FLAGS_ARGV}"
[ -n "${FLAGS_update_image_path}" ] || [ -n "${FLAGS_payload}" ] || \
die "You must specify a path to an image to use as an update."
[ -n "${FLAGS_vm_image_path}" ] || \ [ -n "${FLAGS_vm_image_path}" ] || \
die "You must specify a path to a vm image." die "You must specify a path to a vm image."
@ -44,12 +45,11 @@ if [ -n "${FLAGS_proxy_port}" ]; then
fi fi
$(dirname $0)/../image_to_live.sh \ $(dirname $0)/../image_to_live.sh \
--for_vm \
--remote=127.0.0.1 \ --remote=127.0.0.1 \
--ssh_port=${FLAGS_ssh_port} \ --ssh_port=${FLAGS_ssh_port} \
--stateful_update_flag=${FLAGS_stateful_update_flag} \ --stateful_update_flag=${FLAGS_stateful_update_flag} \
--src_image="${FLAGS_src_image}" \ --src_image="${FLAGS_src_image}" \
--update_url="${FLAGS_update_url}" \
--verify \ --verify \
--for_vm \
${IMAGE_ARGS} ${IMAGE_ARGS}

View File

@ -25,7 +25,6 @@ DEFINE_boolean update_known_hosts ${FLAGS_FALSE} \
"Update your known_hosts with the new remote instance's key." "Update your known_hosts with the new remote instance's key."
DEFINE_string update_log "update_engine.log" \ DEFINE_string update_log "update_engine.log" \
"Path to log for the update_engine." "Path to log for the update_engine."
DEFINE_string update_url "" "Full url of an update image."
DEFINE_boolean verify ${FLAGS_TRUE} "Verify image on device after update." DEFINE_boolean verify ${FLAGS_TRUE} "Verify image on device after update."
# Flags for devserver. # Flags for devserver.
@ -45,6 +44,7 @@ DEFINE_string src_image "" \
"Create a delta update by passing in the image on the remote machine." "Create a delta update by passing in the image on the remote machine."
DEFINE_boolean update_stateful ${FLAGS_TRUE} \ DEFINE_boolean update_stateful ${FLAGS_TRUE} \
"Perform update of stateful partition e.g. /var /usr/local." "Perform update of stateful partition e.g. /var /usr/local."
DEFINE_string update_url "" "Full url of an update image."
# Flags for stateful update. # Flags for stateful update.
DEFINE_string stateful_update_flag "" \ DEFINE_string stateful_update_flag "" \

View File

@ -6,7 +6,6 @@
import inspect import inspect
import os import os
import re
import subprocess import subprocess
import sys import sys
@ -246,21 +245,3 @@ def ReinterpretPathForChroot(path):
new_path = os.path.join('/home', os.getenv('USER'), 'trunk', relative_path) new_path = os.path.join('/home', os.getenv('USER'), 'trunk', relative_path)
return new_path return new_path
def GetIPAddress(device='eth0'):
"""Returns the IP Address for a given device using ifconfig.
socket.gethostname() is insufficient for machines where the host files are
not set up "correctly." Since some of our builders may have this issue,
this method gives you a generic way to get the address so you are reachable
either via a VM or remote machine on the same network.
"""
ifconfig_output = RunCommand(['ifconfig', device], redirect_stdout=True,
print_cmd=False)
match = re.search('.*inet addr:(\d+\.\d+\.\d+\.\d+).*', ifconfig_output)
if match:
return match.group(1)
else:
Warning('Failed to find ip address in %s' % ifconfig_output)
return None

View File

@ -38,6 +38,8 @@ function blocking_kill() {
! ps -p ${1} > /dev/null ! ps -p ${1} > /dev/null
} }
# TODO(rtc): These flags assume that we'll be using KVM on Lucid and won't work
# on Hardy.
# $1: Path to the virtual image to start. # $1: Path to the virtual image to start.
function start_kvm() { function start_kvm() {
# Override default pid file. # Override default pid file.