[cloud] Do not rely on ECS instances to import images to Alibaba Cloud

Spinning up ECS instances is supported in all ECS regions (unlike
Function Compute), but turns out to be unacceptably unreliable since
Alibaba Cloud has a very irritating tendency to fail to launch ECS
instances for a variety of spurious and unpredictable reasons.

Rewrite the censorship bypass mechanism to use the (extremely slow)
CopyImage API call to copy an imported image from an uncensored region
to a censored region.

Signed-off-by: Michael Brown <mcb30@ipxe.org>
This commit is contained in:
Michael Brown 2026-04-17 13:54:49 +01:00
parent 13ee60c198
commit 7e54e75a2f

View File

@ -1,10 +1,10 @@
#!/usr/bin/env python3
import argparse
import base64
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed
import datetime
from itertools import cycle
import logging
from pathlib import Path
import subprocess
@ -25,9 +25,6 @@ import alibabacloud_tea_openapi.models
import alibabacloud_tea_util as util
import alibabacloud_tea_util.client
import alibabacloud_tea_util.models
import alibabacloud_vpc20160428 as vpc
import alibabacloud_vpc20160428.client
import alibabacloud_vpc20160428.models
# For regions in mainland China, the Chinese state censorship laws
# prohibit direct access to OSS bucket contents.
@ -40,20 +37,16 @@ logger = logging.getLogger('ali-import')
ECS_ENDPOINT = 'ecs.aliyuncs.com'
OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket'
OSS_BUCKET_NAME_LEN = 63
IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-'
IPXE_STORAGE_TAG = 'ipxe-upload-temp'
IPXE_STORAGE_DISK_CATEGORY = 'cloud_essd'
IPXE_SG_TAG = 'ipxe-default-sg'
IPXE_VSWITCH_TAG = 'ipxe-default-vswitch'
Clients = namedtuple('Clients', ['region', 'ecs', 'oss'])
Image = namedtuple('Image',
['path', 'family', 'name', 'arch', 'mode', 'public'])
Clients = namedtuple('Clients', ['region', 'ecs', 'oss', 'vpc'])
Image = namedtuple('Image', ['path', 'family', 'name', 'arch', 'mode'])
def image(filename, basefamily, basename):
def image(filename, basefamily, basename, public):
"""Construct image description"""
with tempfile.NamedTemporaryFile(mode='w+t') as mtoolsrc:
mtoolsrc.writelines([
@ -76,7 +69,7 @@ def image(filename, basefamily, basename):
name = '%s%s' % (basename, suffix)
arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64'
mode = 'UEFI' if uefi else 'BIOS'
return Image(path, family, name, arch, mode)
return Image(path, family, name, arch, mode, public)
def all_regions():
"""Get list of all regions"""
@ -94,183 +87,47 @@ def all_clients(region):
ecsconf = openapi.models.Config(credential=cred, region_id=region)
osscred = oss.credentials.EnvironmentVariableCredentialsProvider()
ossconf = oss.config.Config(credentials_provider=osscred, region=region)
vpcconf = openapi.models.Config(credential=cred, region_id=region)
clients = Clients(
region=region,
ecs=ecs.client.Client(ecsconf),
oss=oss.client.Client(ossconf),
vpc=vpc.client.Client(vpcconf),
)
return clients
def delete_temp_instance(clients, instance, retry=False):
    """Forcibly delete a temporary instance

    Issues a forced DeleteInstance request (stopping the instance if
    necessary).  With ``retry`` set, a ClientException from the request
    causes a one-second pause and another attempt, repeated until the
    request succeeds; without it, the exception propagates to the
    caller.
    """
    logger.info("%s: deleting %s" % (clients.region, instance))
    deleted = False
    while not deleted:
        req = ecs.models.DeleteInstanceRequest(
            instance_id=instance,
            force=True,
            force_stop=True,
        )
        try:
            clients.ecs.delete_instance(req)
        except openapi.exceptions.ClientException:
            # Very recently created instances often cannot be
            # terminated until some undocumented part of the control
            # plane decides that enough time has elapsed
            if not retry:
                raise
            time.sleep(1)
        else:
            deleted = True
def run_temp_instance_command(clients, instance, command):
    """Run a shell command on a temporary instance and wait for it

    ``command`` is a list of words that is joined with single spaces
    and submitted as a 'RunShellScript' command.  The invocation is
    polled once per second until its record status is neither
    'Pending' nor 'Running'.

    Returns the final invocation result object.  Raises RuntimeError
    (carrying the decoded command output, or the invocation status if
    there was no output) unless the invocation status is 'Success'.
    """
    command_content = ' '.join(command)
    logger.info("%s: running %s" % (clients.region, command_content))
    run_req = ecs.models.RunCommandRequest(
        region_id=clients.region,
        instance_id=[instance],
        type='RunShellScript',
        command_content=command_content,
    )
    invocation = clients.ecs.run_command(run_req).body.invoke_id

    # Poll until the invocation has left its in-progress states
    in_progress = ('Pending', 'Running')
    while True:
        time.sleep(1)
        poll_req = ecs.models.DescribeInvocationResultsRequest(
            region_id=clients.region,
            invoke_id=invocation,
        )
        poll_rsp = clients.ecs.describe_invocation_results(poll_req)
        results = poll_rsp.body.invocation.invocation_results
        result = results.invocation_result[0]
        if result.invoke_record_status not in in_progress:
            break

    # Command output is returned base64-encoded
    output = base64.b64decode(result.output).decode()
    if result.invocation_status != 'Success':
        raise RuntimeError(output or result.invocation_status)
    return result
def create_temp_instance(clients, family, machine, role):
    """Create temporary instance (and remove any stale temporary instances)

    Any existing instance carrying the temporary-storage tag is deleted
    first.  A new instance of type ``machine`` is then launched from
    image family ``family`` with RAM role ``role``, in the first zone
    reported as 'Available', using the first security group and
    vSwitch carrying the iPXE default tags.  The instance is scheduled
    for automatic release one hour after the request is built, and
    ``aliyun configure set --mode EcsRamRole`` is then run on it.

    Returns the new instance ID, or None if the region cannot host the
    instance (no usable zone, or the launch request is rejected with
    one of the known "not actually available" error codes).

    Raises IndexError if no tagged security group or vSwitch is found.
    Any other ClientException from the launch request, and any error
    from running the configuration command, propagates to the caller.
    """
    # Remove stale temporary instances
    tag = ecs.models.DescribeInstancesRequestTag(
        key=IPXE_STORAGE_TAG,
        value=IPXE_STORAGE_TAG,
    )
    req = ecs.models.DescribeInstancesRequest(
        region_id=clients.region,
        tag=[tag],
    )
    rsp = clients.ecs.describe_instances(req)
    for instance in rsp.body.instances.instance or []:
        delete_temp_instance(clients, instance.instance_id)

    # Choose a zone that offers the requested machine type
    req = ecs.models.DescribeAvailableResourceRequest(
        region_id=clients.region,
        destination_resource='Zone',
        instance_type=machine,
    )
    rsp = clients.ecs.describe_available_resource(req)
    if rsp.body.available_zones is None:
        # Cannot create instances in this region
        logger.warning("%s: no zones support %s" % (clients.region, machine))
        return None
    zone_id = next((x.zone_id
                    for x in rsp.body.available_zones.available_zone or []
                    if x.status == 'Available'), None)
    if zone_id is None:
        # Zones were listed but none is currently available: treat
        # this like an unsupported region rather than letting a bare
        # StopIteration escape from next()
        logger.warning("%s: no zones support %s" % (clients.region, machine))
        return None
    logger.info("%s: creating %s in %s" % (clients.region, machine, zone_id))

    # Locate the tagged security group (and hence its VPC)
    tag = ecs.models.DescribeSecurityGroupsRequestTag(
        key=IPXE_SG_TAG,
        value=IPXE_SG_TAG,
    )
    req = ecs.models.DescribeSecurityGroupsRequest(
        region_id=clients.region,
        tag=[tag],
    )
    rsp = clients.ecs.describe_security_groups(req)
    sgs = rsp.body.security_groups.security_group or []
    sg_id = sgs[0].security_group_id
    vpc_id = sgs[0].vpc_id

    # Locate the tagged vSwitch within that VPC and the chosen zone
    tag = vpc.models.DescribeVSwitchesRequestTag(
        key=IPXE_VSWITCH_TAG,
        value=IPXE_VSWITCH_TAG,
    )
    req = vpc.models.DescribeVSwitchesRequest(
        region_id=clients.region,
        vpc_id=vpc_id,
        zone_id=zone_id,
        tag=[tag],
    )
    rsp = clients.vpc.describe_vswitches(req)
    vswitches = rsp.body.v_switches.v_switch or []
    vswitch_id = vswitches[0].v_switch_id

    # Launch the instance, with an automatic release time as a safety
    # net in case explicit deletion never happens
    name = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
    sysdisk = ecs.models.RunInstancesRequestSystemDisk(
        category=IPXE_STORAGE_DISK_CATEGORY,
    )
    now = datetime.datetime.now(datetime.UTC)
    lifetime = datetime.timedelta(hours=1)
    release = (now + lifetime).strftime('%Y-%m-%dT%H:%M:%SZ')
    tag = ecs.models.RunInstancesRequestTag(
        key=IPXE_STORAGE_TAG,
        value=IPXE_STORAGE_TAG,
    )
    req = ecs.models.RunInstancesRequest(
        region_id=clients.region,
        image_family=family,
        instance_type=machine,
        instance_name=name,
        auto_release_time=release,
        ram_role_name=role,
        system_disk=sysdisk,
        security_group_ids=[sg_id],
        v_switch_id=vswitch_id,
        internet_charge_type='PayByTraffic',
        internet_max_bandwidth_out=100,
        tag=[tag],
    )
    try:
        rsp = clients.ecs.run_instances(req)
    except openapi.exceptions.ClientException as exc:
        if exc.code in ('RegionUnauthorized',
                        'InvalidPeriod.RegionDiscontinued',
                        'InvalidInstanceType.ValueNotSupported'):
            logger.warning("%s: ECS lied about availability" % clients.region)
            return None
        raise
    instance_id = rsp.body.instance_id_sets.instance_id_set[0]
    logger.info("%s: created %s" % (clients.region, instance_id))

    # Configure the aliyun CLI on the instance to use its RAM role
    command = ['aliyun', 'configure', 'set', '--mode', 'EcsRamRole',
               '--region', clients.region]
    run_temp_instance_command(clients, instance_id, command)
    return instance_id
def delete_temp_bucket(clients, instance, bucket):
def delete_temp_bucket(clients, bucket):
"""Remove temporary bucket"""
logger.info("%s: deleting %s" % (clients.region, bucket))
assert bucket.startswith(IPXE_STORAGE_PREFIX)
command = ['aliyun', 'oss', 'rm', 'oss://%s' % bucket,
'--bucket', '--recursive', '--force',
'--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)]
run_temp_instance_command(clients, instance, command)
req = oss.models.ListObjectsV2Request(
bucket=bucket,
prefix=IPXE_STORAGE_PREFIX,
)
rsp = clients.oss.list_objects_v2(req)
delete = [x.key for x in rsp.contents or ()]
if delete:
req = oss.models.DeleteMultipleObjectsRequest(
bucket=bucket,
objects=[oss.models.DeleteObject(x) for x in delete],
)
rsp = clients.oss.delete_multiple_objects(req)
req = oss.models.DeleteBucketRequest(bucket=bucket)
rsp = clients.oss.delete_bucket(req)
def create_temp_bucket(clients, instance):
def create_temp_bucket(clients):
"""Create temporary bucket (and remove any stale temporary buckets)"""
if clients.region.startswith('cn-'):
# Object storage is non-functional in Chinese mainland regions
# due to censorship restrictions
return None
prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region)
req = oss.models.ListBucketsRequest(prefix=prefix)
rsp = clients.oss.list_buckets(req)
buckets = [x.name for x in rsp.buckets or ()]
for bucket in buckets:
delete_temp_bucket(clients, instance, bucket)
delete_temp_bucket(clients, bucket)
bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN]
req = oss.models.PutBucketRequest(bucket=bucket)
try:
rsp = clients.oss.put_bucket(req)
except oss.exceptions.OperationError as exc:
# AliCloud provides no other way to detect non-functional regions
if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE:
logger.warning("%s: non-functional OSS" % clients.region)
return None
raise exc
rsp = clients.oss.put_bucket(req)
logger.info("%s: created %s" % (clients.region, bucket))
return bucket
@ -282,20 +139,8 @@ def upload_image(clients, bucket, image):
rsp = clients.oss.put_object_from_file(req, image.path)
return key
def copy_images(clients, instance, bucket, source):
    """Copy disk images into this region's bucket via the helper instance

    Runs two ``aliyun oss sync`` commands on ``instance``: the first
    pulls the contents of the source bucket (``source['bucket']`` in
    ``source['region']``) into a local ``syncdir`` directory, the
    second pushes that directory into ``bucket`` through this region's
    internal OSS endpoint.
    """
    src_bucket = source['bucket']
    logger.info("%s: syncing from %s" % (clients.region, src_bucket))
    src_url = 'oss://%s' % source['bucket']
    src_endpoint = 'oss-%s.aliyuncs.com' % source['region']
    download = ['aliyun', 'oss', 'sync', src_url, 'syncdir',
                '--endpoint', src_endpoint]
    run_temp_instance_command(clients, instance, download)

    logger.info("%s: syncing to %s" % (clients.region, bucket))
    dst_url = 'oss://%s' % bucket
    dst_endpoint = 'oss-%s-internal.aliyuncs.com' % clients.region
    upload = ['aliyun', 'oss', 'sync', 'syncdir', dst_url,
              '--endpoint', dst_endpoint]
    run_temp_instance_command(clients, instance, upload)
def delete_images(clients, name):
"""Remove existing images"""
def delete_image(clients, name):
"""Remove existing image (if applicable)"""
req = ecs.models.DescribeImagesRequest(
region_id=clients.region,
image_name=name,
@ -318,10 +163,43 @@ def delete_images(clients, name):
)
rsp = clients.ecs.delete_image(req)
def import_image(clients, image, bucket, key, public, overwrite):
def wait_for_task(clients, task_id):
    """Block until an ECS task has finished

    Polls DescribeTasks every five seconds while the task status is
    'Waiting' or 'Processing'.  Returns None once the status is
    'Finished'; raises RuntimeError carrying the status for any other
    final state.
    """
    in_progress = ('Waiting', 'Processing')
    while True:
        time.sleep(5)
        req = ecs.models.DescribeTasksRequest(
            region_id=clients.region,
            task_ids=task_id,
        )
        tasks = clients.ecs.describe_tasks(req).body.task_set.task
        # Sanity check: exactly the requested task must be reported
        assert len(tasks) == 1
        task = tasks[0]
        assert task.task_id == task_id
        status = task.task_status
        if status in in_progress:
            continue
        if status != 'Finished':
            raise RuntimeError(status)
        return
def wait_for_image(clients, image_id):
    """Wait for image to become available

    Polls DescribeImages every five seconds until the image is visible
    and has left its in-progress states ('Creating' or 'Waiting').
    Returns None once the image is 'Available'; raises RuntimeError
    carrying the final status otherwise.
    """
    # DescribeImages reports only 'Available' images unless a status
    # filter is supplied.  Ask explicitly for the in-progress and
    # failure states as well: without this a failed image is never
    # returned and the loop below would never terminate.
    statuses = 'Creating,Waiting,Available,UnAvailable,CreateFailed'
    pending = ('Creating', 'Waiting')
    while True:
        time.sleep(5)
        req = ecs.models.DescribeImagesRequest(
            region_id=clients.region,
            image_id=image_id,
            status=statuses,
        )
        rsp = clients.ecs.describe_images(req)
        found = rsp.body.images.image or []
        if not found:
            # Image is not (yet) visible in this region: keep polling
            continue
        # Sanity check: exactly the requested image must be reported
        assert len(found) == 1
        assert found[0].image_id == image_id
        status = found[0].status
        if status not in pending:
            break
    if status != 'Available':
        raise RuntimeError(status)
def import_image(clients, image, bucket, key):
"""Import image"""
if overwrite:
delete_images(clients, image.name)
logger.info("%s: importing %s" % (clients.region, image.name))
disk = ecs.models.ImportImageRequestDiskDeviceMapping(
disk_image_size = 1,
@ -339,34 +217,46 @@ def import_image(clients, image, bucket, key, public, overwrite):
rsp = clients.ecs.import_image(req)
image_id = rsp.body.image_id
task_id = rsp.body.task_id
while True:
time.sleep(5)
req = ecs.models.DescribeTasksRequest(
region_id=clients.region,
task_ids=task_id,
)
rsp = clients.ecs.describe_tasks(req)
status = rsp.body.task_set.task[0].task_status
if status not in ('Waiting', 'Processing'):
break
if status != 'Finished':
raise RuntimeError(status)
wait_for_task(clients, task_id)
wait_for_image(clients, image_id)
logger.info("%s: imported %s (%s)" %
(clients.region, image.name, image_id))
return image_id
def copy_image(clients, image, image_id, censored):
    """Copy an imported image into a censored region

    ``clients`` identifies the source region holding ``image_id``;
    ``censored`` is the client set of the destination region.  The
    copy keeps the name ``image.name``.  Waits (via wait_for_image on
    the destination clients) for the copy, then returns the image ID
    of the copy.
    """
    src_region = clients.region
    dst_region = censored.region
    logger.info("%s: copying %s (%s) to %s" %
                (src_region, image.name, image_id, dst_region))
    req = ecs.models.CopyImageRequest(
        region_id=src_region,
        image_id=image_id,
        destination_region_id=dst_region,
        destination_image_name=image.name,
    )
    copy_id = clients.ecs.copy_image(req).body.image_id
    # The copy lives in the destination region, so poll it there
    wait_for_image(censored, copy_id)
    logger.info("%s: copied %s (%s) to %s" %
                (src_region, image.name, copy_id, dst_region))
    return copy_id
def finalise_image(clients, image, image_id):
    """Finalise image attributes and permissions

    Sets the image family of ``image_id`` to ``image.family`` and, if
    ``image.public`` is set, marks the image as public.
    """
    logger.info("%s: finalising %s (%s)" %
                (clients.region, image.name, image_id))
    req = ecs.models.ModifyImageAttributeRequest(
        region_id=clients.region,
        image_id=image_id,
        image_family=image.family,
    )
    clients.ecs.modify_image_attribute(req)
    # The public flag is carried per image; this function has no
    # standalone "public" name in scope
    if image.public:
        req = ecs.models.ModifyImageSharePermissionRequest(
            region_id=clients.region,
            image_id=image_id,
            is_public=True,
        )
        clients.ecs.modify_image_share_permission(req)
logger.info("%s: imported %s (%s)" %
(clients.region, image.name, image_id))
return image_id
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Import Alibaba Cloud image")
@ -376,18 +266,11 @@ parser.add_argument('--name', '-n',
parser.add_argument('--family', '-f', default='ipxe',
help="Base family name")
parser.add_argument('--public', '-p', action='store_true',
help="Make image public")
help="Make image(s) public")
parser.add_argument('--overwrite', action='store_true',
help="Overwrite any existing image with same name")
parser.add_argument('--region', '-r', action='append',
help="AliCloud region(s)")
parser.add_argument('--role', '-R', default="iPXECensorshipBypassRole",
help="AliCloud OSS censorship bypass role")
parser.add_argument('--helper-family',
default="acs:alibaba_cloud_linux_4_lts_x64",
help="Helper OS image family")
parser.add_argument('--helper-machine', default="ecs.e-c1m1.large",
help="Helper machine type")
parser.add_argument('image', nargs='+', help="iPXE disk image")
args = parser.parse_args()
@ -401,101 +284,93 @@ if not args.name:
args.name = '%s-%s' % (args.family,
datetime.date.today().strftime('%Y%m%d'))
# Construct image list
images = [image(x, args.family, args.name) for x in args.image]
# Use all regions if none specified
if not args.region:
args.region = all_regions()
regions = args.region or all_regions()
# Construct image list
images = [image(x, args.family, args.name, args.public) for x in args.image]
imports = [(region, image) for region in regions for image in images]
# Construct per-region clients
clients = {region: all_clients(region) for region in args.region}
clients = {region: all_clients(region) for region in regions}
# Create temporary instances in each region
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
futures = {executor.submit(create_temp_instance,
clients=clients[region],
family=args.helper_family,
machine=args.helper_machine,
role=args.role): region
for region in args.region}
instances = {futures[x]: x.result() for x in as_completed(futures)}
# Delete existing images from all regions, if applicable
if args.overwrite:
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
futures = {executor.submit(delete_image,
clients=clients[region],
name=image.name): (region, image)
for region, image in imports}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Create temporary buckets in each region (requires instance to exist)
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
# Create temporary buckets in all uncensored regions
with ThreadPoolExecutor(max_workers=len(regions)) as executor:
futures = {executor.submit(create_temp_bucket,
clients=clients[region],
instance=instances[region]): region
for region in args.region
if instances[region] is not None}
clients=clients[region]): region
for region in regions}
buckets = {futures[x]: x.result() for x in as_completed(futures)}
if not any(buckets.values()):
parser.error("At least one non-Chinese region is required")
# Select an uncensored region with functioning object storage
uncensored = next((k for k, v in buckets.items()
if v is not None and not k.startswith('cn-')), None)
if uncensored is None:
parser.error("At least one available uncensored region is required")
# Upload images directly to chosen uncensored region
with ThreadPoolExecutor(max_workers=len(images)) as executor:
# Upload images directly to uncensored regions
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
futures = {executor.submit(upload_image,
clients=clients[uncensored],
bucket=buckets[uncensored],
image=image): image
for image in images}
clients=clients[region],
bucket=buckets[region],
image=image): (region, image)
for region, image in imports if buckets[region]}
keys = {futures[x]: x.result() for x in as_completed(futures)}
# Copy images to all other regions
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
source = {'region': uncensored, 'bucket': buckets[uncensored]}
futures = {executor.submit(copy_images,
clients=clients[region],
instance=instances[region],
bucket=buckets[region],
source=source): region
for region in args.region
if instances[region] is not None and buckets[region] is not None
and region != uncensored}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Import all images
imports = [(region, image) for region in args.region for image in images]
# Import images to uncensored regions
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
futures = {executor.submit(import_image,
clients=clients[region],
image=image,
bucket=buckets[region],
key=keys[image],
public=args.public,
overwrite=args.overwrite): (region, image)
for region, image in imports
if instances[region] is not None and buckets[region] is not None}
key=keys[(region, image)]): (region, image)
for region, image in imports if buckets[region]}
results = {futures[x]: x.result() for x in as_completed(futures)}
# Remove temporary buckets
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
futures = {executor.submit(delete_temp_bucket,
# Select source uncensored region for each copy
#
# Copies are rate-limited by source region, so spread the copies
# across all available uncensored regions.
#
copies = [(region, censored, image) for region, (censored, image) in zip(
cycle(region for region in regions if buckets[region]),
((region, image) for region, image in imports if not buckets[region]),
)]
# Copy images to censored regions
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
futures = {executor.submit(copy_image,
clients=clients[region],
instance=instances[region],
bucket=buckets[region]): region
for region in args.region
if instances[region] is not None and buckets[region] is not None}
censored=clients[censored],
image=image,
image_id=results[(region, image)]):
(censored, image)
for region, censored, image in copies}
results.update({futures[x]: x.result() for x in as_completed(futures)})
# Finalise images
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
futures = {executor.submit(finalise_image,
clients=clients[region],
image=image,
image_id=results[(region, image)]):
(region, image)
for region, image in imports}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Remove temporary instances
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
futures = {executor.submit(delete_temp_instance,
# Remove temporary buckets
with ThreadPoolExecutor(max_workers=len(regions)) as executor:
futures = {executor.submit(delete_temp_bucket,
clients=clients[region],
instance=instances[region],
retry=True): region
for region in args.region
if instances[region] is not None}
bucket=buckets[region]): region
for region in regions if buckets[region]}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Show created images
for region, image in imports:
mark = "(*)" if region == uncensored else ""
result = ("[no ECS]" if instances[region] is None else
"[no OSS]" if buckets[region] is None else
results[(region, image)])
print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result))
image_id = results[(region, image)]
print("%s %s (%s) %s" % (region, image.name, image.family, image_id))