diff --git a/hgext/s3wireprotocache.py b/hgext/s3wireprotocache.py
new file mode 100644
--- /dev/null
+++ b/hgext/s3wireprotocache.py
@@ -0,0 +1,287 @@
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+'''S3 content redirect caching for wire protocol v2
+
+Connor Sheehan
+
+This extension implements an S3-backed cacher for CBOR-encoded objects
+in the version 2 wire protocol. Server operators provide AWS credentials
+and the name of a bucket which will act as the cache. For each cache key
+the cacher sends an HTTP HEAD request to S3, which returns a 404 error if
+the object does not exist (i.e. a cache miss). If the object does exist
+in S3, a presigned URL is generated and a content redirect response is
+issued to the client.
+
+::
+
+  [extensions]
+  s3wireprotocache =
+  [s3wireprotocache]
+  # configure auth
+  access_key_id = accesskeyid
+  secret_access_key = secretaccesskey
+  # set the bucket to cache objects in
+  bucket = cachebucket
+  # specify S3 domains as redirect targets
+  redirecttargets = https://s3-us-west-2.amazonaws.com/,\
+      https://s3-us-east-2.amazonaws.com/
+
+  # specify region (optional, will query AWS if empty)
+  region = us-east-2
+
+  # set minimum object size in bytes (optional)
+  minimumobjectsize = 500
+
+  # set object ACL in S3 (optional, default public-read)
+  cacheacl = private
+
+  # specify an alternative endpoint URL (optional, for testing)
+  endpoint_url = http://localhost:12345/
+'''
+
+from __future__ import absolute_import
+
+from mercurial import (
+    extensions,
+    registrar,
+    repository,
+    wireprototypes,
+    wireprotov2server,
+)
+from mercurial.utils import (
+    interfaceutil,
+)
+
+import boto3
+import botocore.exceptions
+
+configtable = {}
+configitem = registrar.configitem(configtable)
+
+configitem('s3wireprotocache', 'access_key_id',
+           default=None)
+configitem('s3wireprotocache', 'bucket',
+           default=None)
+configitem('s3wireprotocache', 'cacheacl',
+           default='public-read')
+configitem('s3wireprotocache', 'endpoint_url',
+           default=None)
+configitem('s3wireprotocache', 'minimumobjectsize',
+           default=None)
+configitem('s3wireprotocache', 'redirecttargets',
+           default=None)
+configitem('s3wireprotocache', 'region',
+           default=None)
+configitem('s3wireprotocache', 'secret_access_key',
+           default=None)
+
+def create_redirect_url(client, bucket, key):
+    '''Return a presigned GET URL (as bytes) for the object `key` in
+    `bucket`.
+    '''
+    params = {
+        'Bucket': bucket,
+        'Key': key,
+    }
+    # boto3 presigned URLs default to a one hour expiry
+    url = client.generate_presigned_url('get_object',
+                                        Params=params)
+
+    return bytes(url)
+
+def put_s3(client, bucket, key, obj, cacheacl):
+    '''Put `obj` into the S3 `bucket` under `key`.
+    '''
+    params = {
+        'ACL': cacheacl,
+        'Body': obj,
+        'Bucket': bucket,
+        # Stored as the object's media type so redirected GETs are served
+        # with the wire protocol content type
+        'ContentType': b'application/mercurial-cbor',
+        'Key': key,
+    }
+
+    client.put_object(**params)
+
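+# A cache lookup is a single HEAD request against the bucket: a 404 from S3
+# is treated as a miss and any other client error is re-raised to the caller.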
+def is_s3cache_hit(client, bucket, key):
+    '''Return `True` if `key` is present in the S3 cache bucket.
+    '''
+    try:
+        params = {
+            'Bucket': bucket,
+            'Key': key,
+        }
+
+        # If this doesn't throw, the object exists
+        client.head_object(**params)
+        return True
+
+    except botocore.exceptions.ClientError as e:
+        # A 404 indicates the object does not exist (a cache miss)
+        if e.response['Error']['Code'] == '404':
+            return False
+
+        # Re-raise other boto3 errors so the caller can log them
+        raise
+
+@interfaceutil.implementer(repository.iwireprotocolcommandcacher)
+class s3wireprotocache(object):
+    def __init__(self, ui, command, encodefn, redirecttargets, redirecthashes,
+                 req):
+        self.ui = ui
+        self.encodefn = encodefn
+
+        self.redirecttargets = redirecttargets
+        self.redirecthashes = redirecthashes
+
+        self.req = req
+        self.key = None
+
+        # Auth config
+        self.access_key_id = ui.config('s3wireprotocache',
+                                       'access_key_id')
+        self.secret_access_key = ui.config('s3wireprotocache',
+                                           'secret_access_key')
+        self.s3_endpoint_url = ui.config('s3wireprotocache',
+                                         'endpoint_url')
+
+        clientparams = {
+            'aws_access_key_id': self.access_key_id,
+            'aws_secret_access_key': self.secret_access_key,
+        }
+
+        # Alternative endpoint for testing
+        if self.s3_endpoint_url:
+            clientparams['endpoint_url'] = self.s3_endpoint_url
+
+        self.s3 = boto3.client('s3', **clientparams)
+
+        # Bucket name and region; get_bucket_location() returns a dict, so
+        # pull out the region name when the config option is not set
+        self.bucket = ui.config('s3wireprotocache', 'bucket')
+        self.region = ui.config('s3wireprotocache', 'region') or \
+            self.s3.get_bucket_location(
+                Bucket=self.bucket)['LocationConstraint']
+
+        self.cacheacl = ui.config('s3wireprotocache', 'cacheacl')
+
+        self.minimumobjectsize = ui.configint('s3wireprotocache',
+                                              'minimumobjectsize')
+
+        # Append objects here to be cached during `onfinished`
+        self.buffered = []
+
+        # Indicates if the lookup was a cache hit or miss
+        self.cachehit = False
+
+        ui.log('s3wireprotocache', 's3 cacher constructed for %s\n', command)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalue, exctb):
+        if exctype:
+            self.ui.log('s3wireprotocache', 'cacher exiting due to error\n')
+
+    def adjustcachekeystate(self, state):
+        # Testing backdoor: drop the repo entry from the cache key state so
+        # the keys asserted in tests are deterministic
+        if self.s3_endpoint_url:
+            del state[b'repo']
+        return
+
+    def setcachekey(self, key):
+        '''Set the cache key for future lookups
+        '''
+        self.key = key
+        return True
+
+    def lookup(self):
+        '''Look up the previously set key in the cache
+        '''
+        try:
+            self.cachehit = is_s3cache_hit(self.s3, self.bucket, self.key)
+
+            if self.cachehit:
+                self.ui.log('s3wireprotocache',
+                            'cache hit for %s\n' % self.key)
+
+                url = create_redirect_url(self.s3, self.bucket, self.key)
+
+                response = wireprototypes.alternatelocationresponse(
+                    mediatype=b'application/mercurial-cbor',
+                    url=url,
+                )
+
+                return {'objs': [response]}
+            else:
+                self.ui.log('s3wireprotocache',
+                            'cache miss for %s\n' % self.key)
+
+        except botocore.exceptions.ClientError as e:
+            self.ui.log('s3wireprotocache', 'boto3 errored out: %s\n' % e)
+
+        return None
+
+    def onobject(self, obj):
+        '''Buffer the object for insertion into the cache if the lookup
+        was not a cache hit, then re-emit it
+        '''
+        if not self.cachehit:
+            self.buffered.extend(self.encodefn(obj))
+        yield obj
+
+    def onfinished(self):
+        '''Insert buffered objects into the cache
+        '''
+        if not self.buffered:
+            return []
+
+        # Skip caching when the entry is below the configured minimum size
+        entry = b''.join(self.buffered)
+        if self.minimumobjectsize and len(entry) < self.minimumobjectsize:
+            self.ui.log('s3wireprotocache',
+                        'obj size (%s) is below minimum of %s; not caching\n'
+                        % (len(entry), self.minimumobjectsize))
+            return []
+
+        self.ui.log('s3wireprotocache', 'storing cache entry for %s\n'
+                    % self.key)
+        put_s3(self.s3, self.bucket, self.key, entry, self.cacheacl)
+
+        return []
+
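+# Redirect target names advertised to clients come from the leftmost domain
+# label of the redirect URL, e.g. https://s3-us-west-2.amazonaws.com/ becomes
+# the target name s3-us-west-2.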
+def parse_lowest_level_domain(redirect):
+    '''Return the lowest level (leftmost) domain label of a redirect
+    target URL.
+    '''
+    return redirect.replace('https://', '').split('.')[0]
+
+def getadvertisedredirecttargets(orig, repo, proto):
+    '''Convert the comma separated list of redirect target URLs into
+    the advertised redirect target format
+    '''
+    ui = repo.ui
+
+    redirectconf = ui.config('s3wireprotocache', 'redirecttargets')
+    if not redirectconf:
+        # Nothing configured; fall back to the wrapped function
+        return orig(repo, proto)
+
+    redirects = [r.strip() for r in redirectconf.split(',')]
+
+    redirects = [
+        {
+            'name': bytes(parse_lowest_level_domain(r)),
+            'protocol': b'https',
+            'snirequired': True,
+            'tlsversions': [b'1.1', b'1.2'],
+            'uris': [bytes(r)],
+        }
+        for r in redirects
+    ]
+
+    return redirects
+
+def makeresponsecacher(orig, repo, proto, command, args, objencoderfn,
+                       redirecttargets, redirecthashes):
+    '''Wrapper for wireprotov2server.makeresponsecacher returning the
+    S3-backed response cacher
+    '''
+    return s3wireprotocache(repo.ui, command, objencoderfn,
+                            redirecttargets, redirecthashes, proto._req)
+
+def extsetup(ui):
+    extensions.wrapfunction(wireprotov2server, 'makeresponsecacher',
+                            makeresponsecacher)
+    extensions.wrapfunction(wireprotov2server, 'getadvertisedredirecttargets',
+                            getadvertisedredirecttargets)
diff --git a/tests/hghave.py b/tests/hghave.py
--- a/tests/hghave.py
+++ b/tests/hghave.py
@@ -806,3 +806,25 @@
     except (ImportError, AttributeError):
         pass
     return False
+
+@check('motoserver', 'moto AWS mock server')
+def has_motoserver():
+    '''Return True if the boto3 mock library `moto` is available, along
+    with the `Flask` and `simplejson` dependencies that enable running
+    it as a standalone mock S3 server
+    '''
+    try:
+        import moto
+        moto.mock_s3
+
+        import flask
+        flask.Flask
+
+        import simplejson
+        simplejson.__version__
+
+        return True
+    except (ImportError, AttributeError):
+        pass
+    return False
+
diff --git a/tests/test-help.t b/tests/test-help.t
--- a/tests/test-help.t
+++ b/tests/test-help.t
@@ -373,6 +373,8 @@
        purge         command to delete untracked files from the working
                      directory
        relink        recreates hardlinks between repository clones
+       s3wireprotocache
+                     S3 content redirect caching for wire protocol v2
        schemes       extend schemes with shortcuts to repository swarms
        share         share a common history between several working directories
        shelve        save and restore changes to the working directory
diff --git a/tests/test-s3wireprotocache.t b/tests/test-s3wireprotocache.t
new file mode 100644
--- /dev/null
+++ b/tests/test-s3wireprotocache.t
@@ -0,0 +1,219 @@
+#require motoserver
+
+  $ . $TESTDIR/wireprotohelpers.sh
+
+Set up the mock S3 server and create a bucket
+
+  $ moto_server -p 15467 s3 >> mocks3.log 2>&1 &
+  $ MOTO_PID=$!
+  >>> import boto3
+  >>> s3 = boto3.client('s3',
+  ...     aws_access_key_id='dummyaccessid',
+  ...     aws_secret_access_key='dummysecretkey',
+  ...     endpoint_url='http://localhost:15467/')
+  >>> _ = s3.create_bucket(
+  ...     ACL='public-read',
+  ...     Bucket='testbucket')
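+
+Both the boto3 client above and the server-side extension configured below
+point at the mock server via endpoint_url, so no real AWS account is involved.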
+
+  $ cat >> $HGRCPATH << EOF
+  > [extensions]
+  > blackbox =
+  > [blackbox]
+  > track = s3wireprotocache
+  > EOF
+  $ hg init server
+  $ enablehttpv2 server
+  $ cd server
+  $ cat >> .hg/hgrc << EOF
+  > [extensions]
+  > s3wireprotocache =
+  > [s3wireprotocache]
+  > access_key_id = dummyaccessid
+  > secret_access_key = dummysecretkey
+  > bucket = testbucket
+  > redirecttargets = http://localhost:15467/
+  > endpoint_url = http://localhost:15467/
+  > EOF
+
+  $ echo a0 > a
+  $ echo b0 > b
+  $ hg -q commit -A -m 'commit 0'
+  $ echo a1 > a
+  $ hg commit -m 'commit 1'
+  $ echo b1 > b
+  $ hg commit -m 'commit 2'
+  $ echo a2 > a
+  $ echo b2 > b
+  $ hg commit -m 'commit 3'
+
+  $ hg log -G -T '{rev}:{node} {desc}'
+  @  3:50590a86f3ff5d1e9a1624a7a6957884565cc8e8 commit 3
+  |
+  o  2:4d01eda50c6ac5f7e89cbe1880143a32f559c302 commit 2
+  |
+  o  1:4432d83626e8a98655f062ec1f2a43b07f7fbbb0 commit 1
+  |
+  o  0:3390ef850073fbc2f0dfff2244342c8e9229013a commit 0
+
+  $ hg --debug debugindex -m
+     rev linkrev nodeid                                   p1                                       p2
+       0       0 992f4779029a3df8d0666d00bb924f69634e2641 0000000000000000000000000000000000000000 0000000000000000000000000000000000000000
+       1       1 a988fb43583e871d1ed5750ee074c6d840bbbfc8 992f4779029a3df8d0666d00bb924f69634e2641 0000000000000000000000000000000000000000
+       2       2 a8853dafacfca6fc807055a660d8b835141a3bb4 a988fb43583e871d1ed5750ee074c6d840bbbfc8 0000000000000000000000000000000000000000
+       3       3 3fe11dfbb13645782b0addafbe75a87c210ffddc a8853dafacfca6fc807055a660d8b835141a3bb4 0000000000000000000000000000000000000000
+
+  $ hg serve -p $HGPORT -d --pid-file hg.pid -E error.log
+  $ HGSERVEPID=`cat hg.pid`
+
+  $ cat hg.pid > $DAEMON_PIDS
+  $ printf "\n" >> $DAEMON_PIDS
+  $ echo $MOTO_PID >> $DAEMON_PIDS
+
+Performing the same request twice should produce the same result,
+with the first request storing the response in S3 and the second
+being served via an S3 content redirect
+
+  $ sendhttpv2peer << EOF
+  > command manifestdata
+  >     nodes eval:[b'\x99\x2f\x47\x79\x02\x9a\x3d\xf8\xd0\x66\x6d\x00\xbb\x92\x4f\x69\x63\x4e\x26\x41']
+  >     tree eval:b''
+  >     fields eval:[b'parents']
+  > EOF
+  creating http peer for wire protocol version 2
+  sending manifestdata command
+  response: gen[
+    {
+      b'totalitems': 1
+    },
+    {
+      b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+      b'parents': [
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
+
+  $ sendhttpv2peer << EOF
+  > command manifestdata
+  >     nodes eval:[b'\x99\x2f\x47\x79\x02\x9a\x3d\xf8\xd0\x66\x6d\x00\xbb\x92\x4f\x69\x63\x4e\x26\x41']
+  >     tree eval:b''
+  >     fields eval:[b'parents']
+  > EOF
+  creating http peer for wire protocol version 2
+  sending manifestdata command
+  response: gen[
+    {
+      b'totalitems': 1
+    },
+    {
+      b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+      b'parents': [
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
+
+Sending a different request doesn't yield a cache hit.
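+The extra node below hashes to a different cache key (visible in the
+blackbox log further down), so this is a miss followed by a store.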
+
+  $ sendhttpv2peer << EOF
+  > command manifestdata
+  >     nodes eval:[b'\x99\x2f\x47\x79\x02\x9a\x3d\xf8\xd0\x66\x6d\x00\xbb\x92\x4f\x69\x63\x4e\x26\x41', b'\xa9\x88\xfb\x43\x58\x3e\x87\x1d\x1e\xd5\x75\x0e\xe0\x74\xc6\xd8\x40\xbb\xbf\xc8']
+  >     tree eval:b''
+  >     fields eval:[b'parents']
+  > EOF
+  creating http peer for wire protocol version 2
+  sending manifestdata command
+  response: gen[
+    {
+      b'totalitems': 2
+    },
+    {
+      b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+      b'parents': [
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    },
+    {
+      b'node': b'\xa9\x88\xfbCX>\x87\x1d\x1e\xd5u\x0e\xe0t\xc6\xd8@\xbb\xbf\xc8',
+      b'parents': [
+        b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
+
+Setting minimumobjectsize prevents small responses from being cached
+
+  $ cat >> .hg/hgrc << EOL
+  > [s3wireprotocache]
+  > minimumobjectsize = 50000
+  > EOL
+  $ kill $HGSERVEPID
+  $ hg serve -p $HGPORT -d --pid-file hg.pid -E error.log
+  $ printf "\n" >> $DAEMON_PIDS
+  $ cat hg.pid >> $DAEMON_PIDS
+
+  $ sendhttpv2peer << EOF
+  > command manifestdata
+  >     nodes eval:[b'\xa9\x88\xfb\x43\x58\x3e\x87\x1d\x1e\xd5\x75\x0e\xe0\x74\xc6\xd8\x40\xbb\xbf\xc8']
+  >     tree eval:b''
+  >     fields eval:[b'parents']
+  > EOF
+  creating http peer for wire protocol version 2
+  sending manifestdata command
+  response: gen[
+    {
+      b'totalitems': 1
+    },
+    {
+      b'node': b'\xa9\x88\xfbCX>\x87\x1d\x1e\xd5u\x0e\xe0t\xc6\xd8@\xbb\xbf\xc8',
+      b'parents': [
+        b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
+
+The blackbox log should show the expected cache hits, misses and stores
+
+  $ cat .hg/blackbox.log
+  *> s3 cacher constructed for manifestdata (glob)
+  *> cache miss for 47abb8efa5f01b8964d74917793ad2464db0fa2c (glob)
+  *> storing cache entry for 47abb8efa5f01b8964d74917793ad2464db0fa2c (glob)
+  *> s3 cacher constructed for manifestdata (glob)
+  *> cache hit for 47abb8efa5f01b8964d74917793ad2464db0fa2c (glob)
+  *> s3 cacher constructed for manifestdata (glob)
+  *> cache miss for 37326a83e9843f15161fce9d1e92d06b795d5e8e (glob)
+  *> storing cache entry for 37326a83e9843f15161fce9d1e92d06b795d5e8e (glob)
+  *> s3 cacher constructed for manifestdata (glob)
+  *> cache miss for a5291ad0e900bc65a180a494f63587b5705f282f (glob)
+  *> obj size (91) is below minimum of 50000; not caching (glob)
+
+Server error logs should be empty
+
+  $ cat error.log
+
+S3 logs should show hits/misses/redirects. Use a regex to match the
+presigned URL, since the order of its query string parameters is not
+deterministic.
+
+  $ cat ../mocks3.log
+  * Running on http://$LOCALIP:15467/ (Press CTRL+C to quit)
+  * "PUT /testbucket HTTP/1.1" 200 - (glob)
+  * "GET /testbucket?location HTTP/1.1" 200 - (glob)
+  * "HEAD /testbucket/47abb8efa5f01b8964d74917793ad2464db0fa2c HTTP/1.1" 404 - (glob)
+  * "PUT /testbucket/47abb8efa5f01b8964d74917793ad2464db0fa2c HTTP/1.1" 200 - (glob)
+  * "GET /testbucket?location HTTP/1.1" 200 - (glob)
+  * "HEAD /testbucket/47abb8efa5f01b8964d74917793ad2464db0fa2c HTTP/1.1" 200 - (glob)
+  \$LOCALIP - - \[\$LOGDATE\$\] "GET /testbucket/47abb8efa5f01b8964d74917793ad2464db0fa2c\?(Signature=.+&?|Expires=\d+&?|AWSAccessKeyId=dummyaccessid&?){3} HTTP/1.1" 200 - (re)
+  * "GET /testbucket?location HTTP/1.1" 200 - (glob)
+  * "HEAD /testbucket/37326a83e9843f15161fce9d1e92d06b795d5e8e HTTP/1.1" 404 - (glob)
+  * "PUT /testbucket/37326a83e9843f15161fce9d1e92d06b795d5e8e HTTP/1.1" 200 - (glob)
+  * "GET /testbucket?location HTTP/1.1" 200 - (glob)
+  * "HEAD /testbucket/a5291ad0e900bc65a180a494f63587b5705f282f HTTP/1.1" 404 - (glob)
+
+  $ killdaemons.py
+  $ rm .hg/blackbox.log
+  $ rm ../mocks3.log