mirror of
https://github.com/ipxe/ipxe.git
synced 2026-04-22 22:21:23 +02:00
Experimentation suggests Alibaba Cloud API calls are extremely unreliable, with a failure rate around 1%. It is therefore necessary to allow for retrying basically every API call. Some API calls (e.g. DescribeImages or ModifyImageAttribute) are naturally idempotent and so safe to retry. Some non-idempotent API calls (e.g. CopyImage) support explicit idempotence tokens. The remaining API calls may simply fail on a retry, if the original request happened to succeed but failed to return a response. We could write convoluted retry logic around the non-idempotent calls, but this would substantially increase the complexity of the already unnecessarily complex code. For now, we assume that retrying non-idempotent requests is probably more likely to fix transient failures than to cause additional problems. Signed-off-by: Michael Brown <mcb30@ipxe.org>
664 lines
26 KiB
Python
Executable File
664 lines
26 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#
|
|
# Doing anything in Alibaba Cloud is unnecessarily difficult and
|
|
# tedious due to a combination of poor and inconsistent API design,
|
|
# high API call failure rates, and Chinese state censorship laws.
|
|
#
|
|
# We resort to a mixture of strategies to get images imported to all
|
|
# regions:
|
|
#
|
|
# - For regions with working OSS that are not blocked by Chinese
|
|
# state censorship laws, upload the image files to an OSS bucket
|
|
# and then import the images.
|
|
#
|
|
# - For regions with working OSS that are blocked by Chinese state
|
|
# censorship laws but that have working FC, use a temporary FC
|
|
# function to copy the image files from the uncensored OSS buckets
|
|
# and then import the images. Attempt downloads from a variety of
|
|
# uncensored buckets, since cross-region OSS traffic tends to
|
|
# experience a failure rate of around 10% of requests.
|
|
#
|
|
# - For regions that have working OSS but are blocked by Chinese
|
|
# state censorship laws and do not have working FC, or for regions
|
|
# that don't even have working OSS, resort to using CopyImage to
|
|
# copy the previously imported images from another region. Spread
|
|
# the imports across as many source regions as possible to
|
|
# minimise the effect of the CopyImage rate limiting.
|
|
|
|
import argparse
|
|
import base64
|
|
from collections import namedtuple
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import datetime
|
|
import http
|
|
import io
|
|
from itertools import cycle, groupby
|
|
import json
|
|
import logging
|
|
from operator import itemgetter
|
|
from pathlib import Path
|
|
import random
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
from uuid import uuid4
|
|
import zipfile
|
|
|
|
import alibabacloud_credentials as credentials
|
|
import alibabacloud_credentials.client
|
|
import alibabacloud_credentials.models
|
|
import alibabacloud_ecs20140526 as ecs
|
|
import alibabacloud_ecs20140526.client
|
|
import alibabacloud_ecs20140526.models
|
|
import alibabacloud_fc20230330 as fc
|
|
import alibabacloud_fc20230330.client
|
|
import alibabacloud_fc20230330.models
|
|
import alibabacloud_oss_v2 as oss
|
|
import alibabacloud_ram20150501 as ram
|
|
import alibabacloud_ram20150501.client
|
|
import alibabacloud_ram20150501.models
|
|
import alibabacloud_sts20150401 as sts
|
|
import alibabacloud_sts20150401.client
|
|
import alibabacloud_sts20150401.models
|
|
import alibabacloud_tea_openapi as openapi
|
|
import alibabacloud_tea_openapi.client
|
|
import alibabacloud_tea_openapi.models
|
|
import alibabacloud_tea_util as util
|
|
import alibabacloud_tea_util.client
|
|
import alibabacloud_tea_util.models
|
|
|
|
logger = logging.getLogger('ali-import')
|
|
|
|
ECS_ENDPOINT = 'ecs.aliyuncs.com'
|
|
RAM_ENDPOINT = 'ram.aliyuncs.com'
|
|
STS_ENDPOINT = 'sts.aliyuncs.com'
|
|
|
|
FC_NODE_RUNTIME = 'nodejs20'
|
|
FC_TIMEOUT_SEC = 120
|
|
FC_MEMORY_SIZE_MB = 128
|
|
FC_SOURCE_COUNT = 10
|
|
|
|
OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket'
|
|
OSS_BUCKET_NAME_LEN = 63
|
|
|
|
IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-'
|
|
|
|
POLL_INTERVAL_SEC = 5
|
|
POLL_MAX_RETRIES = 100
|
|
|
|
# Experimentation suggests Alibaba Cloud API calls are extremely
|
|
# unreliable, with a failure rate around 1%. It is therefore
|
|
# necessary to allow for retrying basically every API call.
|
|
#
|
|
# Some API calls (e.g. DescribeImages or ModifyImageAttribute) are
|
|
# naturally idempotent and so safe to retry. Some non-idempotent API
|
|
# calls (e.g. CopyImage) support explicit idempotence tokens. The
|
|
# remaining API calls may simply fail on a retry, if the original
|
|
# request happened to succeed but failed to return a response.
|
|
#
|
|
# We could write convoluted retry logic around the non-idempotent
|
|
# calls, but this would substantially increase the complexity of the
|
|
# already unnecessarily complex code. For now, we assume that
|
|
# retrying non-idempotent requests is probably more likely to fix
|
|
# transient failures than to cause additional problems.
|
|
#
|
|
RUNTIME_OPTS = util.models.RuntimeOptions(
|
|
autoretry=True,
|
|
max_attempts=5,
|
|
connect_timeout=10000,
|
|
read_timeout=120000,
|
|
)
|
|
|
|
# For regions in mainland China, the Chinese state censorship laws
|
|
# prohibit direct access to OSS bucket contents.
|
|
#
|
|
# We work around this restriction by creating a temporary Function
|
|
# Compute function in each region to access OSS via the internal OSS
|
|
# endpoints, which are not subject to these restrictions. Yes, this
|
|
# is somewhat absurd.
|
|
#
|
|
IPXE_CENSORSHIP_BYPASS_FUNCTION = f'''
|
|
const prefix = "{IPXE_STORAGE_PREFIX}";
|
|
''' + '''
|
|
const assert = require("node:assert");
|
|
const OSS = require("ali-oss");
|
|
exports.handler = async (event, context) => {
|
|
const payload = JSON.parse(event.toString());
|
|
console.log(JSON.stringify(payload));
|
|
const sources = payload.sources || {};
|
|
const dest = new OSS({
|
|
region: "oss-" + context.region,
|
|
internal: true,
|
|
bucket: payload.bucket,
|
|
accessKeyId: context.credentials.accessKeyId,
|
|
accessKeySecret: context.credentials.accessKeySecret,
|
|
stsToken: context.credentials.securityToken,
|
|
});
|
|
const current = ((await dest.listV2({prefix: prefix})).objects || [])
|
|
.map(x => x.name);
|
|
const wanted = Object.keys(sources);
|
|
const add = wanted.filter(x => ! current.includes(x));
|
|
const del = current.filter(x => ! wanted.includes(x));
|
|
assert(add.every(x => x.startsWith(prefix)));
|
|
assert(del.every(x => x.startsWith(prefix)));
|
|
if (add.length)
|
|
console.log("Creating: " + add.sort().join(", "));
|
|
if (del.length)
|
|
console.log("Deleting: " + del.sort().join(", "));
|
|
const copy = async (key) => {
|
|
for (const url of sources[key]) {
|
|
console.log("Downloading " + key + " from " + url);
|
|
try {
|
|
const download = await fetch(url, {signal: AbortSignal.timeout(15000)});
|
|
if (! download.ok)
|
|
throw new Error(download.status);
|
|
const content = await download.arrayBuffer();
|
|
console.log("Downloaded " + key);
|
|
console.log("Uploading " + key);
|
|
await dest.put(key, Buffer.from(content));
|
|
console.log("Uploaded " + key);
|
|
return;
|
|
} catch (err) {
|
|
console.error("Download failed", err);
|
|
}
|
|
}
|
|
throw new Error("All downloads failed for " + key);
|
|
};
|
|
await Promise.all([
|
|
...add.map(copy),
|
|
...(del.length ? [dest.deleteMulti(del)] : []),
|
|
]);
|
|
console.log("Finished");
|
|
};
|
|
'''
|
|
|
|
Clients = namedtuple('Clients', ['region', 'censored', 'ecs', 'fc', 'oss'])
|
|
Image = namedtuple('Image',
|
|
['path', 'family', 'name', 'arch', 'mode', 'key', 'public'])
|
|
|
|
def image(filename, basefamily, basename, public):
|
|
"""Construct image description"""
|
|
with tempfile.NamedTemporaryFile(mode='w+t') as mtoolsrc:
|
|
mtoolsrc.writelines([
|
|
'drive D:', f'file="{filename}"',
|
|
'drive P:', f'file="{filename}"', 'partition=4',
|
|
])
|
|
mtoolsrc.flush()
|
|
mdir = subprocess.run(['mdir', '-b', 'D:/EFI/BOOT', 'P:/EFI/BOOT'],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
check=False, env={'MTOOLSRC': mtoolsrc.name})
|
|
mapping = {
|
|
b'BOOTX64.EFI': 'x86_64',
|
|
b'BOOTAA64.EFI': 'arm64',
|
|
}
|
|
uefi = [v for k, v in mapping.items() if k in mdir.stdout]
|
|
suffix = ('-uefi-%s' % uefi[0].replace('_', '-') if len(uefi) == 1 else
|
|
'-uefi-multi' if uefi else '')
|
|
path = Path(filename)
|
|
family = '%s%s' % (basefamily, suffix)
|
|
name = '%s%s' % (basename, suffix)
|
|
arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64'
|
|
mode = 'UEFI' if uefi else 'BIOS'
|
|
key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
|
|
return Image(path, family, name, arch, mode, key, public)
|
|
|
|
def all_regions():
|
|
"""Get list of all regions"""
|
|
cred = credentials.client.Client()
|
|
conf = openapi.models.Config(credential=cred, endpoint=ECS_ENDPOINT)
|
|
client = ecs.client.Client(conf)
|
|
req = ecs.models.DescribeRegionsRequest()
|
|
rsp = client.describe_regions(req)
|
|
regions = sorted(x.region_id for x in rsp.body.regions.region)
|
|
return regions
|
|
|
|
def account_id():
|
|
"""Get account ID"""
|
|
cred = credentials.client.Client()
|
|
conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT)
|
|
client = sts.client.Client(conf)
|
|
rsp = client.get_caller_identity()
|
|
return rsp.body.account_id
|
|
|
|
def role_arn(name):
|
|
"""Get role resource name"""
|
|
cred = credentials.client.Client()
|
|
conf = openapi.models.Config(credential=cred, endpoint=RAM_ENDPOINT)
|
|
client = ram.client.Client(conf)
|
|
req = ram.models.GetRoleRequest(role_name=name)
|
|
rsp = client.get_role(req)
|
|
return rsp.body.role.arn
|
|
|
|
def all_clients(region, account):
|
|
"""Construct all per-region clients"""
|
|
cred = credentials.client.Client()
|
|
ecsconf = openapi.models.Config(credential=cred, region_id=region)
|
|
fcep = '%s.%s.fc.aliyuncs.com' % (account, region)
|
|
fcconf = openapi.models.Config(credential=cred, endpoint=fcep)
|
|
osscred = oss.credentials.EnvironmentVariableCredentialsProvider()
|
|
ossconf = oss.config.Config(credentials_provider=osscred, region=region)
|
|
clients = Clients(
|
|
region=region,
|
|
censored=region.startswith('cn-'),
|
|
ecs=ecs.client.Client(ecsconf),
|
|
fc=fc.client.Client(fcconf),
|
|
oss=oss.client.Client(ossconf),
|
|
)
|
|
return clients
|
|
|
|
def delete_temp_function(clients, func):
|
|
"""Remove temporary function"""
|
|
logger.info("delete function %s %s" % (clients.region, func))
|
|
assert func.startswith(IPXE_STORAGE_PREFIX)
|
|
clients.fc.delete_function_with_options(func, {}, RUNTIME_OPTS)
|
|
|
|
def create_temp_function(clients, role):
|
|
"""Create temporary function (and remove any stale temporary functions)"""
|
|
req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX)
|
|
try:
|
|
rsp = clients.fc.list_functions_with_options(req, {}, RUNTIME_OPTS)
|
|
except openapi.client.UnretryableException:
|
|
# AliCloud provides no other way to detect non-working regions
|
|
return None
|
|
funcs = [x.function_name for x in rsp.body.functions or ()]
|
|
for func in funcs:
|
|
delete_temp_function(clients, func)
|
|
if not clients.censored:
|
|
# Functions are not required in uncensored regions
|
|
return None
|
|
buf = io.BytesIO()
|
|
with zipfile.ZipFile(buf, 'w') as zfh:
|
|
zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION)
|
|
zf = base64.b64encode(buf.getvalue()).decode()
|
|
code = fc.models.InputCodeLocation(zip_file=zf)
|
|
func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
|
|
body = fc.models.CreateFunctionInput(
|
|
code=code,
|
|
function_name=func,
|
|
handler='index.handler',
|
|
memory_size=FC_MEMORY_SIZE_MB,
|
|
role=role,
|
|
runtime=FC_NODE_RUNTIME,
|
|
timeout=FC_TIMEOUT_SEC,
|
|
)
|
|
req = fc.models.CreateFunctionRequest(body=body)
|
|
rsp = clients.fc.create_function_with_options(req, {}, RUNTIME_OPTS)
|
|
logger.info("create function %s %s" % (clients.region, func))
|
|
return func
|
|
|
|
def call_temp_function(clients, func, payload):
|
|
"""Call temporary function"""
|
|
hdr = fc.models.InvokeFunctionHeaders(
|
|
x_fc_invocation_type='Sync',
|
|
x_fc_log_type='Tail',
|
|
)
|
|
body = json.dumps(payload)
|
|
req = fc.models.InvokeFunctionRequest(body=body)
|
|
rsp = clients.fc.invoke_function_with_options(func, req, hdr, RUNTIME_OPTS)
|
|
log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode()
|
|
if rsp.status_code != http.HTTPStatus.OK:
|
|
raise RuntimeError(rsp)
|
|
if 'x-fc-error-type' in rsp.headers:
|
|
raise RuntimeError(log)
|
|
|
|
def delete_temp_bucket(clients, func, bucket):
|
|
"""Remove temporary bucket"""
|
|
logger.info("delete bucket %s %s" % (clients.region, bucket))
|
|
assert bucket.startswith(IPXE_STORAGE_PREFIX)
|
|
# Delete bucket contents
|
|
if not clients.censored:
|
|
# Uncensored region: use OSS API calls to delete contents
|
|
req = oss.models.ListObjectsV2Request(
|
|
bucket=bucket,
|
|
prefix=IPXE_STORAGE_PREFIX,
|
|
)
|
|
rsp = clients.oss.list_objects_v2(req)
|
|
delete = [x.key for x in rsp.contents or ()]
|
|
if delete:
|
|
req = oss.models.DeleteMultipleObjectsRequest(
|
|
bucket=bucket,
|
|
objects=[oss.models.DeleteObject(x) for x in delete],
|
|
)
|
|
rsp = clients.oss.delete_multiple_objects(req)
|
|
elif func:
|
|
# Censored region with FC: use function to delete contents
|
|
payload = {'bucket': bucket}
|
|
call_temp_function(clients, func, payload)
|
|
else:
|
|
# Censored region without FC: assume bucket must be empty,
|
|
# since we could not have uploaded to it in the first place
|
|
pass
|
|
# Delete the now-empty bucket
|
|
req = oss.models.DeleteBucketRequest(bucket=bucket)
|
|
rsp = clients.oss.delete_bucket(req)
|
|
|
|
def create_temp_bucket(clients, func):
|
|
"""Create temporary bucket (and remove any stale temporary buckets)"""
|
|
prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region)
|
|
req = oss.models.ListBucketsRequest(prefix=prefix)
|
|
rsp = clients.oss.list_buckets(req)
|
|
buckets = [x.name for x in rsp.buckets or ()]
|
|
for bucket in buckets:
|
|
delete_temp_bucket(clients, func, bucket)
|
|
if clients.censored and not func:
|
|
# We cannot use OSS in censored regions with no Function Compute
|
|
return None
|
|
bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN]
|
|
req = oss.models.PutBucketRequest(bucket=bucket)
|
|
try:
|
|
rsp = clients.oss.put_bucket(req)
|
|
except oss.exceptions.OperationError as exc:
|
|
# AliCloud provides no other way to detect non-working regions
|
|
if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE:
|
|
return None
|
|
raise exc
|
|
logger.info("create bucket %s %s" % (clients.region, bucket))
|
|
return bucket
|
|
|
|
def upload_object(clients, bucket, image):
|
|
"""Upload disk image object to uncensored bucket"""
|
|
logger.info("upload %s %s" % (clients.region, image.name))
|
|
req = oss.models.PutObjectRequest(bucket=bucket, key=image.key)
|
|
rsp = clients.oss.put_object_from_file(req, image.path)
|
|
req = oss.models.GetObjectRequest(bucket=bucket, key=image.key)
|
|
rsp = clients.oss.presign(req)
|
|
return rsp.url
|
|
|
|
def copy_objects(clients, bucket, func, uploads):
|
|
"""Copy disk image objects to censored bucket from uncensored bucket"""
|
|
logger.info("upload %s (censored)" % clients.region)
|
|
payload = {
|
|
'bucket': bucket,
|
|
'sources': {
|
|
key: random.choices([url for key, url in urls], k=FC_SOURCE_COUNT)
|
|
for key, urls in groupby(sorted(
|
|
((image.key, url) for (region, image), url in uploads.items())
|
|
), key=itemgetter(0))
|
|
}
|
|
}
|
|
call_temp_function(clients, func, payload)
|
|
|
|
def delete_image(clients, name):
|
|
"""Remove existing image (if applicable)"""
|
|
req = ecs.models.DescribeImagesRequest(
|
|
region_id=clients.region,
|
|
image_name=name,
|
|
image_owner_alias='self',
|
|
)
|
|
rsp = clients.ecs.describe_images_with_options(req, RUNTIME_OPTS)
|
|
for image in rsp.body.images.image or ():
|
|
logger.info("delete image %s %s (%s)" %
|
|
(clients.region, image.image_name, image.image_id))
|
|
if image.is_public:
|
|
req = ecs.models.ModifyImageSharePermissionRequest(
|
|
region_id=clients.region,
|
|
image_id=image.image_id,
|
|
is_public=False,
|
|
)
|
|
rsp = clients.ecs.modify_image_share_permission_with_options(
|
|
req, RUNTIME_OPTS
|
|
)
|
|
req = ecs.models.DeleteImageRequest(
|
|
region_id=clients.region,
|
|
image_id=image.image_id
|
|
)
|
|
rsp = clients.ecs.delete_image_with_options(req, RUNTIME_OPTS)
|
|
|
|
def wait_for_task(clients, task_id):
|
|
"""Wait for task to complete"""
|
|
status = 'Unknowable'
|
|
for i in range(POLL_MAX_RETRIES):
|
|
time.sleep(POLL_INTERVAL_SEC)
|
|
req = ecs.models.DescribeTasksRequest(
|
|
region_id=clients.region,
|
|
task_ids=task_id,
|
|
)
|
|
try:
|
|
rsp = clients.ecs.describe_tasks_with_options(req, RUNTIME_OPTS)
|
|
except openapi.client.UnretryableException:
|
|
continue
|
|
assert len(rsp.body.task_set.task) == 1
|
|
assert rsp.body.task_set.task[0].task_id == task_id
|
|
status = rsp.body.task_set.task[0].task_status
|
|
if status not in ('Waiting', 'Processing'):
|
|
break
|
|
if status != 'Finished':
|
|
raise RuntimeError(status)
|
|
|
|
def wait_for_image(clients, image_id):
|
|
"""Wait for image to become available"""
|
|
status = 'Unknowable'
|
|
for i in range(POLL_MAX_RETRIES):
|
|
time.sleep(POLL_INTERVAL_SEC)
|
|
req = ecs.models.DescribeImagesRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
)
|
|
try:
|
|
rsp = clients.ecs.describe_images_with_options(req, RUNTIME_OPTS)
|
|
except openapi.client.UnretryableException:
|
|
continue
|
|
if len(rsp.body.images.image):
|
|
assert len(rsp.body.images.image) == 1
|
|
assert rsp.body.images.image[0].image_id == image_id
|
|
status = rsp.body.images.image[0].status
|
|
if status != 'Creating':
|
|
break
|
|
if status != 'Available':
|
|
raise RuntimeError(status)
|
|
|
|
def import_image(clients, image, bucket):
|
|
"""Import image"""
|
|
logger.info("import %s %s" % (clients.region, image.name))
|
|
disk = ecs.models.ImportImageRequestDiskDeviceMapping(
|
|
disk_image_size = 1,
|
|
format = 'RAW',
|
|
ossbucket = bucket,
|
|
ossobject = image.key,
|
|
)
|
|
req = ecs.models.ImportImageRequest(
|
|
region_id=clients.region,
|
|
image_name=image.name,
|
|
architecture=image.arch,
|
|
boot_mode=image.mode,
|
|
disk_device_mapping=[disk],
|
|
client_token=str(uuid4()),
|
|
)
|
|
rsp = clients.ecs.import_image_with_options(req, RUNTIME_OPTS)
|
|
image_id = rsp.body.image_id
|
|
task_id = rsp.body.task_id
|
|
wait_for_task(clients, task_id)
|
|
wait_for_image(clients, image_id)
|
|
logger.info("image %s %s (%s)" %
|
|
(clients.region, image.name, image_id))
|
|
return image_id
|
|
|
|
def copy_image(clients, image, image_id, censored):
|
|
"""Copy imported image to censored region"""
|
|
logger.info("import %s %s via %s" %
|
|
(censored.region, image.name, clients.region))
|
|
req = ecs.models.CopyImageRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
destination_region_id=censored.region,
|
|
destination_image_name=image.name,
|
|
client_token=str(uuid4()),
|
|
)
|
|
rsp = clients.ecs.copy_image_with_options(req, RUNTIME_OPTS)
|
|
copy_id = rsp.body.image_id
|
|
wait_for_image(censored, copy_id)
|
|
logger.info("image %s %s (%s)" % (censored.region, image.name, copy_id))
|
|
return copy_id
|
|
|
|
def finalise_image(clients, image, image_id):
|
|
"""Finalise image attributes and permissions"""
|
|
logger.info("finalise %s %s (%s)" % (clients.region, image.name, image_id))
|
|
req = ecs.models.ModifyImageAttributeRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
image_family=image.family,
|
|
)
|
|
rsp = clients.ecs.modify_image_attribute_with_options(req, RUNTIME_OPTS)
|
|
if image.public:
|
|
req = ecs.models.ModifyImageSharePermissionRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
is_public=True,
|
|
)
|
|
rsp = clients.ecs.modify_image_share_permission_with_options(
|
|
req, RUNTIME_OPTS
|
|
)
|
|
|
|
# Parse command-line arguments
|
|
parser = argparse.ArgumentParser(description="Import Alibaba Cloud image")
|
|
parser.add_argument('--verbose', '-v', action='count', default=0)
|
|
parser.add_argument('--name', '-n',
|
|
help="Base image name")
|
|
parser.add_argument('--family', '-f', default='ipxe',
|
|
help="Base family name")
|
|
parser.add_argument('--public', '-p', action='store_true',
|
|
help="Make image(s) public")
|
|
parser.add_argument('--overwrite', action='store_true',
|
|
help="Overwrite any existing image with same name")
|
|
parser.add_argument('--region', '-r', action='append',
|
|
help="AliCloud region(s)")
|
|
parser.add_argument('--role', '-R', default="AliyunFcDefaultRole",
|
|
help="AliCloud role for censorship bypass function")
|
|
parser.add_argument('image', nargs='+', help="iPXE disk image")
|
|
args = parser.parse_args()
|
|
|
|
# Configure logging
|
|
loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
|
|
verbosity = min(args.verbose, (len(loglevels) - 1))
|
|
logging.basicConfig(level=loglevels[verbosity])
|
|
logging.getLogger('apscheduler').setLevel(logging.WARNING)
|
|
|
|
# Use default name if none specified
|
|
if not args.name:
|
|
args.name = '%s-%s' % (args.family,
|
|
datetime.date.today().strftime('%Y%m%d'))
|
|
|
|
# Use all regions if none specified
|
|
regions = args.region or all_regions()
|
|
|
|
# Construct image list
|
|
images = [image(x, args.family, args.name, args.public) for x in args.image]
|
|
imports = [(region, image) for region in regions for image in images]
|
|
workers = len(imports)
|
|
|
|
# Look up resource names
|
|
fcrole = role_arn(args.role)
|
|
|
|
# Construct per-region clients
|
|
account = account_id()
|
|
clients = {region: all_clients(region, account) for region in regions}
|
|
|
|
# Delete existing images from all regions, if applicable
|
|
if args.overwrite:
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(delete_image,
|
|
clients=clients[region],
|
|
name=image.name): (region, image)
|
|
for region, image in imports}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Create temporary function in each censored region with usable FC
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(create_temp_function,
|
|
clients=clients[region],
|
|
role=fcrole): region
|
|
for region in regions}
|
|
funcs = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Create temporary bucket in each region with usable OSS
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(create_temp_bucket,
|
|
clients=clients[region],
|
|
func=funcs[region]): region
|
|
for region in regions}
|
|
buckets = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Upload image objects directly to each uncensored region with usable OSS
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(upload_object,
|
|
clients=clients[region],
|
|
bucket=buckets[region],
|
|
image=image): (region, image)
|
|
for region, image in imports
|
|
if buckets[region] and not funcs[region]}
|
|
uploads = {futures[x]: x.result() for x in as_completed(futures)}
|
|
if not uploads:
|
|
parser.error("At least one working non-Chinese region is required")
|
|
|
|
# Copy image objects to each censored region with usable OSS and usable FC
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(copy_objects,
|
|
clients=clients[region],
|
|
bucket=buckets[region],
|
|
func=funcs[region],
|
|
uploads=uploads): region
|
|
for region in regions
|
|
if buckets[region] and funcs[region]}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Import images in each region with usable OSS
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(import_image,
|
|
clients=clients[region],
|
|
image=image,
|
|
bucket=buckets[region]): (region, image)
|
|
for region, image in imports if buckets[region]}
|
|
results = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Copy images to regions without usable OSS
|
|
#
|
|
# Copies are rate-limited by source region, so spread the copies
|
|
# across all available regions with imported images.
|
|
#
|
|
copies = [(region, censored, image) for region, (censored, image) in zip(
|
|
cycle(region for region in regions if buckets[region]),
|
|
((region, image) for region, image in imports if not buckets[region]),
|
|
)]
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(copy_image,
|
|
clients=clients[region],
|
|
censored=clients[censored],
|
|
image=image,
|
|
image_id=results[(region, image)]):
|
|
(censored, image)
|
|
for region, censored, image in copies}
|
|
copied = {futures[x]: x.result() for x in as_completed(futures)}
|
|
results.update(copied)
|
|
|
|
# Finalise images
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(finalise_image,
|
|
clients=clients[region],
|
|
image=image,
|
|
image_id=results[(region, image)]):
|
|
(region, image)
|
|
for region, image in imports}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Remove temporary buckets
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(delete_temp_bucket,
|
|
clients=clients[region],
|
|
func=funcs[region],
|
|
bucket=buckets[region]): region
|
|
for region in regions if buckets[region]}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Remove temporary functions
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(delete_temp_function,
|
|
clients=clients[region],
|
|
func=funcs[region]): region
|
|
for region in regions if funcs[region]}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Show created images
|
|
for region, image in imports:
|
|
image_id = results[(region, image)]
|
|
print("%s %s (%s) %s" % (region, image.name, image.family, image_id))
|