mirror of
				https://source.denx.de/u-boot/u-boot.git
				synced 2025-10-26 05:51:29 +01:00 
			
		
		
		
	-----BEGIN PGP SIGNATURE----- iQGzBAABCgAdFiEEGjx/cOCPqxcHgJu/FHw5/5Y0tywFAmGk/4sACgkQFHw5/5Y0 tyyjtgwAo0jCRa1Vyc5z2RrINGdARoPhNcCnBNMYEVTjH9uP0/Mzlpo4i1IWX5qP nEuVmP01KjtWDRNy7Cpr45/j2PkTZ0THqXGZKpTG/yALdUKZw0wbzeh/CMllWh0A 9yJfMbv2/IV79rbb6osxItjusSr5PnrU5fHsR+zUfC8NRZpzWGfpH8a6NpfGvGYo 3OUKNJrGv930C3MqVnUEHPIuz+vL9fWLSs/rK/JHVZt+ALU45uYtz4cKycMoElVd IUJe3mhKhw0k8dO8R+p3UAEFrP+LZdcBLgCt8wRQK6Yl4lcxYOtZgYuJuM9Kp20s abqIuky3TWwFjpNM60Gmo63Yf967jOeI/lfITq2juZ5TBlNKOO6Z/NRVUsHkNKFG 7qJC14/NxuSdN4u9s2h+rJBBGYc+BzWVO/ikigHnsCeFQYfafJpGTnPSJr55OU5X eeB6l3blx6jwxOPAz0JhLecZb4e027R+eKionirMEnVPENtnJjF1d+CulkekahUO LvdqKaBE =h4j5 -----END PGP SIGNATURE----- Merge tag 'v2022.01-rc3' into next Prepare v2022.01-rc3 Signed-off-by: Tom Rini <trini@konsulko.com>
		
			
				
	
	
		
			382 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			382 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # SPDX-License-Identifier: GPL-2.0
 | |
| # Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved.
 | |
| 
 | |
| """
 | |
| Utility code shared across multiple tests.
 | |
| """
 | |
| 
 | |
| import hashlib
 | |
| import inspect
 | |
| import os
 | |
| import os.path
 | |
| import pathlib
 | |
| import signal
 | |
| import sys
 | |
| import time
 | |
| import re
 | |
| import pytest
 | |
| 
 | |
| def md5sum_data(data):
 | |
|     """Calculate the MD5 hash of some data.
 | |
| 
 | |
|     Args:
 | |
|         data: The data to hash.
 | |
| 
 | |
|     Returns:
 | |
|         The hash of the data, as a binary string.
 | |
|     """
 | |
| 
 | |
|     h = hashlib.md5()
 | |
|     h.update(data)
 | |
|     return h.digest()
 | |
| 
 | |
| def md5sum_file(fn, max_length=None):
 | |
|     """Calculate the MD5 hash of the contents of a file.
 | |
| 
 | |
|     Args:
 | |
|         fn: The filename of the file to hash.
 | |
|         max_length: The number of bytes to hash. If the file has more
 | |
|             bytes than this, they will be ignored. If None or omitted, the
 | |
|             entire file will be hashed.
 | |
| 
 | |
|     Returns:
 | |
|         The hash of the file content, as a binary string.
 | |
|     """
 | |
| 
 | |
|     with open(fn, 'rb') as fh:
 | |
|         if max_length:
 | |
|             params = [max_length]
 | |
|         else:
 | |
|             params = []
 | |
|         data = fh.read(*params)
 | |
|     return md5sum_data(data)
 | |
| 
 | |
| class PersistentRandomFile:
 | |
|     """Generate and store information about a persistent file containing
 | |
|     random data."""
 | |
| 
 | |
|     def __init__(self, u_boot_console, fn, size):
 | |
