Remove test keys from images we test with in the test harness.

Cleaned up _InsertKeyIntoImage to be more general (able to remove key if no key set).  Also made it much more intelligent ... only doing things when it needs to.

Moved mount / unmount code to cros_lib.

BUG=chromium-os:12684
TEST=Ran it a lot ... inspected images after and saw tests pass.

Review URL: http://codereview.chromium.org/6698017

Change-Id: Icd96d1178eeabf45a2d5916fbcab8bf7ffba7e21
This commit is contained in:
Chris Sosa 2011-03-16 16:07:32 -07:00
parent 3b11b450ea
commit 31b641ec3c
6 changed files with 169 additions and 77 deletions

View File

@ -25,6 +25,7 @@ class AUTest(unittest.TestCase):
See documentation for au_worker for more information.
"""
test_results_root = None
public_key_managers = []
@classmethod
def ProcessOptions(cls, options, use_dummy_worker):

View File

@ -16,7 +16,6 @@ import optparse
import os
import re
import sys
import tempfile
import unittest
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
@ -27,9 +26,9 @@ import au_worker
import dummy_au_worker
import dev_server_wrapper
import parallel_test_job
import public_key_manager
import update_exception
def _PrepareTestSuite(options, use_dummy_worker=False):
"""Returns a prepared test suite given by the options and test class."""
au_test.AUTest.ProcessOptions(options, use_dummy_worker)
@ -82,9 +81,12 @@ def _PregenerateUpdates(options):
update_ids = []
jobs = []
args = []
modified_images = set()
for target, srcs in dummy_au_worker.DummyAUWorker.delta_list.items():
modified_images.add(target)
for src_key in srcs:
(src, _ , key) = src_key.partition('+')
if src: modified_images.add(src)
# TODO(sosa): Add private key as part of caching name once devserver can
# handle it its own cache.
update_id = dev_server_wrapper.GenerateUpdateId(target, src, key)
@ -93,6 +95,17 @@ def _PregenerateUpdates(options):
jobs.append(_GenerateVMUpdate)
args.append((target, src, key))
# Always add the base image path. This is only useful for non-delta updates.
modified_images.add(options.base_image)
# Add public key to all images we are using.
if options.public_key:
cros_lib.Info('Adding public keys to images for testing.')
for image in modified_images:
manager = public_key_manager.PublicKeyManager(image, options.public_key)
manager.AddKeyToImage()
au_test.AUTest.public_key_managers.append(manager)
raw_results = parallel_test_job.RunParallelJobs(options.jobs, jobs, args,
print_status=True)
results = []
@ -137,7 +150,7 @@ def _RunTestsInParallel(options):
for test in test_suite:
test_name = test.id()
test_case = unittest.TestLoader().loadTestsFromName(test_name)
threads.append(unittest.TextTestRunner().run)
threads.append(unittest.TextTestRunner(verbosity=2).run)
args.append(test_case)
results = parallel_test_job.RunParallelJobs(options.jobs, threads, args,
@ -147,52 +160,6 @@ def _RunTestsInParallel(options):
cros_lib.Die('Test harness was not successful')
def _InsertPublicKeyIntoImage(image_path, key_path):
"""Inserts public key into image @ static update_engine location."""
from_dir = os.path.dirname(image_path)
image = os.path.basename(image_path)
crosutils_dir = os.path.abspath(__file__).rsplit('/', 2)[0]
target_key_path = 'usr/share/update_engine/update-payload-key.pub.pem'
# Temporary directories for this function.
rootfs_dir = tempfile.mkdtemp(suffix='rootfs', prefix='tmp')
stateful_dir = tempfile.mkdtemp(suffix='stateful', prefix='tmp')
cros_lib.Info('Copying %s into %s' % (key_path, image_path))
try:
cros_lib.RunCommand(['./mount_gpt_image.sh',
'--from=%s' % from_dir,
'--image=%s' % image,
'--rootfs_mountpt=%s' % rootfs_dir,
'--stateful_mountpt=%s' % stateful_dir,
], print_cmd=False, redirect_stdout=True,
redirect_stderr=True, cwd=crosutils_dir)
path = os.path.join(rootfs_dir, target_key_path)
dir_path = os.path.dirname(path)
cros_lib.RunCommand(['sudo', 'mkdir', '--parents', dir_path],
print_cmd=False)
cros_lib.RunCommand(['sudo', 'cp', '--force', '-p', key_path, path],
print_cmd=False)
finally:
# Unmount best effort regardless.
cros_lib.RunCommand(['./mount_gpt_image.sh',
'--unmount',
'--rootfs_mountpt=%s' % rootfs_dir,
'--stateful_mountpt=%s' % stateful_dir,
], print_cmd=False, redirect_stdout=True,
redirect_stderr=True, cwd=crosutils_dir)
# Clean up our directories.
os.rmdir(rootfs_dir)
os.rmdir(stateful_dir)
cros_lib.RunCommand(['bin/cros_make_image_bootable',
cros_lib.ReinterpretPathForChroot(from_dir),
image],
print_cmd=False, redirect_stdout=True,
redirect_stderr=True, enter_chroot=True,
cwd=crosutils_dir)
def _CleanPreviousWork(options):
"""Cleans up previous work from the devserver cache and local image cache."""
cros_lib.Info('Cleaning up previous work.')
@ -257,9 +224,6 @@ def main():
cros_lib.Info('Base image not specified. Using target as base image.')
options.base_image = options.target_image
# Sanity checks on keys and insert them onto the image. The caches must be
# cleaned so we know that the vm images and payloads match the possibly new
# key.
if options.private_key or options.public_key:
error_msg = ('Could not find %s key. Both private and public keys must be '
'specified if either is specified.')
@ -267,10 +231,6 @@ def main():
error_msg % 'private'
assert options.public_key and os.path.exists(options.public_key), \
error_msg % 'public'
_InsertPublicKeyIntoImage(options.target_image, options.public_key)
if options.target_image != options.base_image:
_InsertPublicKeyIntoImage(options.base_image, options.public_key)
options.clean = True
# Clean up previous work if requested.
if options.clean: _CleanPreviousWork(options)
@ -279,24 +239,38 @@ def main():
if not os.path.exists(options.test_results_root):
os.makedirs(options.test_results_root)
# Generate cache of updates to use during test harness.
update_cache = _PregenerateUpdates(options)
au_worker.AUWorker.SetUpdateCache(update_cache)
my_server = dev_server_wrapper.DevServerWrapper(
au_test.AUTest.test_results_root)
my_server.start()
# Pre-generate update modifies images by adding public keys to them.
# Wrap try to make sure we clean this up before we're done.
try:
if options.type == 'vm':
_RunTestsInParallel(options)
else:
# TODO(sosa) - Take in a machine pool for a real test.
# Can't run in parallel with only one remote device.
test_suite = _PrepareTestSuite(options)
test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
if not test_result.wasSuccessful(): cros_lib.Die('Test harness failed.')
# Generate cache of updates to use during test harness.
update_cache = _PregenerateUpdates(options)
au_worker.AUWorker.SetUpdateCache(update_cache)
my_server = dev_server_wrapper.DevServerWrapper(
au_test.AUTest.test_results_root)
my_server.start()
try:
if options.type == 'vm':
_RunTestsInParallel(options)
else:
# TODO(sosa) - Take in a machine pool for a real test.
# Can't run in parallel with only one remote device.
test_suite = _PrepareTestSuite(options)
test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
if not test_result.wasSuccessful(): cros_lib.Die('Test harness failed.')
finally:
my_server.Stop()
finally:
my_server.Stop()
# Un-modify any target images we modified. We don't need to un-modify
# non-targets because they aren't important for archival steps.
if options.public_key:
cros_lib.Info('Cleaning up. Removing keys added as part of testing.')
target_directory = os.path.dirname(options.target_image)
for key_manager in au_test.AUTest.public_key_managers:
if key_manager.image_path.startswith(target_directory):
key_manager.RemoveKeyFromImage()
if __name__ == '__main__':

View File

@ -13,7 +13,7 @@ import cros_build_lib as cros_lib
def GenerateUpdateId(target, src, key):
"""Returns a simple representation id of target and src paths."""
update_id = target
if src: update_id = '->'.join([update_id, src])
if src: update_id = '->'.join([src, update_id])
if key: update_id = '+'.join([update_id, key])
return update_id

View File

@ -54,7 +54,6 @@ class ParallelJob(threading.Thread):
def _Cleanup(self):
"""Releases semaphores for a waiting caller."""
cros_lib.Info('Completed job %s' % self)
self._starting_semaphore.release()
self._ending_semaphore.release()
@ -89,7 +88,7 @@ def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args,
threads.append(thread)
# Cache sudo access.
cros_lib.RunCommand(['sudo', 'echo', 'Starting test harness'],
cros_lib.RunCommand(['sudo', 'echo', 'Caching sudo credentials'],
print_cmd=False, redirect_stdout=True,
redirect_stderr=True)
@ -98,11 +97,9 @@ def RunParallelJobs(number_of_simultaneous_jobs, jobs, jobs_args,
# Acquire blocks of num jobs reached and continues when a thread finishes.
for next_thread in threads:
job_start_semaphore.acquire(blocking=True)
cros_lib.Info('Starting job %s' % next_thread)
next_thread.start()
# Wait on the rest of the threads to finish.
cros_lib.Info('Waiting for threads to complete.')
for thread in threads:
while not join_semaphore.acquire(blocking=False):
time.sleep(5)

View File

@ -0,0 +1,91 @@
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module manages interactions between an image and a public key."""
import os
import tempfile
import cros_build_lib as cros_lib
class PublicKeyManager(object):
"""Class wrapping interactions with a public key on an image."""
TARGET_KEY_PATH = 'usr/share/update_engine/update-payload-key.pub.pem'
def __init__(self, image_path, key_path):
"""Initializes a manager with image_path and key_path we plan to insert."""
self.image_path = image_path
self.key_path = key_path
self._rootfs_dir = tempfile.mkdtemp(suffix='rootfs', prefix='tmp')
self._stateful_dir = tempfile.mkdtemp(suffix='stateful', prefix='tmp')
# Gather some extra information about the image.
try:
cros_lib.MountImage(image_path, self._rootfs_dir, self._stateful_dir,
read_only=True)
self._full_target_key_path = os.path.join(
self._rootfs_dir, PublicKeyManager.TARGET_KEY_PATH)
self._is_key_new = True
if os.path.exists(self._full_target_key_path):
diff_output = cros_lib.RunCommand(['diff',
self.key_path,
self._full_target_key_path],
print_cmd=False, redirect_stdout=True,
redirect_stderr=True, error_ok=True)
if not diff_output: self._is_key_new = False
finally:
cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
def __del__(self):
"""Remove our temporary directories we created in init."""
os.rmdir(self._rootfs_dir)
os.rmdir(self._stateful_dir)
def AddKeyToImage(self):
"""Adds the key specified in init to the image."""
if not self._is_key_new:
cros_lib.Info('Public key already on image %s. No work to do.' %
self.image_path)
return
cros_lib.Info('Copying %s into %s' % (self.key_path, self.image_path))
try:
cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir,
read_only=False)
dir_path = os.path.dirname(self._full_target_key_path)
cros_lib.RunCommand(['sudo', 'mkdir', '--parents', dir_path],
print_cmd=False)
cros_lib.RunCommand(['sudo', 'cp', '--force', '-p', self.key_path,
self._full_target_key_path], print_cmd=False)
finally:
cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
self._MakeImageBootable()
def RemoveKeyFromImage(self):
"""Removes the key specified in init from the image."""
cros_lib.Info('Removing public key from image %s.' % self.image_path)
try:
cros_lib.MountImage(self.image_path, self._rootfs_dir, self._stateful_dir,
read_only=False)
cros_lib.RunCommand(['sudo', 'rm', '--force', self._full_target_key_path],
print_cmd=False)
finally:
cros_lib.UnmountImage(self._rootfs_dir, self._stateful_dir)
self._MakeImageBootable()
def _MakeImageBootable(self):
"""Makes the image bootable. Note, it is only useful for non-vm images."""
image = os.path.basename(self.image_path)
if 'qemu' in image:
return
from_dir = os.path.dirname(self.image_path)
cros_lib.RunCommand(['bin/cros_make_image_bootable',
cros_lib.ReinterpretPathForChroot(from_dir),
image], print_cmd=False, redirect_stdout=True,
redirect_stderr=True, enter_chroot=True,
cwd=cros_lib.CROSUTILS_DIRECTORY)

