diff --git a/contrib/cloud/ali-import b/contrib/cloud/ali-import index 30d2ba763..d51a4e159 100755 --- a/contrib/cloud/ali-import +++ b/contrib/cloud/ali-import @@ -1,10 +1,10 @@ #!/usr/bin/env python3 import argparse -import base64 from collections import namedtuple from concurrent.futures import ThreadPoolExecutor, as_completed import datetime +from itertools import cycle import logging from pathlib import Path import subprocess @@ -25,9 +25,6 @@ import alibabacloud_tea_openapi.models import alibabacloud_tea_util as util import alibabacloud_tea_util.client import alibabacloud_tea_util.models -import alibabacloud_vpc20160428 as vpc -import alibabacloud_vpc20160428.client -import alibabacloud_vpc20160428.models # For regions in mainland China, the Chinese state censorship laws # prohibit direct access to OSS bucket contents. @@ -40,20 +37,16 @@ logger = logging.getLogger('ali-import') ECS_ENDPOINT = 'ecs.aliyuncs.com' -OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket' OSS_BUCKET_NAME_LEN = 63 IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-' IPXE_STORAGE_TAG = 'ipxe-upload-temp' -IPXE_STORAGE_DISK_CATEGORY = 'cloud_essd' -IPXE_SG_TAG = 'ipxe-default-sg' -IPXE_VSWITCH_TAG = 'ipxe-default-vswitch' +Clients = namedtuple('Clients', ['region', 'ecs', 'oss']) +Image = namedtuple('Image', + ['path', 'family', 'name', 'arch', 'mode', 'public']) -Clients = namedtuple('Clients', ['region', 'ecs', 'oss', 'vpc']) -Image = namedtuple('Image', ['path', 'family', 'name', 'arch', 'mode']) - -def image(filename, basefamily, basename): +def image(filename, basefamily, basename, public): """Construct image description""" with tempfile.NamedTemporaryFile(mode='w+t') as mtoolsrc: mtoolsrc.writelines([ @@ -76,7 +69,7 @@ def image(filename, basefamily, basename): name = '%s%s' % (basename, suffix) arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64' mode = 'UEFI' if uefi else 'BIOS' - return Image(path, family, name, arch, mode) + return Image(path, family, name, arch, mode, public) def all_regions(): """Get list of all regions""" @@ -94,183 +87,47 @@ def all_clients(region): ecsconf = openapi.models.Config(credential=cred, region_id=region) osscred = oss.credentials.EnvironmentVariableCredentialsProvider() ossconf = oss.config.Config(credentials_provider=osscred, region=region) - vpcconf = openapi.models.Config(credential=cred, region_id=region) clients = Clients( region=region, ecs=ecs.client.Client(ecsconf), oss=oss.client.Client(ossconf), - vpc=vpc.client.Client(vpcconf), ) return clients -def delete_temp_instance(clients, instance, retry=False): - """Remove temporary instance""" - logger.info("%s: deleting %s" % (clients.region, instance)) - while True: - req = ecs.models.DeleteInstanceRequest( - instance_id=instance, - force=True, - force_stop=True, - ) - try: - rsp = clients.ecs.delete_instance(req) - except openapi.exceptions.ClientException: - # Very recently created instances often cannot be - # terminated until some undocumented part of the control - # plane decides that enough time has elapsed - if retry: - time.sleep(1) - continue - raise - break - -def run_temp_instance_command(clients, instance, command): - """Run command on temporary instance""" - command_content=' '.join(command) - logger.info("%s: running %s" % (clients.region, command_content)) - req = ecs.models.RunCommandRequest( - region_id=clients.region, - instance_id=[instance], - type='RunShellScript', - command_content=command_content, - ) - rsp = clients.ecs.run_command(req) - invocation = rsp.body.invoke_id - while True: - time.sleep(1) - req = ecs.models.DescribeInvocationResultsRequest( - region_id=clients.region, - invoke_id=invocation, - ) - rsp = clients.ecs.describe_invocation_results(req) - result = rsp.body.invocation.invocation_results.invocation_result[0] - if result.invoke_record_status not in ('Pending', 'Running'): - break - output = base64.b64decode(result.output).decode() - if result.invocation_status != 'Success': - raise RuntimeError(output if output else result.invocation_status) - return result - -def create_temp_instance(clients, family, machine, role): - """Create temporary instance (and remove any stale temporary instances)""" - tag = ecs.models.DescribeInstancesRequestTag( - key=IPXE_STORAGE_TAG, - value=IPXE_STORAGE_TAG, - ) - req = ecs.models.DescribeInstancesRequest( - region_id=clients.region, - tag=[tag], - ) - rsp = clients.ecs.describe_instances(req) - for instance in rsp.body.instances.instance or []: - delete_temp_instance(clients, instance.instance_id) - req = ecs.models.DescribeAvailableResourceRequest( - region_id=clients.region, - destination_resource='Zone', - instance_type=machine, - ) - rsp = clients.ecs.describe_available_resource(req) - if rsp.body.available_zones is None: - # Cannot create instances in this region - logger.warning("%s: no zones support %s" % (clients.region, machine)) - return None - zone_id = next(x.zone_id - for x in rsp.body.available_zones.available_zone or [] - if x.status == 'Available') - logger.info("%s: creating %s in %s" % (clients.region, machine, zone_id)) - tag = ecs.models.DescribeSecurityGroupsRequestTag( - key=IPXE_SG_TAG, - value=IPXE_SG_TAG, - ) - req = ecs.models.DescribeSecurityGroupsRequest( - region_id=clients.region, - tag=[tag], - ) - rsp = clients.ecs.describe_security_groups(req) - sgs = rsp.body.security_groups.security_group or [] - sg_id = sgs[0].security_group_id - vpc_id = sgs[0].vpc_id - tag = vpc.models.DescribeVSwitchesRequestTag( - key=IPXE_VSWITCH_TAG, - value=IPXE_VSWITCH_TAG, - ) - req = vpc.models.DescribeVSwitchesRequest( - region_id=clients.region, - vpc_id=vpc_id, - zone_id=zone_id, - tag=[tag], - ) - rsp = clients.vpc.describe_vswitches(req) - vswitches = rsp.body.v_switches.v_switch or [] - vswitch_id = vswitches[0].v_switch_id - name = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) - sysdisk = ecs.models.RunInstancesRequestSystemDisk( - category=IPXE_STORAGE_DISK_CATEGORY, - ) - now = datetime.datetime.now(datetime.UTC) - lifetime = datetime.timedelta(hours=1) - release = (now + lifetime).strftime('%Y-%m-%dT%H:%M:%SZ') - tag = ecs.models.RunInstancesRequestTag( - key=IPXE_STORAGE_TAG, - value=IPXE_STORAGE_TAG, - ) - req = ecs.models.RunInstancesRequest( - region_id=clients.region, - image_family=family, - instance_type=machine, - instance_name=name, - auto_release_time=release, - ram_role_name=role, - system_disk=sysdisk, - security_group_ids=[sg_id], - v_switch_id=vswitch_id, - internet_charge_type='PayByTraffic', - internet_max_bandwidth_out=100, - tag=[tag], - ) - try: - rsp = clients.ecs.run_instances(req) - except openapi.exceptions.ClientException as exc: - if exc.code in ('RegionUnauthorized', - 'InvalidPeriod.RegionDiscontinued', - 'InvalidInstanceType.ValueNotSupported'): - logger.warning("%s: ECS lied about availability" % clients.region) - return None - raise - instance_id = rsp.body.instance_id_sets.instance_id_set[0] - logger.info("%s: created %s" % (clients.region, instance_id)) - command = ['aliyun', 'configure', 'set', '--mode', 'EcsRamRole', - '--region', clients.region] - run_temp_instance_command(clients, instance_id, command) - return instance_id - -def delete_temp_bucket(clients, instance, bucket): +def delete_temp_bucket(clients, bucket): """Remove temporary bucket""" logger.info("%s: deleting %s" % (clients.region, bucket)) assert bucket.startswith(IPXE_STORAGE_PREFIX) - command = ['aliyun', 'oss', 'rm', 'oss://%s' % bucket, - '--bucket', '--recursive', '--force', - '--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)] - run_temp_instance_command(clients, instance, command) + req = oss.models.ListObjectsV2Request( + bucket=bucket, + prefix=IPXE_STORAGE_PREFIX, + ) + rsp = clients.oss.list_objects_v2(req) + delete = [x.key for x in rsp.contents or ()] + if delete: + req = oss.models.DeleteMultipleObjectsRequest( + bucket=bucket, + objects=[oss.models.DeleteObject(x) for x in delete], + ) + rsp = clients.oss.delete_multiple_objects(req) + req = oss.models.DeleteBucketRequest(bucket=bucket) + rsp = clients.oss.delete_bucket(req) -def create_temp_bucket(clients, instance): +def create_temp_bucket(clients): """Create temporary bucket (and remove any stale temporary buckets)""" + if clients.region.startswith('cn-'): + # Object storage is non-functional in Chinese mainland regions + # due to censorship restrictions + return None prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region) req = oss.models.ListBucketsRequest(prefix=prefix) rsp = clients.oss.list_buckets(req) buckets = [x.name for x in rsp.buckets or ()] for bucket in buckets: - delete_temp_bucket(clients, instance, bucket) + delete_temp_bucket(clients, bucket) bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN] req = oss.models.PutBucketRequest(bucket=bucket) - try: - rsp = clients.oss.put_bucket(req) - except oss.exceptions.OperationError as exc: - # AliCloud provides no other way to detect non-functional regions - if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE: - logger.warning("%s: non-functional OSS" % clients.region) - return None - raise exc + rsp = clients.oss.put_bucket(req) logger.info("%s: created %s" % (clients.region, bucket)) return bucket @@ -282,20 +139,8 @@ def upload_image(clients, bucket, image): rsp = clients.oss.put_object_from_file(req, image.path) return key -def copy_images(clients, instance, bucket, source): - """Copy disk images to bucket from uncensored bucket""" - logger.info("%s: syncing from %s" % (clients.region, source['bucket'])) - command = ['aliyun', 'oss', 'sync', - 'oss://%s' % source['bucket'], 'syncdir', - '--endpoint', ('oss-%s.aliyuncs.com' % source['region'])] - run_temp_instance_command(clients, instance, command) - logger.info("%s: syncing to %s" % (clients.region, bucket)) - command = ['aliyun', 'oss', 'sync', 'syncdir', 'oss://%s' % bucket, - '--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)] - run_temp_instance_command(clients, instance, command) - -def delete_images(clients, name): - """Remove existing images""" +def delete_image(clients, name): + """Remove existing image (if applicable)""" req = ecs.models.DescribeImagesRequest( region_id=clients.region, image_name=name, @@ -318,10 +163,43 @@ def delete_images(clients, name): ) rsp = clients.ecs.delete_image(req) -def import_image(clients, image, bucket, key, public, overwrite): +def wait_for_task(clients, task_id): + """Wait for task to complete""" + while True: + time.sleep(5) + req = ecs.models.DescribeTasksRequest( + region_id=clients.region, + task_ids=task_id, + ) + rsp = clients.ecs.describe_tasks(req) + assert len(rsp.body.task_set.task) == 1 + assert rsp.body.task_set.task[0].task_id == task_id + status = rsp.body.task_set.task[0].task_status + if status not in ('Waiting', 'Processing'): + break + if status != 'Finished': + raise RuntimeError(status) + +def wait_for_image(clients, image_id): + """Wait for image to become available""" + while True: + time.sleep(5) + req = ecs.models.DescribeImagesRequest( + region_id=clients.region, + image_id=image_id, + ) + rsp = clients.ecs.describe_images(req) + if len(rsp.body.images.image): + assert len(rsp.body.images.image) == 1 + assert rsp.body.images.image[0].image_id == image_id + status = rsp.body.images.image[0].status + if status != 'Creating': + break + if status != 'Available': + raise RuntimeError(status) + +def import_image(clients, image, bucket, key): """Import image""" - if overwrite: - delete_images(clients, image.name) logger.info("%s: importing %s" % (clients.region, image.name)) disk = ecs.models.ImportImageRequestDiskDeviceMapping( disk_image_size = 1, @@ -339,34 +217,46 @@ def import_image(clients, image, bucket, key, public, overwrite): rsp = clients.ecs.import_image(req) image_id = rsp.body.image_id task_id = rsp.body.task_id - while True: - time.sleep(5) - req = ecs.models.DescribeTasksRequest( - region_id=clients.region, - task_ids=task_id, - ) - rsp = clients.ecs.describe_tasks(req) - status = rsp.body.task_set.task[0].task_status - if status not in ('Waiting', 'Processing'): - break - if status != 'Finished': - raise RuntimeError(status) + wait_for_task(clients, task_id) + wait_for_image(clients, image_id) + logger.info("%s: imported %s (%s)" % + (clients.region, image.name, image_id)) + return image_id + +def copy_image(clients, image, image_id, censored): + """Copy imported image to censored region""" + logger.info("%s: copying %s (%s) to %s" % + (clients.region, image.name, image_id, censored.region)) + req = ecs.models.CopyImageRequest( + region_id=clients.region, + image_id=image_id, + destination_region_id=censored.region, + destination_image_name=image.name, + ) + rsp = clients.ecs.copy_image(req) + copy_id = rsp.body.image_id + wait_for_image(censored, copy_id) + logger.info("%s: copied %s (%s) to %s" % + (clients.region, image.name, copy_id, censored.region)) + return copy_id + +def finalise_image(clients, image, image_id): + """Finalise image attributes and permissions""" + logger.info("%s: finalising %s (%s)" % + (clients.region, image.name, image_id)) req = ecs.models.ModifyImageAttributeRequest( region_id=clients.region, image_id=image_id, image_family=image.family, ) rsp = clients.ecs.modify_image_attribute(req) - if public: + if image.public: req = ecs.models.ModifyImageSharePermissionRequest( region_id=clients.region, image_id=image_id, is_public=True, ) rsp = clients.ecs.modify_image_share_permission(req) - logger.info("%s: imported %s (%s)" % - (clients.region, image.name, image_id)) - return image_id # Parse command-line arguments parser = argparse.ArgumentParser(description="Import Alibaba Cloud image") @@ -376,18 +266,11 @@ parser.add_argument('--name', '-n', parser.add_argument('--family', '-f', default='ipxe', help="Base family name") parser.add_argument('--public', '-p', action='store_true', - help="Make image public") + help="Make image(s) public") parser.add_argument('--overwrite', action='store_true', help="Overwrite any existing image with same name") parser.add_argument('--region', '-r', action='append', help="AliCloud region(s)") -parser.add_argument('--role', '-R', default="iPXECensorshipBypassRole", - help="AliCloud OSS censorship bypass role") -parser.add_argument('--helper-family', - default="acs:alibaba_cloud_linux_4_lts_x64", - help="Helper OS image family") -parser.add_argument('--helper-machine', default="ecs.e-c1m1.large", - help="Helper machine type") parser.add_argument('image', nargs='+', help="iPXE disk image") args = parser.parse_args() @@ -401,101 +284,93 @@ if not args.name: args.name = '%s-%s' % (args.family, datetime.date.today().strftime('%Y%m%d')) -# Construct image list -images = [image(x, args.family, args.name) for x in args.image] - # Use all regions if none specified -if not args.region: - args.region = all_regions() +regions = args.region or all_regions() + +# Construct image list +images = [image(x, args.family, args.name, args.public) for x in args.image] +imports = [(region, image) for region in regions for image in images] # Construct per-region clients -clients = {region: all_clients(region) for region in args.region} +clients = {region: all_clients(region) for region in regions} -# Create temporary instances in each region -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(create_temp_instance, - clients=clients[region], - family=args.helper_family, - machine=args.helper_machine, - role=args.role): region - for region in args.region} - instances = {futures[x]: x.result() for x in as_completed(futures)} +# Delete existing images from all regions, if applicable +if args.overwrite: + with ThreadPoolExecutor(max_workers=len(imports)) as executor: + futures = {executor.submit(delete_image, + clients=clients[region], + name=image.name): (region, image) + for region, image in imports} + done = {futures[x]: x.result() for x in as_completed(futures)} -# Create temporary buckets in each region (requires instance to exist) -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: +# Create temporary buckets in all uncensored regions +with ThreadPoolExecutor(max_workers=len(regions)) as executor: futures = {executor.submit(create_temp_bucket, - clients=clients[region], - instance=instances[region]): region - for region in args.region - if instances[region] is not None} + clients=clients[region]): region + for region in regions} buckets = {futures[x]: x.result() for x in as_completed(futures)} +if not any(buckets.values()): + parser.error("At least one non-Chinese region is required") -# Select an uncensored region with functioning object storage -uncensored = next((k for k, v in buckets.items() - if v is not None and not k.startswith('cn-')), None) -if uncensored is None: - parser.error("At least one available uncensored region is required") - -# Upload images directly to chosen uncensored region -with ThreadPoolExecutor(max_workers=len(images)) as executor: +# Upload images directly to uncensored regions +with ThreadPoolExecutor(max_workers=len(imports)) as executor: futures = {executor.submit(upload_image, - clients=clients[uncensored], - bucket=buckets[uncensored], - image=image): image - for image in images} + clients=clients[region], + bucket=buckets[region], + image=image): (region, image) + for region, image in imports if buckets[region]} keys = {futures[x]: x.result() for x in as_completed(futures)} -# Copy images to all other regions -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - source = {'region': uncensored, 'bucket': buckets[uncensored]} - futures = {executor.submit(copy_images, - clients=clients[region], - instance=instances[region], - bucket=buckets[region], - source=source): region - for region in args.region - if instances[region] is not None and buckets[region] is not None - and region != uncensored} - done = {futures[x]: x.result() for x in as_completed(futures)} - -# Import all images -imports = [(region, image) for region in args.region for image in images] +# Import images to uncensored regions with ThreadPoolExecutor(max_workers=len(imports)) as executor: futures = {executor.submit(import_image, clients=clients[region], image=image, bucket=buckets[region], - key=keys[image], - public=args.public, - overwrite=args.overwrite): (region, image) - for region, image in imports - if instances[region] is not None and buckets[region] is not None} + key=keys[(region, image)]): (region, image) + for region, image in imports if buckets[region]} results = {futures[x]: x.result() for x in as_completed(futures)} -# Remove temporary buckets -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(delete_temp_bucket, +# Select source uncensored region for each copy +# +# Copies are rate-limited by source region, so spread the copies +# across all available uncensored regions. +# +copies = [(region, censored, image) for region, (censored, image) in zip( + cycle(region for region in regions if buckets[region]), + ((region, image) for region, image in imports if not buckets[region]), +)] + +# Copy images to censored regions +with ThreadPoolExecutor(max_workers=len(imports)) as executor: + futures = {executor.submit(copy_image, clients=clients[region], - instance=instances[region], - bucket=buckets[region]): region - for region in args.region - if instances[region] is not None and buckets[region] is not None} + censored=clients[censored], + image=image, + image_id=results[(region, image)]): + (censored, image) + for region, censored, image in copies} + results.update({futures[x]: x.result() for x in as_completed(futures)}) + +# Finalise images +with ThreadPoolExecutor(max_workers=len(imports)) as executor: + futures = {executor.submit(finalise_image, + clients=clients[region], + image=image, + image_id=results[(region, image)]): + (region, image) + for region, image in imports} done = {futures[x]: x.result() for x in as_completed(futures)} -# Remove temporary instances -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(delete_temp_instance, +# Remove temporary buckets +with ThreadPoolExecutor(max_workers=len(regions)) as executor: + futures = {executor.submit(delete_temp_bucket, clients=clients[region], - instance=instances[region], - retry=True): region - for region in args.region - if instances[region] is not None} + bucket=buckets[region]): region + for region in regions if buckets[region]} done = {futures[x]: x.result() for x in as_completed(futures)} # Show created images for region, image in imports: - mark = "(*)" if region == uncensored else "" - result = ("[no ECS]" if instances[region] is None else - "[no OSS]" if buckets[region] is None else - results[(region, image)]) - print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result)) + image_id = results[(region, image)] + print("%s %s (%s) %s" % (region, image.name, image.family, image_id))