diff --git a/contrib/ci/README.rst b/contrib/ci/README.rst
--- a/contrib/ci/README.rst
+++ b/contrib/ci/README.rst
@@ -139,8 +139,36 @@
 This function will update job records in DynamoDB to record that a job
 has started/finished/aborted.
 
+This component contains a *start job* Lambda function, which can be
+invoked with the Job ID of a job that someone wishes to start. It
+tries to ensure the job runs. If a previous spot instance request for
+the job expired, it tries again in a different availability zone.
+
 The Terraform code for this component lives in ``job_executor.tf``.
 
+Spot Instance Request Monitor
+-----------------------------
+
+The *spot instance request monitor* is a component for monitoring the
+state of spot instance requests.
+
+We utilize EC2 spot instances to run jobs. Spot instances, unlike
+on-demand instances, can't be launched directly. Instead, you create
+a request for spot instances and this request is eventually fulfilled
+by EC2, if possible. Often, the request is fulfilled immediately and
+an EC2 instance launches within a few seconds.
+
+But sometimes a spot instance request fails. This is often because no
+spot instances are available at that time. This is where this component
+plays a part.
+
+The *spot instance request monitor* is a Lambda function that is
+periodically invoked via a CloudWatch Event. It scans all jobs currently
+waiting on a spot instance request. If a job's spot instance request
+expired before it could be fulfilled, it invokes the *start job* Lambda
+function to reschedule that job. Theoretically, the job should
+eventually start.
+
 Worker
 ------
 
diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py
--- a/contrib/ci/lambda_functions/ci.py
+++ b/contrib/ci/lambda_functions/ci.py
@@ -68,6 +68,32 @@
     start_pending_job(ec2, job_table, job_id)
 
 
+def handle_start_job(event, context):
+    """Handler for ci-start-job function. Event is {'job_id': <job id>}."""
+    ec2 = boto3.client('ec2')
+    dynamodb = boto3.resource('dynamodb')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+
+    job_id = event['job_id']
+    print('received request to start job %s' % job_id)
+
+    start_pending_job(ec2, job_table, job_id)
+
+
+def handle_spot_instance_request_monitor(event, context):
+    """Handler for the periodically scheduled spot instance request monitor."""
+    ec2 = boto3.client('ec2')
+    dynamodb = boto3.resource('dynamodb')
+    lambda_client = boto3.client('lambda')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+    start_job_function = os.environ['LAMBDA_START_JOB_FUNCTION']
+
+    monitor_spot_instance_requests(ec2, job_table, lambda_client,
+                                   start_job_function)
+
+
 def handle_job_result_s3_artifact(event, context):
     """Handler called when a new S3 object job artifact is uploaded."""
     dynamodb = boto3.resource('dynamodb')
@@ -455,14 +481,77 @@
     job_id = job['job_id']
     print('requesting spot instance for job %s' % job_id)
 
+    availability_zones = [
+        az['ZoneName']
+        for az in ec2.describe_availability_zones()['AvailabilityZones']
+        if az['State'] == 'available']
+
     # Fresh job request.
     if job['execution_state'] == 'pending':
         # Pick an availability zone randomly.
-        availability_zones = [
-            az['ZoneName']
-            for az in ec2.describe_availability_zones()['AvailabilityZones']
-            if az['State'] == 'available']
         availability_zone = random.choice(availability_zones)
