diff --git a/contrib/cloud/ali-import b/contrib/cloud/ali-import new file mode 100755 index 000000000..995db956e --- /dev/null +++ b/contrib/cloud/ali-import @@ -0,0 +1,449 @@ +#!/usr/bin/env python3 + +import argparse +import base64 +from collections import namedtuple +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import date +import http +import io +import json +from pathlib import Path +import subprocess +import time +from uuid import uuid4 +import zipfile + +import alibabacloud_credentials as credentials +import alibabacloud_credentials.client +import alibabacloud_credentials.models +import alibabacloud_ecs20140526 as ecs +import alibabacloud_ecs20140526.client +import alibabacloud_ecs20140526.models +import alibabacloud_fc20230330 as fc +import alibabacloud_fc20230330.client +import alibabacloud_fc20230330.models +import alibabacloud_oss_v2 as oss +import alibabacloud_ram20150501 as ram +import alibabacloud_ram20150501.client +import alibabacloud_ram20150501.models +import alibabacloud_sts20150401 as sts +import alibabacloud_sts20150401.client +import alibabacloud_sts20150401.models +import alibabacloud_tea_openapi as openapi +import alibabacloud_tea_openapi.client +import alibabacloud_tea_openapi.models +import alibabacloud_tea_util as util +import alibabacloud_tea_util.client +import alibabacloud_tea_util.models + +ECS_ENDPOINT = 'ecs.aliyuncs.com' +RAM_ENDPOINT = 'ram.aliyuncs.com' +STS_ENDPOINT = 'sts.aliyuncs.com' + +FC_NODE_RUNTIME = 'nodejs20' +FC_MAX_ATTEMPTS = 5 +FC_CONNECT_TIMEOUT_MS = 10000 +FC_READ_TIMEOUT_MS = 60000 +FC_TIMEOUT_SEC = 60 +FC_MEMORY_SIZE_MB = 128 + +OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket' +OSS_BUCKET_NAME_LEN = 63 + +IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-' + +# For regions in mainland China, the Chinese state censorship laws +# prohibit direct access to OSS bucket contents. +# +# We work around this restriction by creating a temporary Function +# Compute function in each region to access OSS via the internal OSS +# endpoints, which are not subject to these restrictions. Yes, this +# is somewhat absurd. +# +IPXE_CENSORSHIP_BYPASS_FUNCTION = f''' +const prefix = "{IPXE_STORAGE_PREFIX}"; +''' + ''' +const assert = require("node:assert"); +const OSS = require("ali-oss"); +exports.handler = async (event, context) => { + const payload = JSON.parse(event.toString()); + console.log(JSON.stringify(payload)); + const src = payload.source && new OSS({ + region: "oss-" + payload.source.region, + bucket: payload.source.bucket, + accessKeyId: context.credentials.accessKeyId, + accessKeySecret: context.credentials.accessKeySecret, + stsToken: context.credentials.securityToken, + }); + const dst = new OSS({ + region: "oss-" + context.region, + internal: true, + bucket: payload.bucket, + accessKeyId: context.credentials.accessKeyId, + accessKeySecret: context.credentials.accessKeySecret, + stsToken: context.credentials.securityToken, + }); + const add = payload.keys || []; + const del = ((await dst.listV2({prefix: prefix})).objects || []) + .map(x => x.name).filter(x => ! add.includes(x)); + assert(add.every(x => x.startsWith(prefix))); + assert(del.every(x => x.startsWith(prefix))); + if (add.length) + console.log("Creating: " + add.sort().join(", ")); + if (del.length) + console.log("Deleting: " + del.sort().join(", ")); + await Promise.all([ + ...add.map(async (x) => dst.putStream(x, (await src.getStream(x)).stream)), + ...(del.length ? [dst.deleteMulti(del)] : []), + ]); +}; +''' + +Clients = namedtuple('Clients', ['region', 'ecs', 'fc', 'oss']) +Image = namedtuple('Image', ['path', 'family', 'name', 'arch', 'mode']) + +def image(filename, basefamily, basename): + """Construct image description""" + mapping = { + b'BOOTX64.EFI': 'x86_64', + b'BOOTAA64.EFI': 'arm64', + } + mdir = subprocess.run(['mdir', '-b', '-i', filename, '::/EFI/BOOT'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + check=False) + uefi = [v for k, v in mapping.items() if k in mdir.stdout] + suffix = ('-uefi-%s' % uefi[0].replace('_', '-') if len(uefi) == 1 else + '-uefi-multi' if uefi else '') + path = Path(filename) + family = '%s%s' % (basefamily, suffix) + name = '%s%s' % (basename, suffix) + arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64' + mode = 'UEFI' if uefi else 'BIOS' + return Image(path, family, name, arch, mode) + +def all_regions(): + """Get list of all regions""" + cred = credentials.client.Client() + conf = openapi.models.Config(credential=cred, endpoint=ECS_ENDPOINT) + client = ecs.client.Client(conf) + req = ecs.models.DescribeRegionsRequest() + rsp = client.describe_regions(req) + regions = sorted(x.region_id for x in rsp.body.regions.region) + return regions + +def account_id(): + """Get account ID""" + cred = credentials.client.Client() + conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT) + client = sts.client.Client(conf) + rsp = client.get_caller_identity() + return rsp.body.account_id + +def role_arn(name): + """Get role resource name""" + cred = credentials.client.Client() + conf = openapi.models.Config(credential=cred, endpoint=RAM_ENDPOINT) + client = ram.client.Client(conf) + req = ram.models.GetRoleRequest(role_name=name) + rsp = client.get_role(req) + return rsp.body.role.arn + +def all_clients(region, account): + """Construct all per-region clients""" + cred = credentials.client.Client() + ecsconf = openapi.models.Config(credential=cred, region_id=region) + fcep = '%s.%s.fc.aliyuncs.com' % (account, region) + fcconf = openapi.models.Config(credential=cred, endpoint=fcep) + osscred = oss.credentials.EnvironmentVariableCredentialsProvider() + ossconf = oss.config.Config(credentials_provider=osscred, region=region) + clients = Clients( + region=region, + ecs=ecs.client.Client(ecsconf), + fc=fc.client.Client(fcconf), + oss=oss.client.Client(ossconf), + ) + return clients + +def delete_temp_function(clients, func): + """Remove temporary function""" + assert func.startswith(IPXE_STORAGE_PREFIX) + clients.fc.delete_function(func) + +def create_temp_function(clients, role): + """Create temporary function (and remove any stale temporary functions)""" + req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX) + try: + rsp = clients.fc.list_functions(req) + except openapi.client.UnretryableException: + # AliCloud provides no other way to detect non-functional regions + return None + funcs = [x.function_name for x in rsp.body.functions or ()] + for func in funcs: + delete_temp_function(clients, func) + buf = io.BytesIO() + with zipfile.ZipFile(buf, 'w') as zfh: + zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION) + zf = base64.b64encode(buf.getvalue()).decode() + code = fc.models.InputCodeLocation(zip_file=zf) + func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) + body = fc.models.CreateFunctionInput( + code=code, + function_name=func, + handler='index.handler', + memory_size=FC_MEMORY_SIZE_MB, + role=role, + runtime=FC_NODE_RUNTIME, + timeout=FC_TIMEOUT_SEC, + ) + req = fc.models.CreateFunctionRequest(body=body) + rsp = clients.fc.create_function(req) + return func + +def call_temp_function(clients, func, payload): + """Call temporary function""" + hdr = fc.models.InvokeFunctionHeaders( + x_fc_invocation_type='Sync', + x_fc_log_type='Tail', + ) + body = json.dumps(payload) + req = fc.models.InvokeFunctionRequest(body=body) + run = util.models.RuntimeOptions( + autoretry=True, + max_attempts=FC_MAX_ATTEMPTS, + connect_timeout=FC_CONNECT_TIMEOUT_MS, + read_timeout=FC_READ_TIMEOUT_MS, + ) + rsp = clients.fc.invoke_function_with_options(func, req, hdr, run) + log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode() + if rsp.status_code != http.HTTPStatus.OK: + raise RuntimeError(rsp) + if 'x-fc-error-type' in rsp.headers: + raise RuntimeError(log) + +def delete_temp_bucket(clients, func, bucket): + """Remove temporary bucket""" + assert bucket.startswith(IPXE_STORAGE_PREFIX) + payload = {'bucket': bucket} + call_temp_function(clients, func, payload) + req = oss.models.DeleteBucketRequest(bucket=bucket) + clients.oss.delete_bucket(req) + +def create_temp_bucket(clients, func): + """Create temporary bucket (and remove any stale temporary buckets)""" + prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region) + req = oss.models.ListBucketsRequest(prefix=prefix) + rsp = clients.oss.list_buckets(req) + buckets = [x.name for x in rsp.buckets or ()] + for bucket in buckets: + delete_temp_bucket(clients, func, bucket) + bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN] + req = oss.models.PutBucketRequest(bucket=bucket) + try: + rsp = clients.oss.put_bucket(req) + except oss.exceptions.OperationError as exc: + # AliCloud provides no other way to detect non-functional regions + if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE: + return None + raise exc + return bucket + +def upload_image(clients, bucket, image): + """Upload disk image to uncensored bucket""" + key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) + req = oss.models.PutObjectRequest(bucket=bucket, key=key) + rsp = clients.oss.put_object_from_file(req, image.path) + return key + +def copy_images(clients, func, bucket, source, keys): + """Copy disk images to bucket from uncensored bucket""" + payload = { + 'bucket': bucket, + 'source': source, + 'keys': keys, + } + call_temp_function(clients, func, payload) + +def delete_images(clients, name): + """Remove existing images""" + req = ecs.models.DescribeImagesRequest( + region_id=clients.region, + image_name=name, + image_owner_alias='self', + ) + rsp = clients.ecs.describe_images(req) + for image in rsp.body.images.image or (): + req = ecs.models.DeleteImageRequest( + region_id=clients.region, + image_id=image.image_id + ) + rsp = clients.ecs.delete_image(req) + +def import_image(clients, image, bucket, key, public, overwrite): + """Import image""" + if overwrite: + delete_images(clients, image.name) + disk = ecs.models.ImportImageRequestDiskDeviceMapping( + disk_image_size = 1, + format = 'RAW', + ossbucket = bucket, + ossobject = key, + ) + req = ecs.models.ImportImageRequest( + region_id=clients.region, + image_name=image.name, + architecture=image.arch, + boot_mode=image.mode, + disk_device_mapping=[disk], + ) + rsp = clients.ecs.import_image(req) + image_id = rsp.body.image_id + task_id = rsp.body.task_id + while True: + time.sleep(5) + req = ecs.models.DescribeTasksRequest( + region_id=clients.region, + task_ids=task_id, + ) + rsp = clients.ecs.describe_tasks(req) + status = rsp.body.task_set.task[0].task_status + if status not in ('Waiting', 'Processing'): + break + if status != 'Finished': + raise RuntimeError(status) + req = ecs.models.ModifyImageAttributeRequest( + region_id=clients.region, + image_id=image_id, + image_family=image.family, + ) + rsp = clients.ecs.modify_image_attribute(req) + if public: + req = ecs.models.ModifyImageSharePermissionRequest( + region_id=clients.region, + image_id=image_id, + is_public=True, + ) + rsp = clients.ecs.modify_image_share_permission(req) + return image_id + +# Parse command-line arguments +parser = argparse.ArgumentParser(description="Import Alibaba Cloud image") +parser.add_argument('--name', '-n', + help="Base image name") +parser.add_argument('--family', '-f', default='ipxe', + help="Base family name") +parser.add_argument('--public', '-p', action='store_true', + help="Make image public") +parser.add_argument('--overwrite', action='store_true', + help="Overwrite any existing image with same name") +parser.add_argument('--region', '-r', action='append', + help="AliCloud region(s)") +parser.add_argument('--fc-role', '-F', default="AliyunFcDefaultRole", + help="AliCloud role for censorship bypass function") +parser.add_argument('image', nargs='+', help="iPXE disk image") +args = parser.parse_args() + +# Use default name if none specified +if not args.name: + args.name = '%s-%s' % (args.family, date.today().strftime('%Y%m%d')) + +# Construct image list +images = [image(x, args.family, args.name) for x in args.image] + +# Use all regions if none specified +if not args.region: + args.region = all_regions() + +# Look up resource names +fcrole = role_arn(args.fc_role) + +# Construct per-region clients +account = account_id() +clients = {region: all_clients(region, account) for region in args.region} + +# Create temporary functions in each region +with ThreadPoolExecutor(max_workers=len(args.region)) as executor: + futures = {executor.submit(create_temp_function, + clients=clients[region], + role=fcrole): region + for region in args.region} + funcs = {futures[x]: x.result() for x in as_completed(futures)} + +# Create temporary buckets in each region (requires function to exist) +with ThreadPoolExecutor(max_workers=len(args.region)) as executor: + futures = {executor.submit(create_temp_bucket, + clients=clients[region], + func=funcs[region]): region + for region in args.region + if funcs[region] is not None} + buckets = {futures[x]: x.result() for x in as_completed(futures)} + +# Select an uncensored region with functioning object storage +uncensored = next((k for k, v in buckets.items() + if v is not None and not k.startswith('cn-')), None) +if uncensored is None: + parser.error("At least one available uncensored region is required") + +# Upload images directly to chosen uncensored region +with ThreadPoolExecutor(max_workers=len(images)) as executor: + futures = {executor.submit(upload_image, + clients=clients[uncensored], + bucket=buckets[uncensored], + image=image): image + for image in images} + keys = {futures[x]: x.result() for x in as_completed(futures)} + +# Copy images to all other regions +with ThreadPoolExecutor(max_workers=len(args.region)) as executor: + source = {'region': uncensored, 'bucket': buckets[uncensored]} + futures = {executor.submit(copy_images, + clients=clients[region], + func=funcs[region], + bucket=buckets[region], + source=source, + keys=list(keys.values())): region + for region in args.region + if funcs[region] is not None and buckets[region] is not None + and region != uncensored} + done = {futures[x]: x.result() for x in as_completed(futures)} + +# Import all images +imports = [(region, image) for region in args.region for image in images] +with ThreadPoolExecutor(max_workers=len(imports)) as executor: + futures = {executor.submit(import_image, + clients=clients[region], + image=image, + bucket=buckets[region], + key=keys[image], + public=args.public, + overwrite=args.overwrite): (region, image) + for region, image in imports + if funcs[region] is not None and buckets[region] is not None} + results = {futures[x]: x.result() for x in as_completed(futures)} + +# Remove temporary buckets +with ThreadPoolExecutor(max_workers=len(args.region)) as executor: + futures = {executor.submit(delete_temp_bucket, + clients=clients[region], + func=funcs[region], + bucket=buckets[region]): region + for region in args.region + if funcs[region] is not None and buckets[region] is not None} + done = {futures[x]: x.result() for x in as_completed(futures)} + +# Remove temporary functions +with ThreadPoolExecutor(max_workers=len(args.region)) as executor: + futures = {executor.submit(delete_temp_function, + clients=clients[region], + func=funcs[region]): region + for region in args.region + if funcs[region] is not None} + done = {futures[x]: x.result() for x in as_completed(futures)} + +# Show created images +for region, image in imports: + mark = "(*)" if region == uncensored else "" + result = ("[no FC]" if funcs[region] is None else + "[no OSS]" if buckets[region] is None else + results[(region, image)]) + print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result))