flatcar-scripts/build_library/disk_util
Michael Marineau 8ca62121d9 feat(disk_util): Add mount and unmount commands.
This will replace the assorted mix of mount/umount scripts.
2013-12-30 16:12:04 -08:00

646 lines
20 KiB
Python
Executable File

#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import json
import os
import subprocess
import sys
import uuid
# First sector we can use.
# The same count is used twice when laying out a disk: as the starting block
# offset before the first partition, and as the space reserved after the last
# partition for the second GPT.
GPT_RESERVED_SECTORS = 34
class ConfigNotFound(Exception):
    """Raised when the partition layout config file does not exist."""
class PartitionNotFound(Exception):
    """Raised when a lookup by number or label matches no usable partition."""
class InvalidLayout(Exception):
    """Raised when the disk layout configuration fails validation."""
class InvalidAdjustment(Exception):
    """Error type for invalid adjustments.

    NOTE(review): nothing in this file raises it; presumably kept for
    callers or for a layout-adjustment feature -- confirm before removing.
    """
def LoadPartitionConfig(options):
    """Loads a partition tables configuration file into a Python object.

    The JSON file is validated, every non-base layout inherits missing
    partitions and missing partition keys from the 'base' layout, and
    derived values (sizes in bytes, start offsets, UUIDs) are filled in.

    Args:
      options: Flags passed to the script. Reads options.disk_layout_file
          (path of the JSON file) and options.disk_layout (name of the
          layout to return).

    Returns:
      A (config, partitions) tuple: the complete configuration object and
      the dict of partitions, keyed by partition number string, for the
      layout named by options.disk_layout.

    Raises:
      ConfigNotFound: the layout file does not exist.
      InvalidLayout: the file content failed validation, or the requested
          layout is not defined in the file.
    """
    valid_keys = set(('_comment', 'metadata', 'layouts'))
    valid_layout_keys = set((
        '_comment', 'type', 'num', 'label', 'blocks', 'block_size',
        'fs_blocks', 'fs_block_size', 'fs_type', 'features', 'uuid',
        'alignment', 'mount', 'binds'))
    integer_layout_keys = set((
        'blocks', 'block_size', 'fs_blocks', 'fs_block_size', 'alignment'))
    required_layout_keys = set(('type', 'num', 'label', 'blocks'))

    filename = options.disk_layout_file
    if not os.path.exists(filename):
        raise ConfigNotFound('Partition config %s was not found!' % filename)
    with open(filename) as f:
        config = json.load(f)

    unknown_keys = set(config.keys()) - valid_keys
    if unknown_keys:
        raise InvalidLayout('Unknown items: %s' % ' '.join(unknown_keys))

    try:
        metadata = config['metadata']
        base = config['layouts']['base']
        for key in ('alignment', 'block_size', 'fs_block_size'):
            metadata[key] = int(metadata[key])
    except KeyError as e:
        raise InvalidLayout('Metadata is missing required entries: %s' % e)

    # Fail with a layout error rather than a bare KeyError at the very end.
    if options.disk_layout not in config['layouts']:
        raise InvalidLayout('Unknown disk layout: %s' % options.disk_layout)

    # Sometimes qemu-img expects disks sizes aligned to 64k
    align_bytes = metadata['alignment'] * metadata['block_size']
    if align_bytes < 65536 or align_bytes % 65536 != 0:
        raise InvalidLayout('Invalid alignment, 64KB or better required')

    def VerifyLayout(layout_name, layout, base=None):
        """Validates one layout in place, normalizing integers and uuids."""
        for part_num, part in layout.items():
            part['num'] = int(part_num)
            part_keys = set(part.keys())
            unknown_keys = part_keys - valid_layout_keys
            if unknown_keys:
                raise InvalidLayout(
                    'Unknown items in partition %s %s: %r' %
                    (layout_name, part_num, ' '.join(unknown_keys)))

            for int_key in integer_layout_keys.intersection(part_keys):
                part[int_key] = int(part[int_key])

            if part.get('type', None) == 'blank':
                continue

            if base:
                # Keys are inherited per partition, so only the keys of the
                # base partition with the same number count as present.
                # (base's own keys are partition numbers, not item names.)
                part_keys.update(base.get(part_num, {}).keys())

            missing_keys = required_layout_keys - part_keys
            if missing_keys:
                raise InvalidLayout(
                    'Missing items in partition %s %s: %s' %
                    (layout_name, part_num, ' '.join(missing_keys)))

            if 'uuid' in part:
                try:
                    # double check the string formatting
                    part['uuid'] = str(uuid.UUID(part['uuid']))
                except ValueError as e:
                    raise InvalidLayout(
                        'Invalid uuid %r: %s' % (part['uuid'], e))

            if 'fs_type' in part:
                if part['fs_type'] not in ('ext2', 'ext4', 'vfat'):
                    raise InvalidLayout(
                        'Invalid fs_type: %r' % part['fs_type'])

    def Align(count, alignment):
        """Rounds count up to the next multiple of alignment."""
        offset = count % alignment
        if offset:
            count += alignment - offset
        return count

    def FillExtraValues(layout_name, layout, base=None):
        """Merges defaults from base and computes sizes and offsets."""
        # Reserved size for first GPT
        disk_block_count = GPT_RESERVED_SECTORS

        # Fill in default values from base,
        # dict doesn't have a update()+setdefault() method so this looks
        # tedious
        if base:
            for part_num, base_part in base.items():
                part = layout.setdefault(part_num, {})
                for base_key, base_value in base_part.items():
                    part.setdefault(base_key, base_value)

        # Lay partitions out in partition-number order. Plain dict iteration
        # order is arbitrary, which would make the on-disk offsets depend on
        # the interpreter rather than on the config.
        for part_num, part in sorted(layout.items(),
                                     key=lambda item: int(item[0])):
            if part['type'] == 'blank':
                continue

            part.setdefault('alignment', metadata['alignment'])
            part['bytes'] = part['blocks'] * metadata['block_size']
            part.setdefault('fs_block_size', metadata['fs_block_size'])
            part.setdefault('fs_blocks',
                            part['bytes'] // part['fs_block_size'])
            part['fs_bytes'] = part['fs_blocks'] * part['fs_block_size']

            if part['fs_bytes'] > part['bytes']:
                raise InvalidLayout(
                    'Filesystem may not be larger than partition: '
                    '%s %s: %d > %d' %
                    (layout_name, part_num, part['fs_bytes'], part['bytes']))

            disk_block_count = Align(disk_block_count, part['alignment'])
            part['first_block'] = disk_block_count
            part['first_byte'] = disk_block_count * metadata['block_size']
            disk_block_count += part['blocks']

            part.setdefault('uuid', str(uuid.uuid4()))

        # Reserved size for second GPT plus align disk image size
        disk_block_count += GPT_RESERVED_SECTORS
        disk_block_count = Align(disk_block_count, metadata['alignment'])

        # If this is the requested layout stash the disk size into the global
        # metadata. Kinda odd but the best place I've got with this data
        # structure.
        if layout_name == options.disk_layout:
            metadata['blocks'] = disk_block_count

    # Verify 'base' before other layouts because it is inherited by the others
    # Fill in extra/default values in base last so they aren't inherited
    VerifyLayout('base', base)
    for layout_name, layout in config['layouts'].items():
        if layout_name == 'base':
            continue
        VerifyLayout(layout_name, layout, base)
        FillExtraValues(layout_name, layout, base)
    FillExtraValues('base', base)

    return config, config['layouts'][options.disk_layout]
def GetPartitionTableFromConfig(options):
    """Load the layout config and return only the selected layout.

    Args:
      options: Flags passed to the script

    Returns:
      The partitions object of the selected layout, i.e. the second element
      of what LoadPartitionConfig returns.
    """
    _, selected_layout = LoadPartitionConfig(options)
    return selected_layout
def WritePartitionTable(options, config=None, partitions=None):
    """Writes the given partition table to a disk image or device.

    The layout is checked for an EFI partition before anything is written,
    so an unusable layout never leaves a half-built table on the image.

    Args:
      options: Flags passed to the script. Reads options.disk_image and
          options.mbr_boot_code.
      config: Complete layout configuration file object
      partitions: Selected layout configuration object

    Raises:
      InvalidLayout: the layout contains no partition of type 'efi'.
      subprocess.CalledProcessError: a cgpt invocation failed.
    """
    def Cgpt(*args):
        subprocess.check_call(['cgpt'] + [str(a) for a in args])

    if not (config and partitions):
        config, partitions = LoadPartitionConfig(options)

    # Validate first: cgpt must not touch the image for a layout we are
    # going to reject anyway.
    esp_number = None
    for partition in partitions.values():
        if partition['type'] == 'efi':
            esp_number = partition['num']
    if esp_number is None:
        raise InvalidLayout('Table does not include an EFI partition.')

    Cgpt('create', '-c', '-s', config['metadata']['blocks'],
         options.disk_image)

    for partition in partitions.values():
        if partition['type'] != 'blank':
            Cgpt('add', '-i', partition['num'],
                 '-b', partition['first_block'],
                 '-s', partition['blocks'],
                 '-t', partition['type'],
                 '-l', partition['label'],
                 '-u', partition['uuid'],
                 options.disk_image)

    if options.mbr_boot_code:
        Cgpt('boot', '-p',
             '-b', options.mbr_boot_code,
             '-i', esp_number,
             options.disk_image)

    Cgpt('show', options.disk_image)
def Sudo(cmd, stdout_null=False):
    """Run a command under sudo, optionally discarding its stdout.

    Some tools like tune2fs don't have a quiet mode which just adds
    useless noise to our build output, drowning out what may be more
    interesting news.

    Args:
      cmd: a command and arguments to run; each item is passed through str().
      stdout_null: bool to enable redirecting stdout to /dev/null.
    """
    argv = ['sudo']
    argv.extend(str(c) for c in cmd)

    if not stdout_null:
        subprocess.check_call(argv, stdout=None)
        return

    # The with-block closes /dev/null even when the command fails.
    with open('/dev/null', 'w') as sink:
        subprocess.check_call(argv, stdout=sink)
def FormatExt(part, device):
    """Create an ext2 or ext4 filesystem on a device, then tune it.

    Args:
      part: dict defining the partition; 'fs_type', 'fs_block_size',
          'fs_blocks' and 'label' are read.
      device: name of the block device to format
    """
    mke2fs_cmd = ['mke2fs', '-q']
    mke2fs_cmd += ['-t', part['fs_type']]
    mke2fs_cmd += ['-b', part['fs_block_size']]
    mke2fs_cmd += [device, part['fs_blocks']]
    Sudo(mke2fs_cmd)

    # TODO(marineam): Make more of these fs options configurable.
    tune2fs_cmd = [
        'tune2fs',
        '-L', part['label'],
        '-U', 'clear',
        '-T', '20091119110000',
        '-c', '0', '-i', '0',  # Disable auto fsck
        '-m', '0', '-r', '0',  # Disable reserve blocks
        '-e', 'remount-ro',
        device,
    ]
    # tune2fs has no quiet mode, so its stdout is thrown away.
    Sudo(tune2fs_cmd, stdout_null=True)
def FormatFat(part, device):
    """Create a FAT filesystem spanning the whole partition.

    Args:
      part: dict defining the partition; 'bytes' and 'label' are read.
      device: name of the block device to format
    """
    # The block-count argument to mkfs.vfat is in units of 1k
    size_in_kib = part['bytes'] // 1024
    mkfs_cmd = ['mkfs.vfat', '-n', part['label'], device, size_in_kib]
    Sudo(mkfs_cmd, stdout_null=True)
def Format(options):
    """Write the partition table, then create each configured filesystem.

    Every partition that declares an 'fs_type' is exposed through a
    temporary loop device, formatted, and the loop device is detached again
    even if formatting fails.

    Args:
      options: Flags passed to the script
    """
    # Note on using sudo: We don't really need to do this stuff as root
    # but mke2fs and friends doesn't have an option to make filesystems at
    # arbitrary offsets but using loop devices makes that possible.
    config, partitions = LoadPartitionConfig(options)
    WritePartitionTable(options, config, partitions)

    for part in partitions.values():
        if 'fs_type' in part:
            fs_type = part['fs_type']
            print("Formatting partition %s (%s) as %s" % (
                part['num'], part['label'], fs_type))

            losetup_cmd = ['sudo', 'losetup',
                           '--offset', str(part['first_byte']),
                           '--sizelimit', str(part['bytes']),
                           '--find', '--show', options.disk_image]
            loop_dev = subprocess.check_output(losetup_cmd).strip()

            try:
                if fs_type == 'vfat':
                    FormatFat(part, loop_dev)
                elif fs_type in ('ext2', 'ext4'):
                    FormatExt(part, loop_dev)
                else:
                    raise Exception("Unhandled fs type %s" % fs_type)
            finally:
                Sudo(['losetup', '--detach', loop_dev])
def Mount(options):
    """Mount the given disk image.

    The existing partition table is used to determine what exists but the
    disk layout config is used to look up mount points and binds.

    The root filesystem is mounted first, then the remaining partitions in
    sorted mount-path order so that a parent directory is always mounted
    before anything nested beneath it.

    Args:
      options: Flags passed to the script. Reads disk_image, mount_dir and
          read_only in addition to the layout flags.

    Raises:
      InvalidLayout: no partition found in the image is configured to be
          mounted on /.
      Exception: a line of `cgpt show -q` output could not be parsed.
    """
    config, partitions = LoadPartitionConfig(options)
    block_size = config['metadata']['block_size']
    mounts = {}

    cgpt_show = subprocess.check_output(
        ['cgpt', 'show', '-q', options.disk_image])
    for line in cgpt_show.split('\n'):
        if not line.strip():
            continue

        # Expected columns: first block, block count, partition number, rest.
        fields = line.split(None, 3)
        if len(fields) != 4 or not all(f.isdigit() for f in fields[:3]):
            raise Exception('Invalid output from cgpt show -q: %r' % line)

        first_block = int(fields[0])
        blocks = int(fields[1])
        # Kept as a string: the layout dict is keyed by number strings.
        part_num = fields[2]

        part = partitions.get(part_num, {})
        path = part.get('mount', None)
        if not path or not path.startswith('/'):
            continue

        mounts[path] = {'path': path,
                        'offset': first_block * block_size,
                        'size': blocks * block_size,
                        'type': part.get('fs_type', 'auto'),
                        'binds': part.get('binds', {})}

    rootfs = mounts.pop('/', None)
    if not rootfs:
        raise InvalidLayout('No partition defined to mount on /')

    def DoMount(mount):
        full_path = os.path.realpath(options.mount_dir + mount['path'])
        mount_opts = ['loop',
                      'offset=%d' % mount['offset'],
                      'sizelimit=%d' % mount['size']]
        if options.read_only:
            mount_opts.append('ro')
        else:
            Sudo(['mkdir', '-p', full_path])

        Sudo(['mount', '-t', mount['type'],
              '-o', ','.join(mount_opts),
              options.disk_image, full_path])

        for src, dst in mount['binds'].items():
            # src may be relative or absolute, os.path.join handles this.
            full_src = os.path.realpath(
                options.mount_dir + os.path.join(mount['path'], src))
            full_dst = os.path.realpath(options.mount_dir + dst)
            Sudo(['mkdir', '-p', full_src, full_dst])
            Sudo(['mount', '--bind', full_src, full_dst])

    DoMount(rootfs)
    # A parent path is a string prefix of its children and therefore sorts
    # first; plain dict order could mount a child before its parent.
    for path in sorted(mounts):
        DoMount(mounts[path])
def Umount(options):
    """Recursively unmount everything under the given mount point.

    Args:
      options: Flags passed to the script; options.mount_dir is the path
          that gets unmounted.
    """
    umount_cmd = ['umount', '--recursive', '--detach-loop']
    umount_cmd.append(options.mount_dir)
    Sudo(umount_cmd)
def GetPartitionByNumber(partitions, num):
    """Look up a non-blank partition by its number.

    Args:
      partitions: Mapping of partition number strings to partition objects
      num: Number of partition to find; converted with str() for the lookup

    Returns:
      An object for the selected partition

    Raises:
      PartitionNotFound: no such partition, or it is of type 'blank'.
    """
    found = partitions.get(str(num))
    if found and found['type'] != 'blank':
        return found
    raise PartitionNotFound('Partition not found')
def GetPartitionByLabel(partitions, label):
    """Look up a non-blank partition by its label.

    Args:
      partitions: Mapping of partition number strings to partition objects
      label: Label of partition to find

    Returns:
      An object for the selected partition

    Raises:
      PartitionNotFound: no non-blank partition carries that label.
    """
    for candidate in partitions.values():
        # Blank partitions are skipped before their label is looked at.
        if candidate['type'] != 'blank' and candidate['label'] == label:
            return candidate
    raise PartitionNotFound('Partition not found')
def GetBlockSize(options):
    """Print the partition table block size from the layout metadata.

    Args:
      options: Flags passed to the script

    Prints:
      Block size of all partitions in the layout
    """
    layout_config = LoadPartitionConfig(options)[0]
    metadata = layout_config['metadata']
    print(metadata['block_size'])
def GetFilesystemBlockSize(options):
    """Print the filesystem block size from the layout metadata.

    This is used for all partitions in the table that have filesystems.

    Args:
      options: Flags passed to the script

    Prints:
      Block size of all filesystems in the layout
    """
    layout_config = LoadPartitionConfig(options)[0]
    metadata = layout_config['metadata']
    print(metadata['fs_block_size'])
def GetPartitionSize(options):
    """Print the size of one partition of the selected layout.

    Args:
      options: Flags passed to the script; options.partition_num selects
          the partition.

    Prints:
      Size of selected partition in bytes
    """
    selected = GetPartitionByNumber(
        GetPartitionTableFromConfig(options), options.partition_num)
    print(selected['bytes'])
def GetFilesystemSize(options):
    """Print the filesystem size of one partition of the selected layout.

    If no filesystem size is specified, the partition size is printed.

    Args:
      options: Flags passed to the script; options.partition_num selects
          the partition.

    Prints:
      Size of selected partition filesystem in bytes
    """
    selected = GetPartitionByNumber(
        GetPartitionTableFromConfig(options), options.partition_num)
    print(selected.get('fs_bytes', selected['bytes']))
def GetLabel(options):
    """Print the label of the partition with the given number.

    Args:
      options: Flags passed to the script; options.partition_num selects
          the partition.

    Prints:
      Label of selected partition, or 'UNTITLED' if none specified
    """
    selected = GetPartitionByNumber(
        GetPartitionTableFromConfig(options), options.partition_num)
    print(selected.get('label', 'UNTITLED'))
def GetNum(options):
    """Print the partition number that belongs to a given label.

    Args:
      options: Flags passed to the script; options.label selects the
          partition.

    Prints:
      Number of selected partition, or '-1' if there is no number
    """
    selected = GetPartitionByLabel(
        GetPartitionTableFromConfig(options), options.label)
    print(selected.get('num', '-1'))
def GetUuid(options):
    """Print the unique partition UUID that belongs to a given label.

    Args:
      options: Flags passed to the script; options.label selects the
          partition.

    Prints:
      String containing the requested UUID, empty if the partition has none
    """
    selected = GetPartitionByLabel(
        GetPartitionTableFromConfig(options), options.label)
    print(selected.get('uuid', ''))
def DoDebugOutput(options):
    """Prints out a human readable disk layout, sorted by partition number.

    Sizes of 1MB or more are shown in whole MB (rounded down); it exists to
    quickly visually verify a layout looks correct.

    Args:
      options: Flags passed to the script
    """
    def HumanSize(num_bytes):
        """Formats a byte count as exact bytes below 1MB, whole MB above."""
        if num_bytes < 1024 * 1024:
            return '%d bytes' % num_bytes
        return '%d MB' % (num_bytes // 1024 // 1024)

    partitions = GetPartitionTableFromConfig(options)
    # Keys are number strings; sort numerically so "10" follows "9" rather
    # than landing between "1" and "2".
    for num, partition in sorted(partitions.items(),
                                 key=lambda item: int(item[0])):
        if partition['type'] == 'blank':
            print('%s: blank' % num)
            continue

        size = HumanSize(partition['bytes'])
        if 'fs_bytes' in partition:
            fs_size = HumanSize(partition['fs_bytes'])
            print('%s: %s - %s/%s' % (num, partition['label'], fs_size, size))
        else:
            print('%s: %s - %s' % (num, partition['label'], size))
def DoParseOnly(options):
    """Load and validate the layout file, discarding the result.

    Used before reading sizes so that layout errors surface early.

    Args:
      options: Flags passed to the script
    """
    # Only the validation side effect (an exception on error) is wanted.
    GetPartitionTableFromConfig(options)
def main(argv):
    """Parses the command line and runs the selected action.

    Defaults for the layout file and layout type come from the
    DISK_LAYOUT_FILE and DISK_LAYOUT_TYPE environment variables.

    Args:
      argv: Full argument vector including the program name at index 0.
    """
    default_layout_file = os.environ.get(
        'DISK_LAYOUT_FILE',
        os.path.join(os.path.dirname(__file__), 'legacy_disk_layout.json'))
    default_layout_type = os.environ.get('DISK_LAYOUT_TYPE', 'base')

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--disk_layout_file', default=default_layout_file,
                        help='path to disk layout json file')
    parser.add_argument('--disk_layout', default=default_layout_type,
                        help='disk layout type from the json file')
    actions = parser.add_subparsers(title='actions')

    a = actions.add_parser('write_gpt', help='write gpt to new image')
    a.add_argument('--mbr_boot_code',
                   help='path to mbr boot block, such as syslinux/gptmbr.bin')
    a.add_argument('disk_image', help='path to disk image file')
    a.set_defaults(func=WritePartitionTable)

    a = actions.add_parser('format', help='write gpt and filesystems to image')
    a.add_argument('--mbr_boot_code',
                   help='path to mbr boot block, such as syslinux/gptmbr.bin')
    a.add_argument('disk_image', help='path to disk image file')
    a.set_defaults(func=Format)

    a = actions.add_parser('mount', help='mount filesystems in image')
    # A plain on/off switch: without store_true argparse would demand a
    # value after -r and treat any non-empty string as "enabled".
    a.add_argument('--read_only', '-r', action='store_true',
                   help='mount filesystems read-only')
    a.add_argument('disk_image', help='path to disk image file')
    a.add_argument('mount_dir', help='path to root filesystem mount point')
    a.set_defaults(func=Mount)

    a = actions.add_parser('umount', help='unmount a image mount point')
    a.add_argument('mount_dir', help='path to root filesystem mount point')
    a.set_defaults(func=Umount)

    a = actions.add_parser('readblocksize', help='get device block size')
    a.set_defaults(func=GetBlockSize)

    a = actions.add_parser('readfsblocksize', help='get filesystem block size')
    a.set_defaults(func=GetFilesystemBlockSize)

    a = actions.add_parser('readpartsize', help='get partition size')
    a.add_argument('partition_num', type=int, help='partition number')
    a.set_defaults(func=GetPartitionSize)

    a = actions.add_parser('readfssize', help='get filesystem size')
    a.add_argument('partition_num', type=int, help='partition number')
    a.set_defaults(func=GetFilesystemSize)

    a = actions.add_parser('readlabel', help='get partition label')
    a.add_argument('partition_num', type=int, help='partition number')
    a.set_defaults(func=GetLabel)

    a = actions.add_parser('readnum', help='get partition number')
    a.add_argument('label', help='partition label')
    a.set_defaults(func=GetNum)

    a = actions.add_parser('readuuid', help='get partition uuid')
    a.add_argument('label', help='partition label')
    a.set_defaults(func=GetUuid)

    a = actions.add_parser('debug', help='dump debug output')
    a.set_defaults(func=DoDebugOutput)

    a = actions.add_parser('parseonly', help='validate config')
    a.set_defaults(func=DoParseOnly)

    options = parser.parse_args(argv[1:])
    # Each sub-parser stored its handler in 'func' via set_defaults().
    options.func(options)
# Run the command line interface only when executed as a script, not when
# imported as a module.
if __name__ == '__main__':
    main(sys.argv)