diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py --- a/contrib/ci/lambda_functions/ci.py +++ b/contrib/ci/lambda_functions/ci.py @@ -54,16 +54,17 @@ def handle_pending_job(event, context): """Handler for starting a job from an SQS message.""" ec2 = boto3.client('ec2') + dynamodb = boto3.resource('dynamodb') + + job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE']) for record in event['Records']: body = record['body'] data = json.loads(body) - user_data_template = data['user_data_template'] - user_data_params = data['user_data_params'] - ec2_instance_config = data['ec2_instance_launch_config'] + job_id = data['job_id'] - start_pending_job(ec2, user_data_template, user_data_params, ec2_instance_config) + start_pending_job(ec2, job_table, job_id) def handle_job_result_s3_artifact(event, context): @@ -409,12 +410,6 @@ 'SecurityGroups': ['hg-linux-dev-1'], } - job_params = { - 'user_data_template': LINUX_USER_DATA, - 'user_data_params': user_data_params, - 'ec2_instance_launch_config': config, - } - schedule_time = decimal.Decimal(time.time()) print('registering job in DynamoDB') @@ -426,17 +421,35 @@ 'build_number': build_number, 'execution_state': 'pending', 'schedule_time': schedule_time, + # We encode as JSON because DynamoDB doesn't like some types + # like empty strings. + 'start_params': json.dumps({ + 'user_data_template': LINUX_USER_DATA, + 'user_data_params': user_data_params, + 'ec2_instance_launch_config': config, + }, sort_keys=True), }) print('adding job to pending queue') sqs.send_message( QueueUrl=sqs_url, - MessageBody=json.dumps(job_params, sort_keys=True) + MessageBody=json.dumps({'job_id': job_id}) ) -def start_pending_job(ec2, user_data_template, user_data_params, ec2_instance_config): +def start_pending_job(ec2, job_table, job_id): """Called to request the start of a pending job.""" + res = job_table.get_item(Key={'job_id': job_id}, ConsistentRead=True) + if 'Item' not in res: + print('unable to find job %s' % job_id) + return + + job = res['Item'] + start_params = json.loads(job['start_params']) + user_data_template = start_params['user_data_template'] + user_data_params = start_params['user_data_params'] + ec2_instance_config = start_params['ec2_instance_launch_config'] + user_data = user_data_template.format(**user_data_params) print('requesting spot instance for job %s' % user_data_params['job_id']) diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf --- a/contrib/ci/terraform/job_executor.tf +++ b/contrib/ci/terraform/job_executor.tf @@ -16,6 +16,11 @@ runtime = "python3.7" timeout = 60 role = aws_iam_role.lambda_ci_run_pending_job.arn + environment { + variables = { + DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name + } + } } resource "aws_cloudwatch_log_group" "lambda_ci_run_pending_job" { @@ -63,6 +68,16 @@ ] resources = ["*"] } + # Allow querying job state in DynamoDB. + statement { + effect = "Allow" + actions = [ + "dynamodb:GetItem", + ] + resources = [ + aws_dynamodb_table.ci_job.arn, + ] + } } resource "aws_iam_role_policy" "lambda_ci_handle_pending_job" {