diff --git a/.hgignore b/.hgignore
--- a/.hgignore
+++ b/.hgignore
@@ -30,6 +30,7 @@
 tests/htmlcov
 build
 contrib/chg/chg
+contrib/ci/terraform/.terraform
 contrib/hgsh/hgsh
 contrib/vagrant/.vagrant
 dist
diff --git a/contrib/ci/README.rst b/contrib/ci/README.rst
new file mode 100644
--- /dev/null
+++ b/contrib/ci/README.rst
@@ -0,0 +1,241 @@
+============
+Mercurial CI
+============
+
+This directory defines a CI system for the Mercurial Project.
+
+Architecture
+============
+
+The core of the CI system is built on top of Amazon Web Services. It
+consists of a number of *serverless* components which monitor the
+Mercurial Project, react to changes, store and provide access to
+results, etc.
+
+The CI system consists of the following primary components:
+
+* Data storage in DynamoDB and S3.
+* Lightweight compute via Lambda functions.
+* Execution of heavyweight tasks (e.g. running tests) on ephemeral EC2
+  instances.
+* HTTP API and HTML endpoint via API Gateway / Lambda functions.
+
+In addition, the following AWS resources are utilized:
+
+* IAM for defining roles and access policies for various entities.
+* SQS for event queuing.
+* CloudWatch for logging.
+
+Components
+==========
+
+Storage
+-------
+
+The CI system utilizes DynamoDB and S3 for storage. DynamoDB is utilized
+as a fast source-of-truth system for most data. S3 is mostly utilized as
+a temporary storage location for larger artifacts, such as the raw
+output logs for individual jobs.
+
+The Terraform code for this component lives in ``storage.tf``.
+
+Repo Poller
+-----------
+
+The system monitors configured Mercurial repositories for new pushes. It
+does this by periodically invoking a Lambda function via a CloudWatch
+event trigger. This Lambda function will make HTTP requests to the
+monitored repository and then record any observed changes to DynamoDB.
+
+The Terraform code for this component lives in ``repo_poll.tf``.
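+
+The poller relies on hgweb's ``json-rev`` endpoint. A minimal sketch of
+the kind of request it performs (the repository URL and revision are
+illustrative; the fields read are exactly the ones the poller records)::
+
+  import json
+  import urllib.request
+
+  url = 'https://www.mercurial-scm.org/repo/hg/json-rev/@'
+  req = urllib.request.Request(url, headers={'User-Agent': 'example'})
+  with urllib.request.urlopen(req, timeout=10) as res:
+      cset = json.load(res)
+
+  # node, branch, user and desc end up in the recorded push.
+  print(cset['node'], cset['branch'], cset['user'], cset['desc'])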
+
+Repo Change Reactor
+-------------------
+
+The *repo change reactor* is a component responsible for reacting to
+repository change events (e.g. repository pushes).
+
+The primary role of this component is to determine what work needs to be
+done and to schedule it.
+
+This component consists of a Lambda function which is triggered by
+DynamoDB changes initiated by the *Repo Poller*. This function will
+inspect the state of the world and generate pending *jobs* for that
+change event. It will add new job records to DynamoDB and will register
+ready-to-run jobs on an SQS queue.
+
+The Terraform code for this component lives in ``repo_change_reactor.tf``.
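+
+The Lambda function receives standard DynamoDB stream events. A trimmed
+example of a record as consumed by the function (values are
+illustrative; DynamoDB wraps every attribute in a type descriptor such
+as ``S``)::
+
+  {
+      'eventName': 'INSERT',
+      'dynamodb': {
+          'Keys': {
+              'repo': {'S': 'hg'},
+              'push_id': {'S': '2019-07-28T00:00:00-@'},
+          },
+          'NewImage': {
+              'repo': {'S': 'hg'},
+              'repo_url': {'S': 'https://www.mercurial-scm.org/repo/hg'},
+              'node': {'S': '<40-hex changeset hash>'},
+          },
+      },
+  }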
+
+Job Executor
+------------
+
+The *job executor* is a component responsible for executing and managing
+pending jobs. It essentially takes the work queued by the *Repo Change
+Reactor* and sets it in action.
+
+The *job executor* mainly consists of some Lambda functions.
+
+The *run pending job* Lambda function is subscribed to the SQS queue
+for pending jobs. When this function is invoked, it attempts to launch
+a new instance to run the requested job, whose launch configuration is
+defined by data in the SQS message. If the launch is successful, the
+new instance boots up and starts work on its own. If not, the Lambda
+function raises an exception and the message is placed back in the
+SQS queue, where it will be automatically retried later. Eventually,
+a message *times out* and will be dropped.
+
+This component also contains an *instance state change* Lambda function.
+This function is invoked whenever an EC2 instance's state changes, e.g.
+when a new instance is created, stops, or terminates. The role of this
+function is to keep accounting for running jobs and instances
+up-to-date. This function will update job records in DynamoDB to record
+that a job has started/finished/aborted.
+
+The Terraform code for this component lives in ``job_executor.tf``.
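+
+The retry mechanism leans on standard SQS-to-Lambda semantics: if the
+handler raises, the message becomes visible again on the queue and is
+redelivered until the queue's retention period expires. A stripped-down
+sketch (``launch_instance`` is a hypothetical stand-in for the real EC2
+call)::
+
+  import json
+
+  def handle_pending_job(event, context):
+      for record in event['Records']:
+          job = json.loads(record['body'])
+
+          # Raising here (e.g. from a failed EC2 request) makes the
+          # message visible again on the queue for a later retry.
+          launch_instance(job['ec2_instance_launch_config'])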
+
+Worker
+------
+
+The *worker* component is the entity doing compute-heavy work. It is
+essentially a single EC2 instance.
+
+The Terraform code for this component lives in ``worker.tf``.
+
+Job Result Reactor
+------------------
+
+The *Job Result Reactor* component centers around a Lambda function
+which is called when an S3 object is created. The role of this component
+is to react to artifacts uploaded by individual jobs.
+
+The Lambda function receives information about the uploaded S3 object.
+From this, the job ID can be derived and job state as well as test
+result state can be updated in DynamoDB.
+
+The Terraform code for this component lives in ``job_result_reactor.tf``.
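+
+The S3 key encodes which job produced an artifact. A sketch of the
+derivation the reactor performs (the key layout mirrors what the worker
+uploads under the ``jobs/`` prefix)::
+
+  def job_id_from_key(key):
+      # e.g. jobs/hg/<node>/debian9-system-python3/0/output.log
+      parts = key.split('/')
+      assert parts[0] == 'jobs'
+      repo, node, job_name, build_number = parts[1:5]
+      return '%s-%s-%s-%s' % (repo, node, job_name, build_number)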
+
+Web
+---
+
+The *web* component provides an HTTP endpoint for the CI system.
+
+The main functionality for the *web* component is implemented by a
+Lambda function fronted by an API Gateway. The gateway is configured
+to use a hostname backed by a legitimate x509 certificate so the
+service can be publicly exposed and works well with browsers.
+
+The Terraform code for this component lives in ``web.tf``.
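+
+API Gateway invokes the function with a proxy-style integration, so the
+function describes the HTTP response as a plain dictionary. A minimal
+handler sketch::
+
+  def handler(event, context):
+      # event['path'] carries the request path, e.g. '/job-info/<job-id>'.
+      return {
+          'statusCode': 200,
+          'headers': {'Content-Type': 'text/html; charset=utf-8'},
+          'body': '<html><body>Mercurial CI</body></html>',
+      }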
+
+AWS Account Management
+======================
+
+We currently assume that the AWS account used to run Mercurial CI
+is operated within a larger organization.
+
+Terraform references resources in this parent account. Specifically,
+the resources in ``account_manager.tf`` allow a Lambda function in
+this parent account to assume a role in this account which gives it
+unlimited powers. What's happening here is the parent account
+periodically scans this account for misbehavior or other activities
+which could cause badness, such as large cost expenditures. The
+account manager in the parent account does things like terminate
+long-running EC2 instances.
+
+Implementation Notes
+====================
+
+Instance Lifecycle
+------------------
+
+Each job is executed on a brand new EC2 instance. Once that job is
+done, the instance shuts down and terminates.
+
+This is a good model from security and determinism perspectives because
+we don't have to worry about previous state on the instance since each
+instance is launched from a known initial state (an AMI). As long as the
+IAM instance policy is locked down, even malicious code can't do much
+in terms of privileged operations - just generic compute. Generic
+compute can still be abused of course. But there's no way for us to
+prevent that abuse: CI is RCE as a service.
+
+A downside to new instances for every job is that there is a non-trivial
+amount of overhead to obtain an instance and boot into its OS - often a
+few dozen seconds. Ideally we'd have a pool of idle workers waiting to
+run new jobs.
+
+But there's a significant reason we use ephemeral, on-demand instances:
+cost. If we had instances sitting around, they'd accrue a substantial
+bill. On EC2, you are billed by the second for instance use (at least
+for most Linux AMIs - Windows and Linux distros with a support contract
+are billed by the hour). As long as the overhead for instance startup
+and shutdown is minimal and the overall utilization of the CI system
+is low, we still come out drastically ahead by launching instances on
+demand.
+
+Another benefit of launching a new instance per job is that we can scale
+out to effectively unlimited job parallelism. Instead of managing the
+worker pool size, we can just spin up instances when they are needed and
+dispose of them after. And all jobs will complete sooner.
+
+Since our instances are very transient, we use EC2 spot instances. This
+allows us to cut substantial costs versus the on-demand rate at most
+times. And the EC2 spot instances we use are rarely terminated by
+Amazon, so our failure rate for utilizing them is low. This is well
+worth the trade-off.
+
+Known Issues and Limitations
+----------------------------
+
+The job runner currently knows nothing about building AMIs. We should
+teach the system to build AMIs using Mercurial's in-repo functionality
+for doing so. This will require teaching the in-repo code to not purge
+old AMIs, as this could create issues when multiple branches have CI
+triggered at the same time. It should be possible to add a tag to the
+AMI to denote an expiration time or some such so that we can purge old
+or unused AMIs automatically.
+
+There's no machine API for accessing CI results. We should definitely
+build a JSON API or something along those lines.
+
+The ``result.json`` file produced by Mercurial's test harness doesn't
+contain as much detail as the raw output. Notably missing is the skip
+reason and annotations for test failures when there is no diff
+(e.g. timeout). We should extend the format to contain this metadata.
+
+We have no mechanism to retrigger a job. This requires some form of
+authentication to prevent abuse.
+
+We have no mechanism to trigger CI on arbitrary diffs. We would like
+to provide some kind of *try server* where you can submit a diff and
+the system builds it. Again, this requires some form of authentication.
+
+We have no mechanism to choose which jobs to execute. We probably want
+to build this because there is no need to execute all jobs all the time.
+
+Development Workflow
+====================
+
+The Terraform code and production environment currently assume they are
+running in an AWS account attached to Gregory Szorc's main AWS account.
+The details of this account can be found in the ``init.tf`` file.
+
+The Terraform code requires Terraform 0.12 or newer. You will also need
+to invoke ``terraform`` such that it can locate AWS credentials with
+admin privileges to the AWS account being provisioned. Typically, you
+configure ``~/.aws/config`` and ``~/.aws/credentials`` to define a
+profile plus credentials and set e.g. ``AWS_PROFILE=hg`` to use the
+``hg`` AWS profile with Terraform.
+
+Assuming the AWS credentials are set up::
+
+  $ cd terraform
+  $ terraform init
+  $ terraform apply
+
+The ``terraform/`` directory contains all the AWS resources, including
+triggers between various resources (e.g. changes invoking Lambda
+functions).
+
+Most of the business logic lives in Lambda functions. These can be found
+in the ``lambda_functions/`` directory. A typical workflow is to make
+changes then run ``terraform apply``, ideally against a development AWS
+profile.
diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py
new file mode 100644
--- /dev/null
+++ b/contrib/ci/lambda_functions/ci.py
@@ -0,0 +1,569 @@
+# ci.py - Lambda functions for Mercurial CI
+#
+# Copyright 2019 Gregory Szorc
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+# no-check-code because Python 3 native.
+
+import base64
+import datetime
+import decimal
+import json
+import os
+import time
+import urllib.request
+
+import boto3
+
+
+def handle_poll_repo(event, context):
+    """Handler for polling a repository and recording pushes."""
+    dynamodb = boto3.resource('dynamodb')
+    poll_table = dynamodb.Table(os.environ['DYNAMODB_REPO_POLL_TABLE'])
+    push_table = dynamodb.Table(os.environ['DYNAMODB_PUSH_TABLE'])
+
+    repo_url = os.environ['REPO_URL'].rstrip('/')
+    poll_revs = os.environ['POLL_REVS'].split()
+
+    poll_repo(poll_table, push_table, repo_url, poll_revs)
+
+
+def handle_schedule_from_push(event, context):
+    """Handler for scheduling CI for a repo node via DynamoDB changes."""
+    for record in event['Records']:
+        keys = record['dynamodb']['Keys']
+        print('received %s event for %s %s' % (
+            record['eventName'], keys['repo']['S'], keys['push_id']['S']))
+
+        if record['eventName'] not in ('INSERT', 'MODIFY'):
+            continue
+
+        record = record['dynamodb']['NewImage']
+        schedule_ci(record['repo_url']['S'], record['repo']['S'],
+                    record['node']['S'])
+
+
+def handle_pending_job(event, context):
+    """Handler for starting a job from an SQS message."""
+    ec2 = boto3.client('ec2')
+
+    for record in event['Records']:
+        body = record['body']
+
+        data = json.loads(body)
+        user_data_template = data['user_data_template']
+        user_data_params = data['user_data_params']
+        ec2_instance_config = data['ec2_instance_launch_config']
+
+        start_pending_job(ec2, user_data_template, user_data_params,
+                          ec2_instance_config)
+
+
+def handle_job_result_s3_artifact(event, context):
+    """Handler called when a new S3 object job artifact is uploaded."""
+    dynamodb = boto3.resource('dynamodb')
+    s3 = boto3.resource('s3')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+    test_result_table = dynamodb.Table(os.environ['DYNAMODB_TEST_RESULT_TABLE'])
+
+    for record in event['Records']:
+        # We assume the key was uploaded to the proper location. This may
+        # not be safe. But resolving the principal initiating the change or
+        # its IP address to an EC2 instance might be too expensive.
+        key = s3.Object(record['s3']['bucket']['name'],
+                        record['s3']['object']['key'])
+
+        process_job_artifact(job_table, test_result_table, key)
+
+
+def handle_instance_state_change(event, context):
+    """Handler called when an EC2 instance changes state."""
+    instance_id = event['detail']['instance-id']
+    state = event['detail']['state']
+    print('received %s for %s' % (state, instance_id))
+
+    ec2 = boto3.resource('ec2')
+    dynamodb = boto3.resource('dynamodb')
+
+    instance = ec2.Instance(instance_id)
+
+    # We only care about events for ci-worker instances.
+    if not instance.iam_instance_profile:
+        print('no IAM instance profile defined; ignoring')
+        return
+
+    if not instance.iam_instance_profile['Arn'].endswith('/ci-worker'):
+        print('not a CI worker; ignoring')
+        return
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+
+    react_to_instance_state_change(job_table, instance, state)
+
+
+def next_build_number(job_table, repo, node, job_name):
+    """Find the next available build number for a job given its unique name."""
+
+    build_number = 0
+
+    res = job_table.scan(
+        ProjectionExpression='repo, node, job_name, build_number',
+        FilterExpression='repo = :repo AND node = :node AND job_name = :name',
+        ExpressionAttributeValues={
+            ':repo': repo,
+            ':node': node,
+            ':name': job_name,
+        },
+    )
+
+    for entry in res['Items']:
+        if entry['build_number'] >= build_number:
+            build_number = int(entry['build_number']) + 1
+
+    return build_number
+
+
+def poll_repo(poll_table, push_table, repo_url, poll_revs):
+    """Poll a repository for new changes and record them."""
+    repo_name = repo_url.split('/')[-1]
+
+    print('polling %s at %s' % (repo_name, repo_url))
+
+    new_state = {
+        'last_poll': datetime.datetime.utcnow().isoformat(),
+        'repo': repo_name,
+        'repo_url': repo_url,
+        'revs': {},
+    }
+
+    node_info = {}
+
+    for rev in poll_revs:
+        url = '%s/json-rev/%s' % (repo_url, rev)
+
+        req = urllib.request.Request(
+            url, headers={'User-Agent': "Greg's Repo Poller"})
+        res = urllib.request.urlopen(req, timeout=10)
+        cset = json.load(res)
+
+        print('%s resolves to %s' % (rev, cset['node']))
+        new_state['revs'][rev] = cset['node']
+
+        node_info[cset['node']] = {
+            'branch': cset['branch'],
+            'user': cset['user'],
+            'message': cset['desc'],
+        }
+
+    res = poll_table.get_item(Key={'repo': repo_name})
+    if 'Item' in res:
+        last_state = res['Item']
+    else:
+        last_state = {
+            'revs': {},
+        }
+
+    for rev, new_node in sorted(new_state['revs'].items()):
+        old_node = last_state['revs'].get(rev)
+
+        if new_node == old_node:
+            continue
+
+        info = node_info[new_node]
+        print('revision %s updated; old=%s; new=%s' % (rev, old_node,
+                                                       new_node))
+
+        # Insert the push record into DynamoDB.
+        print('recording push in DynamoDB')
+        push_table.put_item(Item={
+            # Partition key.
+            'repo': repo_name,
+            # Range key. Sort by date. Break ties by poll revision.
+            'push_id': '%s-%s' % (new_state['last_poll'], rev),
+            'repo_url': repo_url,
+            'repo_name': repo_name,
+            'poll_rev': rev,
+            'push_date': new_state['last_poll'],
+            'node': new_node,
+            'branch': info['branch'],
+            'user': info['user'],
+            'message': info['message'],
+        })
+
+    print('updating poll state')
+    poll_table.put_item(Item=new_state)
+
+
+def schedule_ci(repo_url, repo, node):
+    print('scheduling CI for revision %s on %s' % (node, repo_url))
+    dynamodb = boto3.resource('dynamodb')
+    ec2 = boto3.resource('ec2')
+    s3 = boto3.resource('s3')
+    sqs = boto3.client('sqs')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+    bucket = s3.Bucket(os.environ['S3_BUCKET'])
+    sqs_url = os.environ['SQS_URL']
+
+    # TODO we should build AMIs using in-repo code so all jobs are using an
+    # appropriate AMI for the revision.
+    for image in ec2.images.filter(Owners=['self']):
+        if image.name == 'hg-linux-dev-debian9':
+            schedule_linux_ci(job_table, sqs, sqs_url, bucket, repo_url,
+                              repo, node, image, 'debian9')
+        elif image.name == 'hg-linux-dev-debian10':
+            schedule_linux_ci(job_table, sqs, sqs_url, bucket, repo_url,
+                              repo, node, image, 'debian10')
+        elif image.name == 'hg-linux-dev-ubuntu18.04':
+            schedule_linux_ci(job_table, sqs, sqs_url, bucket, repo_url,
+                              repo, node, image, 'ubuntu18.04')
+        elif image.name == 'hg-linux-dev-ubuntu19.04':
+            schedule_linux_ci(job_table, sqs, sqs_url, bucket, repo_url,
+                              repo, node, image, 'ubuntu19.04')
+
+
+RUN_TESTS_LINUX = '''
+#!/bin/bash
+
+HG=/hgdev/venv-bootstrap/bin/hg
+
+cd /hgwork/src
+
+${HG} pull -r $2 $1
+${HG} log -r $2
+${HG} up $2
+
+export TMPDIR=/hgwork/tmp
+cd tests
+time $3 ./run-tests.py --json 2>&1 | tee output.log
+
+aws s3 cp --content-type text/plain --acl public-read output.log $4/output.log
+# The JSON file has a prefix to allow loading in web browsers.
+tail -c +13 report.json > report-truncated.json
+aws s3 cp --content-type application/json --acl public-read report-truncated.json $4/report.json
+'''.lstrip()
+
+
+LINUX_USER_DATA = '''
+#cloud-config
+
+# TAG Name ci-worker
+# TAG job_id {job_id}
+# TAG repo_url {repo_url}
+# TAG repo {repo}
+# TAG node {node}
+# TAG job_name {job_name}
+# TAG build_number {build_number}
+# TAG s3_bucket {s3_bucket}
+# TAG s3_prefix {s3_prefix}
+
+repo_update: false
+repo_upgrade: false
+
+write_files:
+  - path: /run-tests-linux
+    owner: hg:hg
+    permissions: '0755'
+    encoding: b64
+    content: {run_tests_linux_b64}
+
+runcmd:
+  - mkdir /hgwork
+  - mkdir /hgwork/tmp
+  - chown -R hg:hg /hgwork
+  - sudo -u hg -g hg rsync -a /hgdev/src /hgwork/
+  - sudo -u hg -g hg /run-tests-linux {repo_url} {node} {python} s3://{s3_bucket}/{s3_prefix} 2>&1 | tee /ci.log
+  - aws s3 cp --content-type text/plain --acl public-read /ci.log s3://{s3_bucket}/{s3_prefix}/ci.log
+  - echo done > done
+  - aws s3 cp --content-type text/plain --acl public-read done s3://{s3_bucket}/{s3_prefix}/done
+
+power_state:
+  delay: now
+  mode: poweroff
+
+'''.lstrip()
+
+
+def schedule_linux_ci(job_table, sqs, sqs_url, bucket, repo_url, repo, node,
+                      image, os_prefix):
+    block_device_mappings = [
+        {
+            'DeviceName': image.block_device_mappings[0]['DeviceName'],
+            'Ebs': {
+                'DeleteOnTermination': True,
+                'VolumeSize': 12,
+                'VolumeType': 'gp2',
+            },
+        }
+    ]
+
+    run_tests_linux_b64 = base64.b64encode(
+        RUN_TESTS_LINUX.encode('utf-8')).decode('ascii')
+
+    jobs = (
+        ('system-python2', '/usr/bin/python2'),
+        ('system-python3', '/usr/bin/python3'),
+        ('cpython-2.7', '/hgdev/pyenv/shims/python2.7'),
+        ('cpython-3.5', '/hgdev/pyenv/shims/python3.5'),
+        ('cpython-3.6', '/hgdev/pyenv/shims/python3.6'),
+        ('cpython-3.7', '/hgdev/pyenv/shims/python3.7'),
+        ('cpython-3.8', '/hgdev/pyenv/shims/python3.8'),
+    )
+
+    for job_name, python in jobs:
+        job_name = '%s-%s' % (os_prefix, job_name)
+        build_number = next_build_number(job_table, repo, node, job_name)
+        job_id = '%s-%s-%s-%d' % (repo, node, job_name, build_number)
+
+        bucket_prefix = 'jobs/%s/%s/%s/%d' % (repo, node, job_name,
+                                              build_number)
+
+        # Unfortunately we cannot set tags on spot instance requests.
+        # So we encode tags in user data and parse these at launch time to
+        # turn into proper tags.
+        user_data_params = dict(
+            job_id=job_id,
+            repo=repo,
+            repo_url=repo_url,
+            node=node,
+            job_name=job_name,
+            build_number=build_number,
+            python=python,
+            run_tests_linux_b64=run_tests_linux_b64,
+            s3_bucket=bucket.name,
+            s3_prefix=bucket_prefix,
+        )
+
+        config = {
+            'BlockDeviceMappings': block_device_mappings,
+            'EbsOptimized': True,
+            'IamInstanceProfile': {'Name': 'ci-worker'},
+            'ImageId': image.id,
+            'InstanceType': 'c5.9xlarge',
+            'SecurityGroups': ['hg-linux-dev-1'],
+        }
+
+        job_params = {
+            'user_data_template': LINUX_USER_DATA,
+            'user_data_params': user_data_params,
+            'ec2_instance_launch_config': config,
+        }
+
+        schedule_time = decimal.Decimal(time.time())
+
+        print('registering job in DynamoDB')
+        job_table.put_item(Item={
+            'job_id': job_id,
+            'repo': repo,
+            'node': node,
+            'job_name': job_name,
+            'build_number': build_number,
+            'execution_state': 'pending',
+            'schedule_time': schedule_time,
+        })
+
+        print('adding job to pending queue')
+        sqs.send_message(
+            QueueUrl=sqs_url,
+            MessageBody=json.dumps(job_params, sort_keys=True)
+        )
+
+
+def start_pending_job(ec2, user_data_template, user_data_params,
+                      ec2_instance_config):
+    """Called to request the start of a pending job."""
+    user_data = user_data_template.format(**user_data_params)
+
+    print('requesting spot instance for job %s' % user_data_params['job_id'])
+
+    launch_spec = dict(ec2_instance_config)
+    launch_spec['UserData'] = base64.b64encode(
+        user_data.encode('utf-8')).decode('utf-8')
+
+    # Spot instances are substantially cheaper but can be terminated at will
+    # by Amazon. That's fine. We're a CI system. If the instance is
+    # terminated, we can just retry the job.
+    #
+    # The max bid price is the on-demand price. So in the typical case we
+    # save $$$. If we're unlucky we pay the on-demand rate. You can't lose.
+    ec2.request_spot_instances(
+        BlockDurationMinutes=60,
+        ValidUntil=datetime.datetime.utcnow() + datetime.timedelta(minutes=10),
+        LaunchSpecification=launch_spec,
+    )
+
+
+def react_to_instance_state_change(job_table, instance, state):
+    """React to a CI worker instance state change."""
+    now = decimal.Decimal(time.time())
+
+    # CI workers advertise their job info via tags. However, the tags cannot
+    # be set for spot instances and are instead encoded in user data. So when
+    # a spot instance starts, detect that here and set the tags so they can
+    # be seen by future handlers.
+    tags = {t['Key']: t['Value'] for t in instance.tags or []}
+
+    if state == 'pending' and 'job_id' not in tags:
+        print('fetching UserData to parse tags')
+        user_data = instance.describe_attribute(
+            Attribute='userData')['UserData']['Value']
+        user_data = base64.b64decode(user_data.encode('utf-8')).decode('utf-8')
+
+        set_tags = []
+
+        for line in user_data.splitlines():
+            if not line.startswith('# TAG '):
+                continue
+
+            kv = line[len('# TAG '):].strip()
+            name, value = kv.split(' ', 1)
+            tags[name] = value
+            set_tags.append({
+                'Key': name,
+                'Value': value,
+            })
+
+        if set_tags:
+            print('setting new tags on instance %s: %s' % (
+                instance.instance_id, set_tags))
+            instance.create_tags(Tags=set_tags)
+
+    job_id = tags['job_id']
+
+    # New instance/job seen. Record that.
+    if state == 'pending':
+        # Spot instances can't have tags at launch time. So we encode tags
+        # in user data, where they can always be parsed.
+        print('recording running state for job %s' % job_id)
+        job_table.update_item(
+            Key={'job_id': job_id},
+            UpdateExpression=(
+                'set execution_state = :state, '
+                'instance_id = :instance_id, '
+                'start_time = :start_time, '
+                'exit_clean = :exit_clean'
+            ),
+            ExpressionAttributeValues={
+                ':state': 'running',
+                ':instance_id': instance.instance_id,
+                ':start_time': now,
+                ':exit_clean': False,
+            },
+        )
+        return
+
+    elif state != 'shutting-down':
+        return
+
+    # Instance is shutting down. Job is done. Update the state change
+    # and index results from S3.
+    print('recording finished results from job %s' % job_id)
+
+    job_table.update_item(
+        Key={'job_id': job_id},
+        UpdateExpression='set execution_state = :state, end_time = :end_time',
+        ExpressionAttributeValues={
+            ':state': 'done',
+            ':end_time': now,
+        },
+    )
+
+
+def process_job_artifact(job_table, test_result_table, key):
+    """Process an S3 key representing a job artifact."""
+    print('processing S3 object %s' % key.key)
+
+    # `key` should be `jobs/<repo>/<node>/<job_name>/<build_number>/<artifact>`.
+    parts = key.key.split('/')
+    if parts[0] != 'jobs':
+        print('ignoring artifact not tied to a specific job: %s' % key.key)
+        return
+
+    if len(parts) < 6:
+        print('key does not have enough parts: %s; ignoring' % key.key)
+        return
+
+    repo, node, job_name, build_number = parts[1:5]
+    artifact_name = '/'.join(parts[5:])
+
+    job_id = '%s-%s-%s-%s' % (repo, node, job_name, build_number)
+
+    # Verify the job ID is known.
+    res = job_table.get_item(Key={'job_id': job_id},
+                             ProjectionExpression='instance_id')
+    if 'Item' not in res:
+        print('unable to find job id (%s) for artifact: %s' % (
+            job_id, key.key))
+        return
+
+    if artifact_name == 'report.json':
+        process_report_json(job_table, test_result_table, job_id, repo, node,
+                            job_name, key)
+    elif artifact_name == 'output.log':
+        output_log_url = '%s/%s/%s' % (
+            key.meta.client.meta.endpoint_url,
+            key.Bucket().name,
+            key.key,
+        )
+
+        job_table.update_item(
+            Key={'job_id': job_id},
+            UpdateExpression='set output_log_url = :url',
+            ExpressionAttributeValues={':url': output_log_url},
+        )
+    # This is written when the task shuts down cleanly.
+    elif artifact_name == 'done':
+        job_table.update_item(
+            Key={'job_id': job_id},
+            UpdateExpression='set exit_clean = :c',
+            ExpressionAttributeValues={':c': True},
+        )
+    else:
+        print('ignoring artifact %s' % artifact_name)
+
+
+def process_report_json(job_table, test_result_table, job_id, repo, node,
+                        job_name, key):
+    """Process a `report.json` file emitted from Mercurial's test harness."""
+    print('retrieving S3 object %s' % key.key)
+    results = json.load(key.get()['Body'])
+
+    overall = all(v['result'] in ('success', 'skip') for v in results.values())
+
+    test_count = 0
+    pass_count = 0
+    skipped = set()
+    failed = set()
+
+    for k, v in results.items():
+        test_count += 1
+
+        if v['result'] == 'success':
+            pass_count += 1
+        elif v['result'] == 'skip':
+            skipped.add(k)
+        else:
+            failed.add(k)
+
+    # Record job metadata.
+    job_table.update_item(
+        Key={'job_id': job_id},
+        UpdateExpression=(
+            'set overall_result = :overall_result, '
+            'test_count = :test_count, '
+            'pass_count = :pass_count, '
+            'skip_count = :skip_count, '
+            'fail_count = :fail_count'
+        ),
+        ExpressionAttributeValues={
+            ':overall_result': overall,
+            ':test_count': test_count,
+            ':pass_count': pass_count,
+            ':skip_count': len(skipped),
+            ':fail_count': len(failed),
+        },
+    )
+
+    # And write each test result into the results table.
+    with test_result_table.batch_writer() as batch:
+        for k, v in results.items():
+            v['job_id'] = job_id
+            v['repo'] = repo
+            v['node'] = node
+            v['job_name'] = job_name
+            v['test_name'] = k
+
+            # Empty strings are not allowed. Normalize to None.
+            v['diff'] = v['diff'] or None
+
+            # Normalize float strings to Decimal, which DynamoDB requires.
+            for kk in ('csys', 'cuser', 'end', 'start', 'time'):
+                if v[kk]:
+                    v[kk] = decimal.Decimal(v[kk])
+
+            batch.put_item(Item=v)
diff --git a/contrib/ci/lambda_functions/web.py b/contrib/ci/lambda_functions/web.py
new file mode 100644
--- /dev/null
+++ b/contrib/ci/lambda_functions/web.py
@@ -0,0 +1,410 @@
+# web.py - Web component of Mercurial CI
+#
+# Copyright 2019 Gregory Szorc
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+# no-check-code because Python 3 native.
+
+import datetime
+import html
+import os
+
+import boto3
+from boto3.dynamodb.conditions import (
+    Key,
+)
+
+e = html.escape
+
+
+HTML_HEADERS = {
+    'Content-Security-Policy': "default-src https:; img-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; frame-ancestors 'none'",
+    'Content-Type': 'text/html; charset=utf-8',
+    'Strict-Transport-Security': 'max-age=63072000',
+    'X-Content-Type-Options': 'nosniff',
+    'X-Frame-Options': 'DENY',
+    'X-XSS-Protection': '1; mode=block',
+}
+
+LONG_TEST_THRESHOLD = 20.0
+
+
+def handler(event, context):
+    path = event['path']
+    print('received request for %s' % path)
+
+    dynamodb = boto3.resource('dynamodb')
+
+    repo_poll_table = dynamodb.Table(os.environ['DYNAMODB_REPO_POLL_TABLE'])
+    push_table = dynamodb.Table(os.environ['DYNAMODB_PUSH_TABLE'])
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
+    test_result_table = dynamodb.Table(os.environ['DYNAMODB_TEST_RESULT_TABLE'])
+
+    if path == '/':
+        return render_main(repo_poll_table, push_table, job_table)
+    elif path.startswith('/job-info/'):
+        job_id = path[len('/job-info/'):]
+        return render_job_info(job_table, test_result_table, job_id)
+    else:
+        return {
+            'statusCode': 404,
+            'headers': HTML_HEADERS,
+            'body': 'not found',
+        }
+
+
+def render_main(repo_poll_table, push_table, job_table):
+    html = [
+        '<html>',
+        '<head></head>',
+        '<body>',
+    ]
+
+    for repo_entry in repo_poll_table.scan(Select='ALL_ATTRIBUTES')['Items']:
+        repo_name = repo_entry['repo']
+        repo_url = repo_entry['repo_url']
+
+        html.append('<h2><a href="%s">%s</a></h2>' % (
+            e(repo_url, quote=True), e(repo_name)))
+
+        res = push_table.query(
+            KeyConditionExpression=Key('repo').eq(repo_name),
+            Select='ALL_ATTRIBUTES',
+            Limit=10,
+            ScanIndexForward=False,
+        )
+        for push in res['Items']:
+            html.append(push_info(push, repo_url))
+
+            # Now find all jobs for this push.
+
+            cset_results = {}
+
+            for entry in get_jobs_metadata(job_table, repo_name,
+                                           push['node']):
+                job_name = entry['job_name']
+                build_number = entry['build_number']
+
+                if job_name not in cset_results:
+                    cset_results[job_name] = {}
+
+                job_results = cset_results[job_name]
+
+                job_results[build_number] = entry
+
+            html.extend([
+                '<table>',
+                '<tr>',
+                '<th>Job Name</th>',
+                '<th>Run</th>',
+                '<th>Job State</th>',
+                '<th>Scheduled At</th>',
+                '<th>Start Delay</th>',
+                '<th>Execution Time</th>',
+                '<th>Total Tests</th>',
+                '<th>Passed</th>',
+                '<th>Failed</th>',
+                '<th>Skipped</th>',
+                '<th>Artifacts</th>',
+                '</tr>',
+            ])
+
+            for job_name, job_results in sorted(cset_results.items()):
+                for build_number, job_info in sorted(job_results.items()):
+                    if 'output_log_url' in job_info:
+                        output = '<a href="%s">output log</a>' % e(
+                            job_info['output_log_url'])
+                    else:
+                        output = ''
+
+                    schedule_time = datetime.datetime.utcfromtimestamp(
+                        job_info['schedule_time'])
+
+                    if 'start_time' in job_info:
+                        start_time = datetime.datetime.utcfromtimestamp(
+                            job_info['start_time'])
+                        start_delay = '%ds' % (
+                            start_time - schedule_time).total_seconds()
+                    else:
+                        start_delay = 'n/a'
+
+                    if 'end_time' in job_info:
+                        end_time = datetime.datetime.utcfromtimestamp(
+                            job_info['end_time'])
+                        execution_time = '%ds' % (
+                            end_time - start_time).total_seconds()
+                    else:
+                        execution_time = 'n/a'
+
+                    if 'test_count' in job_info:
+                        test_count = '%d' % job_info['test_count']
+                    else:
+                        test_count = 'n/a'
+
+                    if 'pass_count' in job_info:
+                        pass_count = '%d' % job_info['pass_count']
+                    else:
+                        pass_count = 'n/a'
+
+                    if 'fail_count' in job_info:
+                        fail_count = '%d' % job_info['fail_count']
+                    else:
+                        fail_count = 'n/a'
+
+                    if 'skip_count' in job_info:
+                        skip_count = '%d' % job_info['skip_count']
+                    else:
+                        skip_count = 'n/a'
+
+                    if job_info['execution_state'] in ('pending', 'running'):
+                        job_state = job_info['execution_state']
+                    elif job_info['execution_state'] == 'done':
+                        exit_clean = job_info.get('exit_clean')
+                        if exit_clean is None:
+                            job_state = 'unknown'
+                        elif exit_clean is True:
+                            job_state = 'completed'
+                        elif exit_clean is False:
+                            job_state = 'aborted'
+                        else:
+                            raise Exception('unhandled exit_clean: %s' %
+                                            exit_clean)
+                    else:
+                        raise Exception('unhandled execution_state: %s' %
+                                        job_info['execution_state'])
+
+                    if execution_time != 'n/a':
+                        execution_entry = '<a href="/job-info/%s">%s</a>' % (
+                            e(job_info['job_id'], quote=True),
+                            e(execution_time))
+                    else:
+                        execution_entry = e(execution_time)
+
+                    if fail_count not in ('n/a', '0'):
+                        fail_entry = '<a href="/job-info/%s">%s</a>' % (
+                            e(job_info['job_id'], quote=True), e(fail_count))
+                    else:
+                        fail_entry = e(fail_count)
+
+                    if skip_count not in ('n/a', '0'):
+                        skip_entry = '<a href="/job-info/%s">%s</a>' % (
+                            e(job_info['job_id'], quote=True), e(skip_count))
+                    else:
+                        skip_entry = e(skip_count)
+
+                    html.extend([
+                        '<tr>',
+                        '<td>%s</td>' % e(job_name),
+                        '<td><a href="/job-info/%s">%d</a></td>' % (
+                            e(job_info['job_id'], quote=True), build_number),
+                        '<td>%s</td>' % e(job_state),
+                        '<td>%s</td>' % schedule_time.isoformat(),
+                        '<td>%s</td>' % start_delay,
+                        '<td>%s</td>' % execution_entry,
+                        '<td>%s</td>' % test_count,
+                        '<td>%s</td>' % pass_count,
+                        '<td>%s</td>' % fail_entry,
+                        '<td>%s</td>' % skip_entry,
+                        '<td>%s</td>' % output,
+                        '</tr>'
+                    ])
+
+            html.append('</table>')
+
+    html.extend([
+        '</body>',
+        '</html>',
+    ])
+
+    return {
+        'statusCode': 200,
+        'headers': HTML_HEADERS,
+        'body': ''.join(html),
+    }
+
+
+def render_job_info(job_table, test_result_table, job_id):
+    html = [
+        '<html>',
+        '<head><title>Job %s</title></head>' % e(job_id),
+        '<body><h1>Job %s</h1>' % e(job_id),
+    ]
+
+    res = job_table.get_item(Key={'job_id': job_id})
+    if 'Item' not in res:
+        return {
+            'statusCode': 404,
+            'headers': HTML_HEADERS,
+            'body': 'job not found',
+        }
+
+    job = res['Item']
+
+    schedule_time = datetime.datetime.utcfromtimestamp(job['schedule_time'])
+    if 'start_time' in job:
+        start_time = datetime.datetime.utcfromtimestamp(
+            job['start_time']).isoformat()
+    else:
+        start_time = 'n/a'
+    if 'end_time' in job:
+        end_time = datetime.datetime.utcfromtimestamp(
+            job['end_time']).isoformat()
+    else:
+        end_time = 'n/a'
+
+    html.extend([
+        '<table>',
+        '<tr><td>Repo:</td><td>%s</td></tr>' % e(job['repo']),
+        '<tr><td>Node:</td><td>%s</td></tr>' % e(job['node']),
+        '<tr><td>Name:</td><td>%s</td></tr>' % e(job['job_name']),
+        '<tr><td>Scheduled At:</td><td>%s</td></tr>' % e(
+            schedule_time.isoformat()),
+        '<tr><td>Started At:</td><td>%s</td></tr>' % e(start_time),
+        '<tr><td>Finished At:</td><td>%s</td></tr>' % e(end_time),
+        '</table>',
+    ])
+
+    test_results = list(get_test_results(test_result_table, job_id))
+
+    if job.get('fail_count') not in (None, 0):
+        html.append('<h2>Failed Tests</h2>')
+
+        failed_tests = [t for t in test_results if t['result'] == 'failure']
+
+        html.append('<ul>')
+
+        for result in failed_tests:
+            html.append('<li><a href="#%s">%s</a></li>' % (
+                e(result['test_name'], quote=True), e(result['test_name'])))
+
+        html.append('</ul>')
+
+        for result in failed_tests:
+            html.extend([
+                '<h3 id="%s">%s</h3>' % (
+                    e(result['test_name'], quote=True),
+                    e(result['test_name'])),
+                '<pre>%s</pre>' % e(result['diff'] or ''),
+            ])
+
+    if job.get('skip_count') not in (None, 0):
+        html.append('<h2>Skipped Tests</h2>')
+        html.append('<ul>')
+
+        for result in test_results:
+            if result['result'] != 'skip':
+                continue
+
+            html.append('<li>%s</li>' % e(result['test_name']))
+
+        html.append('</ul>')
+
+    html.append('<h2>Long Tests</h2>')
+    if test_results:
+        html.extend([
+            '<table>',
+            '<tr><th>Duration</th><th>Test</th></tr>',
+        ])
+
+        for result in sorted(test_results, key=lambda x: x['time'],
+                             reverse=True):
+            if result['time'] < LONG_TEST_THRESHOLD:
+                break
+
+            html.append('<tr><td>%.1fs</td><td>%s</td></tr>' % (
+                result['time'], e(result['test_name'])))
+
+        html.append('</table>')
+    else:
+        html.append('<p>No test results</p>')
+
+    html.append('<h2>Timeline</h2>')
+
+    if test_results:
+        html.append('<svg height="%d" width="%d">' % (
+            len(test_results) * 20,
+            max(t['end'] for t in test_results) + 400,
+        ))
+
+        y_offset = 0
+
+        for result in sorted(test_results, key=lambda x: x['start']):
+            duration = result['end'] - result['start']
+
+            html.extend([
+                '<g transform="translate(%d, %d)">' % (
+                    int(result['start']), y_offset),
+                # Need to add 1 otherwise 0 won't render.
+                '<rect width="%d" height="16" fill="green" />' % (
+                    int(duration) + 1),
+                '<text x="%d" y="12">%s (%.2fs)</text>' % (
+                    int(duration) + 6, e(result['test_name']),
+                    result['end'] - result['start']),
+                '</g>',
+            ])
+
+            y_offset += 20
+
+        html.append('</svg>')
+    else:
+        html.append('<p>No test results</p>')
+
+    html.append('</body></html>')
+
+    return {
+        'statusCode': 200,
+        'headers': HTML_HEADERS,
+        'body': ''.join(html)
+    }
+
+
+def get_jobs_metadata(job_table, repo, node):
+    """Obtain job records for a revision."""
+    exclusive_start_key = None
+
+    while True:
+        # Passing ExclusiveStartKey=None doesn't work :(
+        extra = {}
+        if exclusive_start_key:
+            extra['ExclusiveStartKey'] = exclusive_start_key
+
+        res = job_table.scan(
+            Select='ALL_ATTRIBUTES',
+            FilterExpression='repo = :repo AND node = :node',
+            ExpressionAttributeValues={
+                ':repo': repo,
+                ':node': node,
+            },
+            **extra
+        )
+        for entry in res['Items']:
+            yield entry
+
+        if 'LastEvaluatedKey' not in res:
+            return
+
+        exclusive_start_key = res['LastEvaluatedKey']
+
+
+def get_test_results(test_result_table, job_id):
+    """Obtain test result records for a job."""
+    exclusive_start_key = None
+
+    while True:
+        extra = {}
+        if exclusive_start_key:
+            extra['ExclusiveStartKey'] = exclusive_start_key
+
+        res = test_result_table.query(
+            KeyConditionExpression=Key('job_id').eq(job_id),
+            Select='ALL_ATTRIBUTES',
+            **extra
+        )
+
+        for item in res['Items']:
+            yield item
+
+        if not res.get('LastEvaluatedKey'):
+            return
+
+        exclusive_start_key = res['LastEvaluatedKey']
+
+
+def push_info(push, repo_url):
+    cset_url = '%s/rev/%s' % (repo_url, push['node'])
+
+    return ''.join([
+        '<div><a href="%s">Changeset %s</a></div>' % (
+            e(cset_url, quote=True), e(push['node'])),
+        '<div>branch: %s</div>' % e(push['branch']),
+        '<div>author: %s</div>' % e(push['user']),
+        '<div>description: %s</div>' % e(push['message'].splitlines()[0]),
+    ])
diff --git a/contrib/ci/terraform/account_manager.tf b/contrib/ci/terraform/account_manager.tf
new file mode 100644
--- /dev/null
+++ b/contrib/ci/terraform/account_manager.tf
@@ -0,0 +1,76 @@
+# This file defines resources that allow an external / parent account to
+# manage this account.
+
+# Assume role policy which can be used by the root user in the
+# parent account.
+data "aws_iam_policy_document" "assume_role_admin_from_parent" {
+  statement {
+    effect = "Allow"
+    principals {
+      type        = "AWS"
+      identifiers = ["arn:aws:iam::${var.parent_account_id}:root"]
+    }
+    actions = ["sts:AssumeRole"]
+  }
+}
+
+# Allows the purging lambda function from the parent account to assume roles.
+data "aws_iam_policy_document" "assume_role_account_manager" {
+  statement {
+    effect = "Allow"
+    principals {
+      type        = "AWS"
+      identifiers = [var.parent_account_manager_role_arn]
+    }
+    actions = ["sts:AssumeRole"]
+  }
+}
+
+# Allows the account manager to do anything.
+data "aws_iam_policy_document" "account_manager" {
+  statement {
+    effect    = "Allow"
+    actions   = ["*"]
+    resources = ["*"]
+  }
+}
+
+resource "aws_iam_role" "account_manager" {
+  name               = "account-manager"
+  description        = "Assumed to audit and clean up this account"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_account_manager.json
+}
+
+output "iam_role_account_manager_arn" {
+  value = aws_iam_role.account_manager.arn
+}
+
+resource "aws_iam_role_policy" "account_manager" {
+  role   = aws_iam_role.account_manager.name
+  name   = aws_iam_role.account_manager.name
+  policy = data.aws_iam_policy_document.account_manager.json
+}
+
+# Allow parent account to reach into our Terraform state.
+data "aws_iam_policy_document" "parent_account_terraform_access" {
+  statement {
+    effect = "Allow"
+    actions = [
+      "s3:ListBucket",
+      "s3:GetObject",
+    ]
+    principals {
+      type        = "AWS"
+      identifiers = ["arn:aws:iam::${var.parent_account_id}:user/gps"]
+    }
+    resources = [
+      aws_s3_bucket.private.arn,
+      "${aws_s3_bucket.private.arn}/terraform/*",
+    ]
+  }
+}
+
+resource "aws_s3_bucket_policy" "parent_account_terraform_access" {
+  bucket = aws_s3_bucket.private.bucket
+  policy = data.aws_iam_policy_document.parent_account_terraform_access.json
+}
diff --git a/contrib/ci/terraform/cloudwatch.tf b/contrib/ci/terraform/cloudwatch.tf
new file mode 100644
--- /dev/null
+++ b/contrib/ci/terraform/cloudwatch.tf
@@ -0,0 +1,4 @@
+resource "aws_cloudwatch_log_group" "ssm_run_power_shell_script" {
+  name              = "/aws/ssm/AWS-RunPowerShellScript"
+  retention_in_days = 7
+}
diff --git a/contrib/ci/terraform/iam.tf b/contrib/ci/terraform/iam.tf
new file mode 100644
--- /dev/null
+++ b/contrib/ci/terraform/iam.tf
@@ -0,0 +1,52 @@
+# Generic policy to allow an EC2 service to assume a role.
+data "aws_iam_policy_document" "assume_role_ec2" {
+  statement {
+    effect = "Allow"
+    principals {
+      type        = "Service"
+      identifiers = ["ec2.amazonaws.com"]
+    }
+    actions = ["sts:AssumeRole"]
+  }
+}
+
+# Generic policy to allow a Lambda function to assume a role.
+data "aws_iam_policy_document" "assume_role_lambda" { + statement { + effect = "Allow" + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "admin" { + name = "admin" + description = "Full administrator access" + assume_role_policy = data.aws_iam_policy_document.assume_role_admin_from_parent.json +} + +resource "aws_iam_role_policy_attachment" "admin-administrator" { + role = aws_iam_role.admin.name + policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess" +} + +resource "aws_iam_group" "admins" { + name = "admins" +} + +resource "aws_iam_group_policy_attachment" "admins-administrator" { + group = aws_iam_group.admins.name + policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess" +} + +resource "aws_iam_user" "hg" { + name = "hg" +} + +resource "aws_iam_user_group_membership" "hg-group-admins" { + user = aws_iam_user.hg.name + groups = [aws_iam_group.admins.name] +} diff --git a/contrib/ci/terraform/init.tf b/contrib/ci/terraform/init.tf new file mode 100644 --- /dev/null +++ b/contrib/ci/terraform/init.tf @@ -0,0 +1,42 @@ +provider "archive" {} + +terraform { + backend "s3" { + bucket = "mercurial-ci-private" + key = "terraform/hg.tfstate" + region = "us-west-2" + } +} + +variable "account_id" { + # gregoryszorc-hg + default = "585867089697" +} + +variable "parent_account_id" { + # gregoryszorc + default = "381522727988" +} + +variable "parent_account_manager_role_arn" { + default = "arn:aws:iam::381522727988:role/lambda-hg-account-manage" +} + +variable "ci_hostname" { + # Route53 defined in parent account. + default = "ci.hg.gregoryszorc.com" +} + +resource "aws_iam_account_alias" "alias" { + account_alias = "gregoryszorc-hg" +} + +provider "aws" { + region = "us-west-2" +} + +data "archive_file" "lambda_ci" { + type = "zip" + output_path = "${path.root}/../../../build/lambda_ci.zip" + source_dir = "${path.root}/../lambda_functions" +} diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf new file mode 100644 --- /dev/null +++ b/contrib/ci/terraform/job_executor.tf @@ -0,0 +1,190 @@ +# Defines resources for executing jobs. + +resource "aws_iam_role" "lambda_ci_run_pending_job" { + name = "lambda-ci-run-pending-job" + description = "For Lambda function to launch EC2 instances for pending jobs" + assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json +} + +# Function reacting to a pending job in the queue and trying to start it. +resource "aws_lambda_function" "ci_run_pending_job" { + function_name = "ci-run-pending-job" + description = "Reacts to pending job events on SQS queue" + filename = data.archive_file.lambda_ci.output_path + handler = "ci.handle_pending_job" + source_code_hash = data.archive_file.lambda_ci.output_base64sha256 + runtime = "python3.7" + timeout = 60 + role = aws_iam_role.lambda_ci_run_pending_job.arn +} + +resource "aws_cloudwatch_log_group" "lambda_ci_run_pending_job" { + name = "/aws/lambda/${aws_lambda_function.ci_run_pending_job.function_name}" + retention_in_days = 7 +} + +data "aws_iam_policy_document" "lambda_ci_run_pending_job" { + # Allow Lambda function to write CloudWatch events. + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents", + ] + resources = [aws_cloudwatch_log_group.lambda_ci_run_pending_job.arn] + } + # Allow querying job state in S3 and putting objects there. 
+  statement {
+    effect = "Allow"
+    actions = [
+      "s3:ListBucket",
+      "s3:PutObject",
+      "s3:PutObjectAcl",
+    ]
+    resources = ["${aws_s3_bucket.mercurial-ci.arn}/jobs/*"]
+  }
+  # Allow modifying SQS queue.
+  statement {
+    effect = "Allow"
+    actions = [
+      "sqs:ReceiveMessage",
+      "sqs:DeleteMessage",
+      "sqs:GetQueueAttributes",
+    ]
+    resources = [aws_sqs_queue.ci_pending_jobs.arn]
+  }
+  # Allow querying EC2 state and launching instances to run jobs.
+  statement {
+    effect = "Allow"
+    actions = [
+      "ec2:*",
+      "iam:*",
+    ]
+    resources = ["*"]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_handle_pending_job" {
+  role   = aws_iam_role.lambda_ci_run_pending_job.name
+  name   = aws_iam_role.lambda_ci_run_pending_job.name
+  policy = data.aws_iam_policy_document.lambda_ci_run_pending_job.json
+}
+
+# Hook up SQS to Lambda function.
+resource "aws_lambda_event_source_mapping" "ci_run_pending_job_from_sqs" {
+  event_source_arn = aws_sqs_queue.ci_pending_jobs.arn
+  function_name    = aws_lambda_function.ci_run_pending_job.arn
+  batch_size       = 1
+}
+
+# We have another Lambda function for reacting to state changes in worker
+# instances.
+
+resource "aws_cloudwatch_log_group" "lambda_ci_instance_state_change" {
+  name              = "/aws/lambda/${aws_lambda_function.ci_instance_state_change.function_name}"
+  retention_in_days = 7
+}
+
+resource "aws_iam_role" "lambda_ci_instance_state_change" {
+  name               = "lambda-ci-instance-state-change"
+  description        = "For Lambda function reacting to instance state changes"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json
+}
+
+resource "aws_lambda_function" "ci_instance_state_change" {
+  function_name    = "ci-instance-state-change"
+  description      = "Reacts to EC2 instances changing state"
+  filename         = data.archive_file.lambda_ci.output_path
+  handler          = "ci.handle_instance_state_change"
+  source_code_hash = data.archive_file.lambda_ci.output_base64sha256
+  runtime          = "python3.7"
+  timeout          = 120
+  role             = aws_iam_role.lambda_ci_instance_state_change.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+    }
+  }
+}
+
+data "aws_iam_policy_document" "lambda_ci_instance_state_change" {
+  # Allow the Lambda function to write its logs to CloudWatch.
+  statement {
+    effect = "Allow"
+    actions = [
+      "logs:CreateLogGroup",
+      "logs:CreateLogStream",
+      "logs:PutLogEvents",
+    ]
+    resources = [aws_cloudwatch_log_group.lambda_ci_instance_state_change.arn]
+  }
+  # Allow querying EC2 instance state.
+  statement {
+    effect = "Allow"
+    actions = [
+      "ec2:CreateTags",
+      "ec2:DescribeInstanceAttribute",
+      "ec2:DescribeInstances",
+    ]
+    resources = ["*"]
+  }
+  # Allow updating job state in DynamoDB.
+  statement {
+    effect = "Allow"
+    actions = [
+      "dynamodb:BatchWriteItem",
+      "dynamodb:PutItem",
+      "dynamodb:UpdateItem",
+    ]
+    resources = [
+      aws_dynamodb_table.ci_job.arn,
+      aws_dynamodb_table.ci_test_result.arn,
+    ]
+  }
+  # Allow reading S3 keys to retrieve job state.
+  statement {
+    effect = "Allow"
+    actions = [
+      "s3:GetObject",
+    ]
+    resources = ["${aws_s3_bucket.mercurial-ci.arn}/jobs/*"]
+  }
+}
+
+resource "aws_iam_role_policy" "lambda_ci_instance_state_change" {
+  role   = aws_iam_role.lambda_ci_instance_state_change.name
+  name   = aws_iam_role.lambda_ci_instance_state_change.name
+  policy = data.aws_iam_policy_document.lambda_ci_instance_state_change.json
+}
+
+# CloudWatch Event Rule that fires whenever an instance state changes.
+resource "aws_cloudwatch_event_rule" "trigger_instance_state_change" { + name = "trigger-instance-state-change" + description = "Signals when an EC2 instance state is changing" + event_pattern = < -X contrib/automation/ \ + > -X contrib/ci/ \ > -X contrib/packaging/hgpackaging/ \ > -X contrib/packaging/inno/ \ > -X contrib/packaging/wix/ \