|         """Create or process the persistent file.
 | |
| 
 | |
|         If the file does not exist, it is generated.
 | |
| 
 | |
|         If the file does exist, its content is hashed for later comparison.
 | |
| 
 | |
|         These files are always located in the "persistent data directory" of
 | |
|         the current test run.
 | |
| 
 | |
|         Args:
 | |
|             u_boot_console: A console connection to U-Boot.
 | |
|             fn: The filename (without path) to create.
 | |
|             size: The desired size of the file in bytes.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         self.fn = fn
 | |
| 
 | |
|         self.abs_fn = u_boot_console.config.persistent_data_dir + '/' + fn
 | |
| 
 | |
|         if os.path.exists(self.abs_fn):
 | |
|             u_boot_console.log.action('Persistent data file ' + self.abs_fn +
 | |
|                 ' already exists')
 | |
|             self.content_hash = md5sum_file(self.abs_fn)
 | |
|         else:
 | |
|             u_boot_console.log.action('Generating ' + self.abs_fn +
 | |
|                 ' (random, persistent, %d bytes)' % size)
 | |
|             data = os.urandom(size)
 | |
|             with open(self.abs_fn, 'wb') as fh:
 | |
|                 fh.write(data)
 | |
|             self.content_hash = md5sum_data(data)
 | |
| 
 | |
| def attempt_to_open_file(fn):
 | |
|     """Attempt to open a file, without throwing exceptions.
 | |
| 
 | |
|     Any errors (exceptions) that occur during the attempt to open the file
 | |
|     are ignored. This is useful in order to test whether a file (in
 | |
|     particular, a device node) exists and can be successfully opened, in order
 | |
|     to poll for e.g. USB enumeration completion.
 | |
| 
 | |
|     Args:
 | |
|         fn: The filename to attempt to open.
 | |
| 
 | |
|     Returns:
 | |
|         An open file handle to the file, or None if the file could not be
 | |
|             opened.
 | |
|     """
 | |
| 
 | |
|     try:
 | |
|         return open(fn, 'rb')
 | |
|     except:
 | |
|         return None
 | |
| 
 | |
| def wait_until_open_succeeds(fn):
 | |
|     """Poll until a file can be opened, or a timeout occurs.
 | |
| 
 | |
|     Continually attempt to open a file, and return when this succeeds, or
 | |
|     raise an exception after a timeout.
 | |
| 
 | |
|     Args:
 | |
|         fn: The filename to attempt to open.
 | |
| 
 | |
|     Returns:
 | |
|         An open file handle to the file.
 | |
|     """
 | |
| 
 | |
|     for i in range(100):
 | |
|         fh = attempt_to_open_file(fn)
 | |
|         if fh:
 | |
|             return fh
 | |
|         time.sleep(0.1)
 | |
|     raise Exception('File could not be opened')
 | |
| 
 | |
| def wait_until_file_open_fails(fn, ignore_errors):
 | |
|     """Poll until a file cannot be opened, or a timeout occurs.
 | |
| 
 | |
|     Continually attempt to open a file, and return when this fails, or
 | |
|     raise an exception after a timeout.
 | |
| 
 | |
|     Args:
 | |
|         fn: The filename to attempt to open.
 | |
|         ignore_errors: Indicate whether to ignore timeout errors. If True, the
 | |
|             function will simply return if a timeout occurs, otherwise an
 | |
|             exception will be raised.
 | |
| 
 | |
|     Returns:
 | |
|         Nothing.
 | |
|     """
 | |
| 
 | |
|     for _ in range(100):
 | |
|         fh = attempt_to_open_file(fn)
 | |
|         if not fh:
 | |
|             return
 | |
|         fh.close()
 | |
|         time.sleep(0.1)
 | |
|     if ignore_errors:
 | |
|         return
 | |
|     raise Exception('File can still be opened')
 | |
| 
 | |
| def run_and_log(u_boot_console, cmd, ignore_errors=False, stdin=None):
 | |
|     """Run a command and log its output.
 | |
| 
 | |
|     Args:
 | |
|         u_boot_console: A console connection to U-Boot.
 | |
|         cmd: The command to run, as an array of argv[], or a string.
 | |
|             If a string, note that it is split up so that quoted spaces
 | |
|             will not be preserved. E.g. "fred and" becomes ['"fred', 'and"']
 | |
|         ignore_errors: Indicate whether to ignore errors. If True, the function
 | |
|             will simply return if the command cannot be executed or exits with
 | |
|             an error code, otherwise an exception will be raised if such
 | |
|             problems occur.
 | |
|         stdin: Input string to pass to the command as stdin (or None)
 | |
| 
 | |
|     Returns:
 | |
|         The output as a string.
 | |
|     """
 | |
|     if isinstance(cmd, str):
 | |
|         cmd = cmd.split()
 | |
|     runner = u_boot_console.log.get_runner(cmd[0], sys.stdout)
 | |
|     output = runner.run(cmd, ignore_errors=ignore_errors, stdin=stdin)
 | |
|     runner.close()
 | |
|     return output
 | |
| 
 | |
| def run_and_log_expect_exception(u_boot_console, cmd, retcode, msg):
 | |
|     """Run a command that is expected to fail.
 | |
