#!/usr/bin/env python3
"""Import iPXE disk images into AWS EC2 as AMIs.

Each disk image given on the command line is uploaded as an EBS snapshot
and registered as an AMI in every selected region, with one worker thread
per (region, image) pair.  A Dokuwiki table of the resulting AMI IDs can
optionally be written to a file.
"""

import argparse
from base64 import b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date
from hashlib import sha256
from itertools import count
import subprocess

import boto3

# Size in bytes of each block passed to put_snapshot_block()
BLOCKSIZE = 512 * 1024


def detect_architecture(image):
    """Detect CPU architecture

    Lists the ``/EFI/BOOT`` directory inside the disk image using the
    external ``mdir`` tool and returns ``'arm64'`` if any output line
    contains ``BOOTAA64.EFI``.  Returns ``'x86_64'`` otherwise; the exit
    status of ``mdir`` is not checked, so a failed listing also yields
    ``'x86_64'``.
    """
    mdir = subprocess.run(['mdir', '-b', '-i', image, '::/EFI/BOOT'],
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if any(b'BOOTAA64.EFI' in x for x in mdir.stdout.splitlines()):
        return 'arm64'
    return 'x86_64'


def create_snapshot(region, description, image):
    """Create an EBS snapshot

    Starts a snapshot with ``VolumeSize=1`` in ``region``, uploads the
    file ``image`` in ``BLOCKSIZE`` chunks (the final chunk is padded with
    zero bytes to a full block), completes the snapshot and returns its
    snapshot ID.
    """
    client = boto3.client('ebs', region_name=region)
    snapshot = client.start_snapshot(VolumeSize=1,
                                     Description=description)
    snapshot_id = snapshot['SnapshotId']
    with open(image, 'rb') as fh:
        for block in count():
            data = fh.read(BLOCKSIZE)
            if not data:
                break
            # Every uploaded block must be exactly BLOCKSIZE bytes long
            data = data.ljust(BLOCKSIZE, b'\0')
            checksum = b64encode(sha256(data).digest()).decode()
            client.put_snapshot_block(SnapshotId=snapshot_id,
                                      BlockIndex=block,
                                      BlockData=data,
                                      DataLength=BLOCKSIZE,
                                      Checksum=checksum,
                                      ChecksumAlgorithm='SHA256')
    # On leaving the loop, "block" is the index of the read that hit
    # end-of-file, which equals the number of blocks uploaded.
    client.complete_snapshot(SnapshotId=snapshot_id,
                             ChangedBlocksCount=block)
    return snapshot_id


def import_image(region, name, architecture, image, public, overwrite):
    """Import an AMI image

    Uploads the disk image file ``image`` as an EBS snapshot in ``region``
    and registers it as an HVM AMI named ``'<name> (<architecture>)'``.

    If ``overwrite`` is true and this account already owns an image with
    that name, the existing image is deregistered and the snapshot behind
    its first block device mapping is deleted before the new import.  If
    ``public`` is true, launch permission is granted to all users.

    Waits for the snapshot to complete and for the image to become
    available, then returns the new AMI ID.
    """
    client = boto3.client('ec2', region_name=region)
    resource = boto3.resource('ec2', region_name=region)
    description = '%s (%s)' % (name, architecture)
    if overwrite:
        # Restrict the search to images owned by this account: without an
        # owner filter the name could match a public image belonging to
        # somebody else, which we must not (and cannot) deregister.
        existing = client.describe_images(
            Owners=['self'],
            Filters=[{'Name': 'name', 'Values': [description]}],
        )['Images']
        if existing:
            old_image = existing[0]
            old_image_id = old_image['ImageId']
            old_snapshot_id = (
                old_image['BlockDeviceMappings'][0]['Ebs']['SnapshotId']
            )
            # The image must be deregistered before its snapshot is deleted
            resource.Image(old_image_id).deregister()
            resource.Snapshot(old_snapshot_id).delete()
    snapshot_id = create_snapshot(region=region, description=description,
                                  image=image)
    client.get_waiter('snapshot_completed').wait(SnapshotIds=[snapshot_id])
    registered = client.register_image(Architecture=architecture,
                                       BlockDeviceMappings=[{
                                           'DeviceName': '/dev/sda1',
                                           'Ebs': {
                                               'SnapshotId': snapshot_id,
                                               'VolumeType': 'standard',
                                           },
                                       }],
                                       EnaSupport=True,
                                       Name=description,
                                       RootDeviceName='/dev/sda1',
                                       SriovNetSupport='simple',
                                       VirtualizationType='hvm')
    image_id = registered['ImageId']
    client.get_waiter('image_available').wait(ImageIds=[image_id])
    if public:
        resource.Image(image_id).modify_attribute(Attribute='launchPermission',
                                                  OperationType='add',
                                                  UserGroups=['all'])
    return image_id


def launch_link(region, image_id):
    """Construct a web console launch link

    Returns the URL of the EC2 web console launch wizard for ``image_id``
    in ``region``.
    """
    return ("https://console.aws.amazon.com/ec2/v2/home?"
            "region=%s#LaunchInstanceWizard:ami=%s" % (region, image_id))


# Parse command-line arguments
parser = argparse.ArgumentParser(description="Import AWS EC2 image (AMI)")
parser.add_argument('--name', '-n',
                    help="Image name")
parser.add_argument('--public', '-p', action='store_true',
                    help="Make image public")
parser.add_argument('--overwrite', action='store_true',
                    help="Overwrite any existing image with same name")
parser.add_argument('--region', '-r', action='append',
                    help="AWS region(s)")
parser.add_argument('--wiki', '-w', metavar='FILE',
                    help="Generate Dokuwiki table")
parser.add_argument('image', nargs='+', help="iPXE disk image")
args = parser.parse_args()

# Detect CPU architectures
architectures = {image: detect_architecture(image) for image in args.image}

# Use default name if none specified
if not args.name:
    args.name = 'iPXE (%s)' % date.today().strftime('%Y-%m-%d')

# Use all regions if none specified
if not args.region:
    args.region = sorted(x['RegionName'] for x in
                         boto3.client('ec2').describe_regions()['Regions'])

# Use one thread per import to maximise parallelism
imports = [(region, image) for region in args.region for image in args.image]
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
    futures = {executor.submit(import_image,
                               region=region,
                               name=args.name,
                               architecture=architectures[image],
                               image=image,
                               public=args.public,
                               overwrite=args.overwrite): (region, image)
               for region, image in imports}
    # future.result() re-raises any exception from the worker thread
    results = {futures[future]: future.result()
               for future in as_completed(futures)}

# Construct Dokuwiki table (only when an output file was requested)
if args.wiki:
    wikitab = ["^ AWS region  ^ CPU architecture  ^ AMI ID  ^\n"] + [
        "| ''%s''  | ''%s''  | ''[[%s|%s]]''  |\n" % (
            region,
            architectures[image],
            launch_link(region, results[(region, image)]),
            results[(region, image)],
        )
        for region, image in imports
    ]
    with open(args.wiki, 'wt') as fh:
        fh.writelines(wikitab)

# Show created images
for region, image in imports:
    print("%s %s %s %s" % (
        region, image, architectures[image], results[(region, image)]
    ))