View File

@ -11,6 +11,8 @@ import subprocess
import sys
_STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
CROSUTILS_DIRECTORY = os.path.realpath(os.path.dirname(os.path.dirname(
__file__)))
# TODO(sosa): Move logging to logging module.
@ -297,3 +299,30 @@ def GetIPAddress(device='eth0'):
else:
Warning('Failed to find ip address in %s' % ifconfig_output)
return None
def MountImage(image_path, root_dir, stateful_dir, read_only):
"""Mounts a Chromium OS image onto mount dir points."""
from_dir = os.path.dirname(image_path)
image = os.path.basename(image_path)
extra_args = []
if read_only: extra_args.append('--read_only')
cmd = ['./mount_gpt_image.sh',
'--from=%s' % from_dir,
'--image=%s' % image,
'--rootfs_mountpt=%s' % root_dir,
'--stateful_mountpt=%s' % stateful_dir,
]
cmd.extend(extra_args)
RunCommand(cmd, print_cmd=False, redirect_stdout=True, redirect_stderr=True,
cwd=CROSUTILS_DIRECTORY)
def UnmountImage(root_dir, stateful_dir):
"""Unmounts a Chromium OS image specified by mount dir points."""
RunCommand(['./mount_gpt_image.sh',
'--unmount',
'--rootfs_mountpt=%s' % root_dir,
'--stateful_mountpt=%s' % stateful_dir,
], print_cmd=False, redirect_stdout=True, redirect_stderr=True,
cwd=CROSUTILS_DIRECTORY)