+
+    # Looks like we previously tried to launch a spot instance for this
+    # job. Examine the state of that request and make sure we can
+    # replace it.
+    elif job['execution_state'] == 'spot-instance-requested':
+        spot_instance_requests = ec2.describe_spot_instance_requests(
+            SpotInstanceRequestIds=[job['spot_instance_request_id']],
+        )['SpotInstanceRequests']
+
+        # We don't expect EC2 to forget a request ID we recorded. If it does,
+        # log and leave the job alone. NOTE(review): EC2 may raise a
+        # ClientError for unknown IDs instead of returning nothing; confirm.
+        if not spot_instance_requests:
+            print('unable to find spot instance request for job %s' % job_id)
+            return
+
+        sir = spot_instance_requests[0]
+
+        # If the spot request expired unfulfilled, try again in the next
+        # availability zone.
+        if (sir['State'] == 'cancelled'
+            and sir['Status']['Code'] == 'schedule-expired'):
+            print('previous spot request for job %s expired; '
+                  'trying different az' % job_id)
+
+            previous_az = sir['LaunchSpecification']['Placement']['AvailabilityZone']
+            # Rotate to the zone after the previous one. If the previous zone
+            # is no longer available, start again from the first zone.
+            try:
+                next_index = availability_zones.index(previous_az) + 1
+            except ValueError:
+                next_index = 0
+            availability_zone = availability_zones[next_index % len(availability_zones)]
+        elif sir['State'] in ('open', 'active'):
+            # Request still pending or already fulfilled; don't cancel the job.
+            print('spot request for job %s is still %s; leaving it alone' % (
+                job_id, sir['State']))
+            return
+        else:
+            print('unhandled spot instance request state for job %s: '
+                  '%s; %s: %s' % (
+                      job_id,
+                      sir['State'],
+                      sir['Status']['Code'],
+                      sir['Status']['Message']))
+            print('cancelling job %s' % job_id)
+            job_table.update_item(
+                Key={'job_id': job_id},
+                UpdateExpression=(
+                    'set execution_state = :state, '
+                    'cancelled_reason = :reason'
+                ),
+                ExpressionAttributeValues={
+                    ':state': 'cancelled',
+                    ':reason': 'spot state: %s; %s' % (
+                        sir['State'],
+                        sir['Status']['Code'],
+                    ),
+                },
+            )
+            return
+
     else:
         print('unhandled execution_state: %s' % job['execution_state'])
         return
@@ -485,10 +574,13 @@
     # we can just retry the job.
     #
     # The max bid price is the on-demand price. So in the typical case we save
-    # $$$. If we're unlucky we pay the on-demand rate. You can't lose.
+    # $$$. If we're unlucky we pay the on-demand rate. You can't lose -- unless
+    # no spot instances are available. We handle that by setting a short
+    # request validity window and letting the spot instance request monitor
+    # retry in a different availability zone, so it should eventually succeed.
     res = ec2.request_spot_instances(
         BlockDurationMinutes=60,
-        ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=10),
+        ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=1),
         LaunchSpecification=launch_spec,
     )
 
@@ -511,6 +603,71 @@
     )
 
 
+def monitor_spot_instance_requests(ec2, job_table, lambda_client,
+                                   start_job_function):
+    """Finds jobs whose spot instance request expired; reschedules them.
+
+    Jobs in the ``spot-instance-requested`` execution state are read
+    from the ``execution_state`` index of ``job_table``. For each one
+    whose spot instance request is ``cancelled`` with status code
+    ``schedule-expired``, the ``start_job_function`` Lambda function
+    is invoked asynchronously with ``{"job_id": ...}``.
+    """
+    print('monitoring for stale spot instance requests')
+
+    # We could call the EC2 API directly. But we want DynamoDB to be our
+    # source of truth. So key off its state. A single Query call returns at
+    # most 1 MB of data, so follow LastEvaluatedKey to read every page.
+    query_args = {
+        'IndexName': 'execution_state',
+        'KeyConditionExpression': Key('execution_state').eq('spot-instance-requested'),
+        'Select': 'ALL_PROJECTED_ATTRIBUTES',
+    }
+    items = []
+    while True:
+        res = job_table.query(**query_args)
+        items.extend(res['Items'])
+        if 'LastEvaluatedKey' not in res:
+            break
+        query_args['ExclusiveStartKey'] = res['LastEvaluatedKey']
+    print('found %d jobs in spot-instance-requested state' % len(items))
+
+    for item in items:
+        print('job %s is in spot-instance-requested' % item['job_id'])
+
+        # If the spot instance request is expired, retrigger scheduling.
+        request_id = item['spot_instance_request_id']
+        print('checking state of %s' % request_id)
+        spot_instance_requests = ec2.describe_spot_instance_requests(
+            SpotInstanceRequestIds=[request_id],
+        )['SpotInstanceRequests']
+
+        if not spot_instance_requests:
+            print('could not find %s; weird' % request_id)
+            continue
+
+        sir = spot_instance_requests[0]
+
+        print('spot instance request %s is in state %s: %s' % (
+            request_id, sir['State'], sir['Status']['Code']))
+
+        if (sir['State'] == 'cancelled'
+            and sir['Status']['Code'] == 'schedule-expired'):
+            print('spot instance request %s for job %s has expired; '
+                  'retrying scheduling' % (request_id, item['job_id']))
+
+            payload = json.dumps({
+                'job_id': item['job_id'],
+            })
+
+            # 'Event' invokes asynchronously; we don't wait for the result.
+            lambda_client.invoke(
+                FunctionName=start_job_function,
+                InvocationType='Event',
+                Payload=payload,
+            )
+
+
 def react_to_instance_state_change(job_table, instance, state):
     """React to a CI worker instance state change."""
     now = decimal.Decimal(time.time())