| 
 | |
|     This runs a command and checks that it fails with the expected return code
 | |
|     and exception method. If not, an exception is raised.
 | |
| 
 | |
|     Args:
 | |
|         u_boot_console: A console connection to U-Boot.
 | |
|         cmd: The command to run, as an array of argv[].
 | |
|         retcode: Expected non-zero return code from the command.
 | |
|         msg: String that should be contained within the command's output.
 | |
|     """
 | |
|     try:
 | |
|         runner = u_boot_console.log.get_runner(cmd[0], sys.stdout)
 | |
|         runner.run(cmd)
 | |
|     except Exception:
 | |
|         assert retcode == runner.exit_status
 | |
|         assert msg in runner.output
 | |
|     else:
 | |
|         raise Exception("Expected an exception with retcode %d message '%s',"
 | |
|                         "but it was not raised" % (retcode, msg))
 | |
|     finally:
 | |
|         runner.close()
 | |
| 
 | |
| ram_base = None
 | |
| def find_ram_base(u_boot_console):
 | |
|     """Find the running U-Boot's RAM location.
 | |
| 
 | |
|     Probe the running U-Boot to determine the address of the first bank
 | |
|     of RAM. This is useful for tests that test reading/writing RAM, or
 | |
|     load/save files that aren't associated with some standard address
 | |
|     typically represented in an environment variable such as
 | |
|     ${kernel_addr_r}. The value is cached so that it only needs to be
 | |
|     actively read once.
 | |
| 
 | |
|     Args:
 | |
|         u_boot_console: A console connection to U-Boot.
 | |
| 
 | |
|     Returns:
 | |
|         The address of U-Boot's first RAM bank, as an integer.
 | |
|     """
 | |
| 
 | |
|     global ram_base
 | |
|     if u_boot_console.config.buildconfig.get('config_cmd_bdi', 'n') != 'y':
 | |
|         pytest.skip('bdinfo command not supported')
 | |
|     if ram_base == -1:
 | |
|         pytest.skip('Previously failed to find RAM bank start')
 | |
|     if ram_base is not None:
 | |
|         return ram_base
 | |
| 
 | |
|     with u_boot_console.log.section('find_ram_base'):
 | |
|         response = u_boot_console.run_command('bdinfo')
 | |
|         for l in response.split('\n'):
 | |
|             if '-> start' in l or 'memstart    =' in l:
 | |
|                 ram_base = int(l.split('=')[1].strip(), 16)
 | |
|                 break
 | |
|         if ram_base is None:
 | |
|             ram_base = -1
 | |
|             raise Exception('Failed to find RAM bank start in `bdinfo`')
 | |
| 
 | |
|     # We don't want ram_base to be zero as some functions test if the given
 | |
|     # address is NULL (0). Besides, on some RISC-V targets the low memory
 | |
|     # is protected that prevents S-mode U-Boot from access.
 | |
|     # Let's add 2MiB then (size of an ARM LPAE/v8 section).
 | |
| 
 | |
|     ram_base += 1024 * 1024 * 2
 | |
| 
 | |
|     return ram_base
 | |
| 
 | |
| class PersistentFileHelperCtxMgr(object):
 | |
|     """A context manager for Python's "with" statement, which ensures that any
 | |
|     generated file is deleted (and hence regenerated) if its mtime is older
 | |
|     than the mtime of the Python module which generated it, and gets an mtime
 | |
|     newer than the mtime of the Python module which generated after it is
 | |
|     generated. Objects of this type should be created by factory function
 | |
|     persistent_file_helper rather than directly."""
 | |
| 
 | |
|     def __init__(self, log, filename):
 | |
|         """Initialize a new object.
 | |
| 
 | |
|         Args:
 | |
|             log: The Logfile object to log to.
 | |
|             filename: The filename of the generated file.
 | |
| 
 | |
|         Returns:
 | |
|             Nothing.
 | |
