diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py --- a/contrib/ci/lambda_functions/ci.py +++ b/contrib/ci/lambda_functions/ci.py @@ -11,6 +11,7 @@ import decimal import json import os +import random import time import urllib.request import uuid @@ -428,6 +429,7 @@ 'user_data_params': user_data_params, 'ec2_instance_launch_config': config, }, sort_keys=True), + 'spot_instance_request_count': 0, }) print('adding job to pending queue') @@ -445,6 +447,26 @@ return job = res['Item'] + request_spot_instance_for_job(ec2, job_table, job) + + +def request_spot_instance_for_job(ec2, job_table, job): + """Request a spot instance to start a job.""" + job_id = job['job_id'] + print('requesting spot instance for job %s' % job_id) + + # Fresh job request. + if job['execution_state'] == 'pending': + # Pick an availability zone randomly. + availability_zones = [ + az['ZoneName'] + for az in ec2.describe_availability_zones()['AvailabilityZones'] + if az['State'] == 'available'] + availability_zone = random.choice(availability_zones) + else: + print('unhandled execution_state: %s' % job['execution_state']) + return + start_params = json.loads(job['start_params']) user_data_template = start_params['user_data_template'] user_data_params = start_params['user_data_params'] @@ -452,9 +474,10 @@ user_data = user_data_template.format(**user_data_params) - print('requesting spot instance for job %s' % user_data_params['job_id']) - launch_spec = dict(ec2_instance_config) + launch_spec['Placement'] = { + 'AvailabilityZone': availability_zone, + } launch_spec['UserData'] = base64.b64encode(user_data.encode('utf-8')).decode('utf-8') # Spot instances are substantially cheaper but can be terminated at will @@ -463,12 +486,30 @@ # # The max bid price is the on-demand price. So in the typical case we save # $$$. If we're unlucky we pay the on-demand rate. You can't lose. - ec2.request_spot_instances( + res = ec2.request_spot_instances( BlockDurationMinutes=60, ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=10), LaunchSpecification=launch_spec, ) + spot_instance_request_id = res['SpotInstanceRequests'][0]['SpotInstanceRequestId'] + print('spot instance request id: %s' % spot_instance_request_id) + + print('recording spot instance state for job %s' % job_id) + job_table.update_item( + Key={'job_id': job_id}, + UpdateExpression=( + 'set execution_state = :state, ' + 'spot_instance_request_id = :sir, ' + 'spot_instance_request_count = spot_instance_request_count + :incr' + ), + ExpressionAttributeValues={ + ':state': 'spot-instance-requested', + ':sir': spot_instance_request_id, + ':incr': 1, + } + ) + def react_to_instance_state_change(job_table, instance, state): """React to a CI worker instance state change.""" diff --git a/contrib/ci/lambda_functions/web.py b/contrib/ci/lambda_functions/web.py --- a/contrib/ci/lambda_functions/web.py +++ b/contrib/ci/lambda_functions/web.py @@ -164,7 +164,7 @@ else: skip_count = 'n/a' - if job_info['execution_state'] in ('pending', 'running'): + if job_info['execution_state'] in ('pending', 'spot-instance-requested', 'running'): job_state = job_info['execution_state'] elif job_info['execution_state'] == 'done': exit_clean = job_info.get('exit_clean') diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf --- a/contrib/ci/terraform/job_executor.tf +++ b/contrib/ci/terraform/job_executor.tf @@ -68,11 +68,12 @@ ] resources = ["*"] } - # Allow querying job state in DynamoDB. + # Allow querying and updating job state in DynamoDB. statement { effect = "Allow" actions = [ "dynamodb:GetItem", + "dynamodb:UpdateItem", ] resources = [ aws_dynamodb_table.ci_job.arn,