diff --git a/build_library/precompute_pcr.py b/build_library/precompute_pcr.py index f9617b71c2..0c03a2ffa4 100755 --- a/build_library/precompute_pcr.py +++ b/build_library/precompute_pcr.py @@ -201,6 +201,665 @@ def load_commands_file(filepath): return commands +# --------------------------------------------------------------------------- +# GRUB config evaluator for Flatcar grub.cfg +# --------------------------------------------------------------------------- + +class GrubEvaluator: + """Evaluate a Flatcar grub.cfg and produce the PCR 8 command list. + + This is a minimal GRUB script interpreter that handles the subset + of GRUB scripting used by the Flatcar grub.cfg: variable assignments, + conditionals, function definitions/calls, source, menuentry, and + the test ([) builtin. + + Runtime filesystem state and GRUB environment variables are provided + by the caller via constructor arguments. + + Args: + grub_cfg: text content of the main grub.cfg (with @@MOUNTUSR@@ + already replaced) + env: dict of pre-set GRUB environment variables. Must include + at minimum 'root' (e.g. 'hd0,gpt1'). Common variables: + root - boot disk partition (e.g. 'hd0,gpt1') + grub_platform - 'efi', 'pc', or 'xen' + grub_cpu - 'x86_64' or 'arm64' + prefix - initial prefix (overwritten by grub.cfg) + oem_partition: value that 'search --set oem --part-label OEM' + should return (e.g. 'hd0,gpt6'), or None if no OEM partition + oem_grub_cfg: text content of the OEM grub.cfg, or None + existing_files: set of file paths that exist on the boot disk, + used for '[ -f path ]' tests. Paths should use the GRUB + format '(device)/path' (e.g. '(hd0,gpt1)/flatcar/first_boot') + usr_uuid: UUID returned by gptprio.next (selects USR-A or USR-B) + menuentry: which menuentry --id to boot (default: 'flatcar') + """ + + def __init__(self, grub_cfg, env=None, oem_partition=None, + oem_grub_cfg=None, existing_files=None, + usr_uuid=None, menuentry='flatcar'): + self.env = dict(env) if env else {} + self.oem_partition = oem_partition + self.oem_grub_cfg = oem_grub_cfg + self.existing_files = set(existing_files) if existing_files else set() + self.usr_uuid = usr_uuid or '' + self.menuentry = menuentry + self.commands = [] # PCR 8 measured commands + self.pcr9_files = [] # PCR 9 measured file contents + self.functions = {} # name -> list of lines + self.kernel_cmdline = None + self._raw_text = grub_cfg + + # If OEM partition and grub.cfg are provided, register the file + if self.oem_partition and self.oem_grub_cfg is not None: + self.existing_files.add(f'({self.oem_partition})/grub.cfg') + + lines = self._preprocess(grub_cfg) + self._execute(lines) + + def _preprocess(self, text): + """Split script into logical lines, joining continuation lines.""" + result = [] + for line in text.splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith('#'): + continue + result.append(stripped) + return result + + def _menuentry_source(self, text, start_token_line): + """Reconstruct the raw source of a menuentry block for measurement. + + GRUB measures menuentry commands using the original source text + with quotes removed from the command line but body lines + preserved with original whitespace. Variables in the body are + NOT expanded. + + Returns: the measured string for the menuentry definition. + """ + # Find the menuentry line in the original text + raw_lines = text.splitlines() + # Find a line starting with 'menuentry' that matches + # the stripped start_token_line + start_idx = None + for idx, raw in enumerate(raw_lines): + if raw.strip() == start_token_line: + start_idx = idx + break + if start_idx is None: + return start_token_line + + # Collect until closing } + depth = 0 + end_idx = start_idx + for idx in range(start_idx, len(raw_lines)): + for ch in raw_lines[idx]: + if ch == '{': + depth += 1 + elif ch == '}': + depth -= 1 + if depth == 0 and idx > start_idx: + end_idx = idx + break + + # Build the measured source: first line (quote-removed) + body + } + # First line: tokenize and expand/unquote, then join + first_tokens = self._tokenize(start_token_line) + # Remove { from the end + if first_tokens and first_tokens[-1] == '{': + first_tokens = first_tokens[:-1] + measured_first = ' '.join(self._expand_and_unquote(t) for t in first_tokens) + measured_first += ' {' + + # Body lines: use raw source (no stripping, no expansion) + body_lines = raw_lines[start_idx + 1:end_idx] + # The closing } line + result = measured_first + for bl in body_lines: + result += '\n' + bl + result += '\n}' + return result + + def _expand_and_unquote(self, s): + """Expand variables and remove quotes from a GRUB word. + + Returns the expanded string as a single value (no word splitting). + + GRUB processes words by: + 1. Expanding $var / ${var} references + 2. Removing quote characters (double and single quotes) + 3. Inside double quotes, variables are expanded + 4. Inside single quotes, text is literal (no expansion) + """ + result = [] + i = 0 + while i < len(s): + if s[i] == '"': + # Double-quoted section: expand variables, remove quotes + i += 1 + while i < len(s) and s[i] != '"': + if s[i] == '$': + val, consumed = self._expand_var(s, i) + result.append(val) + i += consumed + else: + result.append(s[i]) + i += 1 + if i < len(s): + i += 1 # skip closing " + elif s[i] == "'": + # Single-quoted section: literal, remove quotes + i += 1 + while i < len(s) and s[i] != "'": + result.append(s[i]) + i += 1 + if i < len(s): + i += 1 # skip closing ' + elif s[i] == '$': + val, consumed = self._expand_var(s, i) + result.append(val) + i += consumed + else: + result.append(s[i]) + i += 1 + return ''.join(result) + + def _expand_to_words(self, s): + """Expand variables and remove quotes, returning word-split list. + + Unquoted variable expansions are subject to word splitting + (whitespace-separated). Quoted sections are not split. + Returns a list of strings (words). + """ + # Build segments: each is (text, splittable) + segments = [] + i = 0 + while i < len(s): + if s[i] == '"': + # Double-quoted: no word splitting + parts = [] + i += 1 + while i < len(s) and s[i] != '"': + if s[i] == '$': + val, consumed = self._expand_var(s, i) + parts.append(val) + i += consumed + else: + parts.append(s[i]) + i += 1 + if i < len(s): + i += 1 + segments.append((''.join(parts), False)) + elif s[i] == "'": + # Single-quoted: no word splitting + parts = [] + i += 1 + while i < len(s) and s[i] != "'": + parts.append(s[i]) + i += 1 + if i < len(s): + i += 1 + segments.append((''.join(parts), False)) + elif s[i] == '$': + # Unquoted variable: subject to word splitting + val, consumed = self._expand_var(s, i) + segments.append((val, True)) + i += consumed + else: + # Unquoted literal + j = i + while j < len(s) and s[j] not in ('"', "'", '$'): + j += 1 + segments.append((s[i:j], False)) + i = j + + # Concatenate segments, tracking which chars are splittable + full = [] + splittable = [] + for text, is_split in segments: + full.extend(text) + splittable.extend([is_split] * len(text)) + + # Word-split: split on whitespace in splittable regions + words = [] + current = [] + for ch, sp in zip(full, splittable): + if sp and ch in (' ', '\t'): + if current: + words.append(''.join(current)) + current = [] + else: + current.append(ch) + if current: + words.append(''.join(current)) + return words + + def _expand_var(self, s, i): + """Expand a variable reference starting at position i. + + Returns (expanded_value, chars_consumed). + """ + if i + 1 >= len(s): + return ('$', 1) + if s[i + 1] == '{': + end = s.find('}', i + 2) + if end != -1: + var = s[i + 2:end] + return (self.env.get(var, ''), end - i + 1) + return ('${', 2) + elif s[i + 1] == '?': + return (self.env.get('?', '0'), 2) + else: + j = i + 1 + while j < len(s) and (s[j].isalnum() or s[j] == '_'): + j += 1 + var = s[i + 1:j] + if var: + return (self.env.get(var, ''), j - i) + return ('$', 1) + + def _tokenize(self, line): + """Split a line into raw tokens respecting quotes. + + Tokens are returned as-is (with quotes and $var references + intact). Use _expand_and_unquote() on each token to get the + final measured form. + """ + tokens = [] + i = 0 + while i < len(line): + if line[i].isspace(): + i += 1 + continue + if line[i] == '{': + tokens.append('{') + i += 1 + elif line[i] == '}': + tokens.append('}') + i += 1 + elif line[i] == ';': + tokens.append(';') + i += 1 + else: + # Accumulate a word — may contain quotes + j = i + while j < len(line) and not line[j].isspace() and line[j] not in ('{', '}', ';'): + if line[j] in ('"', "'"): + q = line[j] + j += 1 + while j < len(line) and line[j] != q: + j += 1 + if j < len(line): + j += 1 + else: + j += 1 + tokens.append(line[i:j]) + i = j + return tokens + + def _measure_cmd(self, cmd_str): + """Record a command for PCR 8 measurement.""" + self.commands.append(cmd_str) + + def _eval_test(self, args): + """Evaluate a [ ... ] test expression. + + Handles: -n, -z, -f, -a (AND), -o (OR), =, !=, -ne, -eq + """ + # Remove trailing ] + if args and args[-1] == ']': + args = args[:-1] + + # Split on -o (OR) first + or_groups = [] + current = [] + for a in args: + if a == '-o': + or_groups.append(current) + current = [] + else: + current.append(a) + or_groups.append(current) + + for group in or_groups: + if self._eval_and_group(group): + return True + return False + + def _eval_and_group(self, args): + """Evaluate a group of AND-separated test expressions.""" + and_parts = [] + current = [] + for a in args: + if a == '-a': + and_parts.append(current) + current = [] + else: + current.append(a) + and_parts.append(current) + + for part in and_parts: + if not self._eval_single_test(part): + return False + return True + + def _eval_single_test(self, args): + if not args: + return False + if len(args) == 2 and args[0] == '-n': + return args[1] != '' + if len(args) == 2 and args[0] == '-z': + return args[1] == '' + if len(args) == 2 and args[0] == '-f': + return args[1] in self.existing_files + if len(args) == 3 and args[1] == '=': + return args[0] == args[2] + if len(args) == 3 and args[1] == '!=': + return args[0] != args[2] + if len(args) == 3 and args[1] == '-ne': + try: + return int(args[0]) != int(args[2]) + except ValueError: + return True + if len(args) == 3 and args[1] == '-eq': + try: + return int(args[0]) == int(args[2]) + except ValueError: + return False + # Single arg: true if non-empty + if len(args) == 1: + return args[0] != '' + return False + + def _find_block_end(self, lines, start): + """Find matching fi/done/} for a block starting at 'start'. + + Returns (then_lines, elif_parts, else_lines, end_index) + for if blocks, or (body_lines, end_index) for { } blocks. + """ + # For if/elif/else/fi blocks + depth = 0 + i = start + while i < len(lines): + tokens = self._tokenize(lines[i]) + if not tokens: + i += 1 + continue + cmd = tokens[0] + if cmd == 'if': + depth += 1 + elif cmd == 'fi': + depth -= 1 + if depth == 0: + return i + i += 1 + return len(lines) - 1 + + def _parse_if_block(self, lines, start): + """Parse an if/elif/else/fi block starting at 'start'. + + Returns (branches, end_index) where branches is a list of + (condition_line_or_None, body_lines) tuples. + """ + branches = [] + depth = 0 + i = start + current_cond = lines[start] # the 'if ...; then' line + current_body = [] + in_block = True + + i = start + 1 + while i < len(lines): + tokens = self._tokenize(lines[i]) + if not tokens: + i += 1 + continue + cmd = tokens[0] + + # Track nested if depth + if cmd == 'if': + depth += 1 + current_body.append(lines[i]) + elif cmd == 'fi': + if depth > 0: + depth -= 1 + current_body.append(lines[i]) + else: + branches.append((current_cond, current_body)) + return branches, i + elif cmd in ('elif', 'else') and depth == 0: + branches.append((current_cond, current_body)) + current_cond = lines[i] if cmd == 'elif' else None + current_body = [] + else: + current_body.append(lines[i]) + i += 1 + + branches.append((current_cond, current_body)) + return branches, i + + def _parse_cond_line(self, line): + """Extract the test args from 'if [ ... ]; then' or 'elif [ ... ]; then'.""" + tokens = self._tokenize(line) + # Remove 'if' or 'elif' prefix + if tokens and tokens[0] in ('if', 'elif'): + tokens = tokens[1:] + # Remove trailing '; then' or 'then' + while tokens and tokens[-1] in ('then', ';'): + tokens.pop() + return tokens + + def _execute(self, lines): + """Execute a list of preprocessed GRUB script lines.""" + i = 0 + # Collect menuentry definitions for deferred execution + menuentries = [] + while i < len(lines): + line = lines[i] + tokens = self._tokenize(line) + if tokens and tokens[0] == 'menuentry': + # Collect the menuentry, measure it, but defer body execution + entry_id = None + for k, t in enumerate(tokens): + if t.startswith('--id='): + entry_id = t[5:] + break + # Collect body until closing } + body = [] + j = i + 1 + depth = 1 + while j < len(lines): + t2 = self._tokenize(lines[j]) + for tk in t2: + if tk == '{': + depth += 1 + elif tk == '}': + depth -= 1 + if depth == 0: + break + body.append(lines[j]) + j += 1 + + me_source = self._menuentry_source(self._raw_text, line) + self._measure_cmd(me_source) + + title = self._expand_and_unquote(tokens[1]) + menuentries.append((entry_id, title, body)) + i = j + 1 + else: + i = self._execute_line(lines, i) + + # After all lines processed, execute the selected menuentry + for entry_id, title, body in menuentries: + if entry_id == self.menuentry: + self._measure_cmd(f'setparams {title}') + self._execute(body) + break + + def _execute_line(self, lines, i): + """Execute a single line, return next line index.""" + if i >= len(lines): + return i + 1 + + line = lines[i] + tokens = self._tokenize(line) + if not tokens: + return i + 1 + + cmd = tokens[0] + + # --- if / elif / else / fi --- + if cmd == 'if': + branches, end_i = self._parse_if_block(lines, i) + for cond_line, body in branches: + if cond_line is None: + # else branch — always taken + self._execute(body) + break + test_tokens = self._parse_cond_line(cond_line) + expanded = [self._expand_and_unquote(t) for t in test_tokens] + # Measure the test command + test_str = ' '.join(expanded) + self._measure_cmd(test_str) + if expanded[0] == '[': + result = self._eval_test(expanded[1:]) + else: + result = self._run_cmd_for_test(expanded) + if result: + self._execute(body) + break + return end_i + 1 + + # --- function definition --- + if cmd == 'function': + func_name = tokens[1] if len(tokens) > 1 else '' + # Collect body until closing } + body = [] + j = i + 1 + depth = 1 + while j < len(lines): + t = self._tokenize(lines[j]) + for tk in t: + if tk == '{': + depth += 1 + elif tk == '}': + depth -= 1 + if depth == 0: + break + body.append(lines[j]) + j += 1 + self.functions[func_name] = body + return j + 1 + + # --- for loops (not normally taken in Flatcar boot) --- + if cmd == 'for': + # Skip for loops — the net_default_server branch is + # not taken in local boot scenarios + j = i + 1 + depth = 1 + while j < len(lines): + t = self._tokenize(lines[j]) + if t and t[0] == 'for': + depth += 1 + elif t and t[0] == 'done': + depth -= 1 + if depth == 0: + break + j += 1 + return j + 1 + + # --- regular commands --- + # Expand each token with word splitting (unquoted $vars get split) + expanded_words = [] + for t in tokens: + expanded_words.extend(self._expand_to_words(t)) + cmd_str = ' '.join(expanded_words) + self._measure_cmd(cmd_str) + self._handle_cmd(expanded_words) + return i + 1 + + def _run_cmd_for_test(self, tokens): + """Run a command used as an if condition, return success/failure.""" + # For now only [ is used as a test command + return False + + def _handle_cmd(self, tokens): + """Handle side effects of a command (variable changes, etc.).""" + cmd = tokens[0] + + if cmd == 'set' and len(tokens) >= 2: + # set var=value + assignment = ' '.join(tokens[1:]) + eq = assignment.find('=') + if eq != -1: + var = assignment[:eq] + val = assignment[eq + 1:] + self.env[var] = val + + elif cmd == 'search' and '--set' in tokens: + # search --no-floppy --set oem --part-label OEM --hint ... + set_idx = tokens.index('--set') + if set_idx + 1 < len(tokens): + var = tokens[set_idx + 1] + if '--part-label' in tokens: + pl_idx = tokens.index('--part-label') + label = tokens[pl_idx + 1] if pl_idx + 1 < len(tokens) else '' + if label == 'OEM' and self.oem_partition: + self.env[var] = self.oem_partition + # If no OEM partition, var stays unset + + elif cmd == 'source' and len(tokens) >= 2: + # source (oem)/grub.cfg — measure file in PCR 9 + if self.oem_grub_cfg is not None: + self.pcr9_files.append(self.oem_grub_cfg) + # Execute the OEM config + oem_lines = self._preprocess(self.oem_grub_cfg) + self._execute(oem_lines) + + elif cmd == 'gptprio.next': + # gptprio.next -d usr_device -u usr_uuid + if '-u' in tokens: + u_idx = tokens.index('-u') + if u_idx + 1 < len(tokens): + self.env[tokens[u_idx + 1]] = self.usr_uuid + if '-d' in tokens: + d_idx = tokens.index('-d') + if d_idx + 1 < len(tokens): + self.env[tokens[d_idx + 1]] = 'ignored' + self.env['?'] = '0' + + elif cmd in ('linux', 'linuxefi'): + # Kernel command line is measured separately in PCR 8 + cmdline = ' '.join(tokens[1:]) + self.kernel_cmdline = cmdline + + # Function calls + elif cmd in self.functions: + self._execute(self.functions[cmd]) + + +def evaluate_grub_cfg(grub_cfg, env=None, oem_partition=None, + oem_grub_cfg=None, existing_files=None, + usr_uuid=None, menuentry='flatcar'): + """Evaluate a Flatcar grub.cfg and return PCR 8 commands. + + This is the main entry point for the GRUB config evaluator. + See GrubEvaluator for argument descriptions. + + Returns (commands, kernel_cmdline) where commands is the list of + measured command strings and kernel_cmdline is the full kernel + command line string (also measured as the final PCR 8 entry). + """ + ev = GrubEvaluator(grub_cfg, env=env, oem_partition=oem_partition, + oem_grub_cfg=oem_grub_cfg, + existing_files=existing_files, + usr_uuid=usr_uuid, menuentry=menuentry) + commands = list(ev.commands) + if ev.kernel_cmdline: + commands.append(ev.kernel_cmdline) + return commands + + # --------------------------------------------------------------------------- # PCR 9: GRUB source'd files and loaded kernel # --------------------------------------------------------------------------- @@ -327,6 +986,37 @@ def replay_eventlog_simple(eventlog_path, hash_algo='sha256'): # CLI # --------------------------------------------------------------------------- + +def _eval_grub_cfg_from_args(args): + """Build the evaluated GRUB command list from pcr8-eval CLI args.""" + with open(args.grub_cfg, 'r') as f: + grub_cfg = f.read() + + oem_grub_cfg = None + if args.oem_grub_cfg: + with open(args.oem_grub_cfg, 'r') as f: + oem_grub_cfg = f.read() + + root = args.root + existing_files = set() + if args.first_boot: + existing_files.add(f'({root})/flatcar/first_boot') + + env = { + 'root': root, + 'grub_platform': 'efi', + 'grub_cpu': args.grub_cpu, + } + + return evaluate_grub_cfg( + grub_cfg, env=env, + oem_partition=args.oem_partition, + oem_grub_cfg=oem_grub_cfg, + existing_files=existing_files, + usr_uuid=args.usr_uuid, + menuentry=args.menuentry) + + def main(): parser = argparse.ArgumentParser( description='Precompute TPM PCR 4, 8, and 9 values for Flatcar boot.') @@ -366,6 +1056,29 @@ def main(): pa.add_argument('--oem-grub-cfg', nargs='*', default=[], help='Path(s) to OEM grub.cfg file(s) sourced during boot') + # pcr8-eval + p8e = sub.add_parser('pcr8-eval', + help='Compute PCR 8 by evaluating grub.cfg') + p8e.add_argument('--grub-cfg', required=True, + help='Path to grub.cfg (with @@MOUNTUSR@@ replaced)') + p8e.add_argument('--oem-grub-cfg', + help='Path to OEM grub.cfg to source') + p8e.add_argument('--root', default='hd0,gpt1', + help='GRUB root device (default: hd0,gpt1)') + p8e.add_argument('--grub-cpu', default='x86_64', + choices=['x86_64', 'arm64'], + help='CPU architecture (default: x86_64)') + p8e.add_argument('--oem-partition', + help='OEM partition device (e.g. hd0,gpt6)') + p8e.add_argument('--usr-uuid', required=True, + help='USR partition UUID from gptprio') + p8e.add_argument('--first-boot', action='store_true', + help='Simulate first boot (first_boot file exists)') + p8e.add_argument('--menuentry', default='flatcar', + help='Menuentry --id to boot (default: flatcar)') + p8e.add_argument('--print-commands', action='store_true', + help='Print the evaluated command list instead of PCR') + # replay pr = sub.add_parser('replay', help='Replay an eventlog to verify PCR values') @@ -386,6 +1099,14 @@ def main(): commands = load_commands_file(args.commands_file) results['pcr8'] = compute_pcr8(commands, args.algo) + elif args.command == 'pcr8-eval': + commands = _eval_grub_cfg_from_args(args) + if args.print_commands: + for cmd in commands: + print(cmd) + else: + results['pcr8'] = compute_pcr8(commands, args.algo) + elif args.command == 'pcr9': results['pcr9'] = compute_pcr9( args.kernel,