|         """
 | |
| 
 | |
|         self.log = log
 | |
|         self.filename = filename
 | |
| 
 | |
|     def __enter__(self):
 | |
|         frame = inspect.stack()[1]
 | |
|         module = inspect.getmodule(frame[0])
 | |
|         self.module_filename = module.__file__
 | |
|         self.module_timestamp = os.path.getmtime(self.module_filename)
 | |
| 
 | |
|         if os.path.exists(self.filename):
 | |
|             filename_timestamp = os.path.getmtime(self.filename)
 | |
|             if filename_timestamp < self.module_timestamp:
 | |
|                 self.log.action('Removing stale generated file ' +
 | |
|                     self.filename)
 | |
|                 pathlib.Path(self.filename).unlink()
 | |
| 
 | |
|     def __exit__(self, extype, value, traceback):
 | |
|         if extype:
 | |
|             try:
 | |
|                 pathlib.Path(self.filename).unlink()
 | |
|             except Exception:
 | |
|                 pass
 | |
|             return
 | |
|         logged = False
 | |
|         for _ in range(20):
 | |
|             filename_timestamp = os.path.getmtime(self.filename)
 | |
|             if filename_timestamp > self.module_timestamp:
 | |
|                 break
 | |
|             if not logged:
 | |
|                 self.log.action(
 | |
|                     'Waiting for generated file timestamp to increase')
 | |
|                 logged = True
 | |
|             os.utime(self.filename)
 | |
|             time.sleep(0.1)
 | |
| 
 | |
| def persistent_file_helper(u_boot_log, filename):
 | |
|     """Manage the timestamps and regeneration of a persistent generated
 | |
|     file. This function creates a context manager for Python's "with"
 | |
|     statement
 | |
| 
 | |
|     Usage:
 | |
|         with persistent_file_helper(u_boot_console.log, filename):
 | |
|             code to generate the file, if it's missing.
 | |
| 
 | |
|     Args:
 | |
|         u_boot_log: u_boot_console.log.
 | |
|         filename: The filename of the generated file.
 | |
| 
 | |
|     Returns:
 | |
|         A context manager object.
 | |
|     """
 | |
| 
 | |
|     return PersistentFileHelperCtxMgr(u_boot_log, filename)
 | |
| 
 | |
| def crc32(u_boot_console, address, count):
 | |
|     """Helper function used to compute the CRC32 value of a section of RAM.
 | |
| 
 | |
|     Args:
 | |
|         u_boot_console: A U-Boot console connection.
 | |
|         address: Address where data starts.
 | |
|         count: Amount of data to use for calculation.
 | |
| 
 | |
|     Returns:
 | |
|         CRC32 value
 | |
|     """
 | |
| 
 | |
|     bcfg = u_boot_console.config.buildconfig
 | |
|     has_cmd_crc32 = bcfg.get('config_cmd_crc32', 'n') == 'y'
 | |
|     assert has_cmd_crc32, 'Cannot compute crc32 without CONFIG_CMD_CRC32.'
 | |
|     output = u_boot_console.run_command('crc32 %08x %x' % (address, count))
 | |
| 
 | |
|     m = re.search('==> ([0-9a-fA-F]{8})$', output)
 | |
|     assert m, 'CRC32 operation failed.'
 | |
| 
 | |
|     return m.group(1)
 | |
| 
 | |
| def waitpid(pid, timeout=60, kill=False):
 | |
|     """Wait a process to terminate by its PID
 | |
| 
 | |
|     This is an alternative to a os.waitpid(pid, 0) call that works on
 | |
|     processes that aren't children of the python process.
 | |
| 
 | |
|     Args:
 | |
|         pid: PID of a running process.
 | |
|         timeout: Time in seconds to wait.
 | |
|         kill: Whether to forcibly kill the process after timeout.
 | |
| 
 | |
|     Returns:
 | |
|         True, if the process ended on its own.
 | |
|         False, if the process was killed by this function.
 | |
| 
 | |
|     Raises:
 | |
|         TimeoutError, if the process is still running after timeout.
 | |
|     """
 | |
|     try:
 | |
|         for _ in range(timeout):
 | |
|             os.kill(pid, 0)
 | |
|             time.sleep(1)
 | |
| 
 | |
|         if kill:
 | |
|             os.kill(pid, signal.SIGKILL)
 | |
|             return False
 | |
| 
 | |
|     except ProcessLookupError:
 | |
|         return True
 | |
| 
 | |
|     raise TimeoutError(
 | |
|         "Process with PID {} did not terminate after {} seconds."
 | |
|         .format(pid, timeout)
 | |
|     )
 |