diff --git a/contrib/ci/lambda_functions/web.py b/contrib/ci/lambda_functions/web.py
--- a/contrib/ci/lambda_functions/web.py
+++ b/contrib/ci/lambda_functions/web.py
@@ -164,7 +164,8 @@
     else:
         skip_count = 'n/a'
 
-    if job_info['execution_state'] in ('pending', 'spot-instance-requested', 'running'):
+    if job_info['execution_state'] in (
+            'pending', 'spot-instance-requested', 'cancelled', 'running'):
         job_state = job_info['execution_state']
     elif job_info['execution_state'] == 'done':
         exit_clean = job_info.get('exit_clean')
diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf
--- a/contrib/ci/terraform/job_executor.tf
+++ b/contrib/ci/terraform/job_executor.tf
@@ -204,3 +204,60 @@
   principal     = "events.amazonaws.com"
   source_arn    = aws_cloudwatch_event_rule.trigger_instance_state_change.arn
 }
+
+resource "aws_cloudwatch_log_group" "lambda_ci_start_job" {
+  name              = "/aws/lambda/${aws_lambda_function.ci_start_job.function_name}"
+  retention_in_days = 7
+}
+
+resource "aws_iam_role" "lambda_ci_start_job" {
+  name               = "lambda-ci-start-job"
+  description        = "For Lambda function to trigger job start"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Lambda function for starting a job. This is similar to `run_pending_job`
+# except it isn't a handler for SQS events.
+resource "aws_lambda_function" "ci_start_job" {
+  function_name    = "ci-start-job"
+  description      = "Starts a scheduled CI job"
+  filename         = data.archive_file.lambda_ci.output_path
+  handler          = "ci.handle_start_job"
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime          = "python3.7"
+  timeout          = 60
+  role             = aws_iam_role.lambda_ci_start_job.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+    }
+  }
+}
+
+# Reuse run_pending_job's policy (must grant ec2:DescribeSpotInstanceRequests -- confirm).
+resource "aws_iam_role_policy" "lambda_ci_start_job_run_pending_job" {
+  role   = aws_iam_role.lambda_ci_start_job.name
+  name   = "run-pending-job"
+  policy = data.aws_iam_policy_document.lambda_ci_run_pending_job.json
+}
+
+# Permissions only this function needs, on top of the run_pending_job policy.
+data "aws_iam_policy_document" "lambda_ci_start_job" {
+  # Allow the Lambda function to write to its CloudWatch Logs log group.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [
+      aws_cloudwatch_log_group.lambda_ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_start_job" {
+  role   = aws_iam_role.lambda_ci_start_job.name
+  name   = aws_iam_role.lambda_ci_start_job.name
+  policy = data.aws_iam_policy_document.lambda_ci_start_job.json
+}
diff --git a/contrib/ci/terraform/spot_instance_request_monitor.tf b/contrib/ci/terraform/spot_instance_request_monitor.tf
new file mode 100644
--- /dev/null
+++ b/contrib/ci/terraform/spot_instance_request_monitor.tf
@@ -0,0 +1,94 @@
+# Defines resources to monitor spot instance requests.
+
+resource "aws_cloudwatch_log_group" "lambda_ci_spot_instance_request_monitor" {
+  name              = "/aws/lambda/${aws_lambda_function.ci_spot_instance_request_monitor.function_name}"
+  retention_in_days = 7
+}
+
+resource "aws_iam_role" "lambda_ci_spot_instance_request_monitor" {
+  name               = "lambda-ci-spot-instance-request-monitor"
+  description        = "For Lambda function monitoring spot instance requests"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+# Function that monitors spot instance requests and retries expired ones.
+resource "aws_lambda_function" "ci_spot_instance_request_monitor" {
+  function_name    = "ci-spot-instance-request-monitor"
+  description      = "Monitors spot instance requests and triggers activity"
+  filename         = data.archive_file.lambda_ci.output_path
+  handler          = "ci.handle_spot_instance_request_monitor"
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime          = "python3.7"
+  timeout          = 60
+  role             = aws_iam_role.lambda_ci_spot_instance_request_monitor.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE        = aws_dynamodb_table.ci_job.name
+      LAMBDA_START_JOB_FUNCTION = aws_lambda_function.ci_start_job.function_name
+    }
+  }
+}
+
+data "aws_iam_policy_document" "lambda_ci_spot_instance_request_monitor" {
+  # Allow the Lambda function to write to its CloudWatch Logs log group.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [aws_cloudwatch_log_group.lambda_ci_spot_instance_request_monitor.arn]
+  }
+  # Allow describing spot instance requests (EC2 Describe* is granted on "*").
+  statement {
+    effect = "Allow"
+    actions = [
+      "ec2:DescribeSpotInstanceRequests",
+    ]
+    resources = ["*"]
+  }
+  # Allow querying the job table and its indexes (the "/*" ARN covers indexes).
+  statement {
+    effect = "Allow"
+    actions = [
+      "dynamodb:Query",
+    ]
+    resources = [
+      aws_dynamodb_table.ci_job.arn,
+      "${aws_dynamodb_table.ci_job.arn}/*",
+    ]
+  }
+  # Allow invoking the start job Lambda function to reschedule expired jobs.
+  statement {
+    effect    = "Allow"
+    actions   = ["lambda:InvokeFunction"]
+    resources = [aws_lambda_function.ci_start_job.arn]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_spot_instance_request_monitor" {
+  role   = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  name   = aws_iam_role.lambda_ci_spot_instance_request_monitor.name
+  policy = data.aws_iam_policy_document.lambda_ci_spot_instance_request_monitor.json
+}
+
+# Invoke the monitor Lambda once a minute via a CloudWatch Events schedule.
+resource "aws_cloudwatch_event_rule" "trigger_ci_spot_instance_request_monitor" {
+  name                = "trigger-ci-spot-instance-request-monitor"
+  description         = "Trigger monitoring of spot instance requests"
+  schedule_expression = "rate(1 minute)"
+}
+
+resource "aws_cloudwatch_event_target" "ci_spot_instance_request_monitor" {
+  rule = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.name
+  arn  = aws_lambda_function.ci_spot_instance_request_monitor.arn
+}
+
+resource "aws_lambda_permission" "ci_spot_instance_request_monitor_allow_cloudwatch" {
+  statement_id  = "AllowExecutionFromCloudWatch"
+  action        = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.ci_spot_instance_request_monitor.function_name
+  principal     = "events.amazonaws.com"
+  source_arn    = aws_cloudwatch_event_rule.trigger_ci_spot_instance_request_monitor.arn
+}
diff --git a/contrib/ci/terraform/storage.tf b/contrib/ci/terraform/storage.tf
--- a/contrib/ci/terraform/storage.tf
+++ b/contrib/ci/terraform/storage.tf
@@ -55,8 +55,21 @@
     name = "job_id"
     type = "S"
   }
+  attribute {
+    name = "execution_state"
+    type = "S"
+  }
 
   hash_key = "job_id"
+
+  # GSI keyed on execution_state so jobs in a given state can be queried.
+  global_secondary_index {
+    name            = "execution_state"
+    hash_key        = "execution_state"
+    range_key       = "job_id"
+    projection_type = "ALL"
+  }
+
 }
 
 # Tracks results for individual tests in each job.