diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -13,7 +13,9 @@
 import os
 import socket
 import struct
+import sys
 import tempfile
+import weakref
 
 from .i18n import _
 from .thirdparty import (
@@ -31,7 +33,6 @@
     statichttprepo,
     url as urlmod,
     util,
-    wireproto,
     wireprotoframing,
     wireprototypes,
     wireprotov1peer,
@@ -517,8 +518,307 @@
     def _abort(self, exception):
         raise exception
 
+def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
+    """Send wire protocol version 2 commands in a single HTTP request.
+
+    ``requests`` is a non-empty list of ``(command, args, future)`` tuples.
+    Each command is registered with a client reactor and the resulting
+    frames form the request body. The URL is ``apiurl/permission/command``
+    for one command and ``apiurl/permission/multirequest`` for several.
+
+    Returns a ``(reactor, requestmap, response)`` tuple, where
+    ``requestmap`` maps each request ID to its ``(request, future)`` pair.
+
+    An HTTP 401 response raises ``error.Abort``; other HTTP errors
+    propagate unchanged. ``httplib.HTTPException`` is re-raised as
+    ``IOError``.
+    """
+    reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
+                                             buffersends=True)
+
+    url = '%s/%s' % (apiurl, permission)
+
+    if len(requests) > 1:
+        url += '/multirequest'
+    else:
+        url += '/%s' % requests[0][0]
+
+    # Request ID to (request, future)
+    requestmap = {}
+
+    for command, args, f in requests:
+        request, action, meta = reactor.callcommand(command, args)
+        assert action == 'noop'
+
+        requestmap[request.requestid] = (request, f)
+
+    action, meta = reactor.flushcommands()
+    assert action == 'sendframes'
+
+    # TODO stream this.
+    body = b''.join(map(bytes, meta['framegen']))
+
+    # TODO modify user-agent to reflect v2
+    headers = {
+        r'Accept': wireprotov2server.FRAMINGTYPE,
+        r'Content-Type': wireprotov2server.FRAMINGTYPE,
+    }
+
+    req = requestbuilder(pycompat.strurl(url), body, headers)
+    req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
+
+    try:
+        res = opener.open(req)
+    except urlerr.httperror as e:
+        if e.code == 401:
+            raise error.Abort(_('authorization failed'))
+
+        raise
+    except httplib.HTTPException as e:
+        ui.traceback()
+        raise IOError(None, e)
+
+    return reactor, requestmap, res
+
+def _setfutureexception(f, excinfo):
+    """Fail future ``f`` with ``excinfo`` unless it is already done.
+
+    ``excinfo`` is a ``sys.exc_info()`` triple. ``set_exception_info()``
+    only exists on the Python 2 futures backport and also takes the
+    traceback; Python 3's ``Future`` only provides ``set_exception()``.
+    """
+    if f.done():
+        return
+
+    setexcinfo = getattr(f, 'set_exception_info', None)
+    if setexcinfo is not None:
+        setexcinfo(*excinfo[1:])
+    else:
+        f.set_exception(excinfo[1])
+
+class queuedcommandfuture(pycompat.futures.Future):
+    """Wraps result() on command futures to trigger submission on call."""
+
+    def result(self, timeout=None):
+        if self.done():
+            return pycompat.futures.Future.result(self, timeout)
+
+        self._peerexecutor.sendcommands()
+
+        # sendcommands() will restore the original __class__ and self.result
+        # will resolve to Future.result.
+        return self.result(timeout)
+
+@zi.implementer(repository.ipeercommandexecutor)
+class httpv2executor(object):
+    """Command executor that sends queued commands in one HTTP request.
+
+    Commands queued with ``callcommand()`` are submitted together by
+    ``sendcommands()``, which runs when ``result()`` is called on a
+    returned future or when the executor is closed. The response is read
+    on a background thread that resolves the futures.
+    """
+    def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
+        self._ui = ui
+        self._opener = opener
+        self._requestbuilder = requestbuilder
+        self._apiurl = apiurl
+        self._descriptor = descriptor
+        self._sent = False
+        self._closed = False
+        self._neededpermissions = set()
+        self._calls = []
+        self._futures = weakref.WeakSet()
+        self._responseexecutor = None
+        self._responsef = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalue, exctb):
+        self.close()
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'commands are sent')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'close()')
+
+        # The service advertises which commands are available. So if we attempt
+        # to call an unknown command or pass an unknown argument, we can screen
+        # for this.
+        if command not in self._descriptor['commands']:
+            raise error.ProgrammingError(
+                'wire protocol command %s is not available' % command)
+
+        cmdinfo = self._descriptor['commands'][command]
+        unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
+
+        if unknownargs:
+            raise error.ProgrammingError(
+                'wire protocol command %s does not accept argument: %s' % (
+                    command, ', '.join(sorted(unknownargs))))
+
+        self._neededpermissions |= set(cmdinfo['permissions'])
+
+        # TODO we /could/ also validate types here, since the API descriptor
+        # includes types...
+
+        f = pycompat.futures.Future()
+
+        # Monkeypatch it so result() triggers sendcommands(), otherwise result()
+        # could deadlock.
+        f.__class__ = queuedcommandfuture
+        f._peerexecutor = self
+
+        self._futures.add(f)
+        self._calls.append((command, args, f))
+
+        return f
+
+    def sendcommands(self):
+        if self._sent:
+            return
+
+        if not self._calls:
+            return
+
+        self._sent = True
+
+        # Unhack any future types so caller sees a clean type and so we
+        # break reference cycle.
+        for f in self._futures:
+            if isinstance(f, queuedcommandfuture):
+                f.__class__ = pycompat.futures.Future
+                f._peerexecutor = None
+
+        # Mark the future as running and filter out cancelled futures.
+        calls = [(command, args, f)
+                 for command, args, f in self._calls
+                 if f.set_running_or_notify_cancel()]
+
+        # Clear out references, prevent improper object usage.
+        self._calls = None
+
+        if not calls:
+            return
+
+        permissions = set(self._neededpermissions)
+
+        if 'push' in permissions and 'pull' in permissions:
+            permissions.remove('pull')
+
+        if len(permissions) > 1:
+            raise error.RepoError(_('cannot make request requiring multiple '
+                                    'permissions: %s') %
+                                  _(', ').join(sorted(permissions)))
+
+        permission = {
+            'push': 'rw',
+            'pull': 'ro',
+        }[permissions.pop()]
+
+        reactor, requests, resp = sendv2request(
+            self._ui, self._opener, self._requestbuilder, self._apiurl,
+            permission, calls)
+
+        # TODO we probably want to validate the HTTP code, media type, etc.
+
+        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+        self._responsef = self._responseexecutor.submit(self._handleresponse,
+                                                        reactor,
+                                                        requests,
+                                                        resp)
+
+    def close(self):
+        if self._closed:
+            return
+
+        self.sendcommands()
+
+        self._closed = True
+
+        if not self._responsef:
+            return
+
+        try:
+            self._responsef.result()
+        finally:
+            self._responseexecutor.shutdown(wait=True)
+            self._responsef = None
+            self._responseexecutor = None
+
+            # If any of our futures are still in progress, mark them as
+            # errored, otherwise a result() could wait indefinitely.
+            for f in self._futures:
+                if not f.done():
+                    f.set_exception(error.ResponseError(
+                        _('unfulfilled command response')))
+
+            self._futures = None
+
+    def _handleresponse(self, reactor, requests, resp):
+        """Read response frames from ``resp`` and resolve command futures.
+
+        Called in a thread. ``requests`` maps request IDs to
+        ``(request, future)`` pairs.
+        """
+
+        results = {k: [] for k in requests}
+
+        while True:
+            frame = wireprotoframing.readframe(resp)
+            if frame is None:
+                break
+
+            self._ui.note(_('received %r\n') % frame)
+
+            # Guard against receiving a frame with a request ID that we
+            # didn't issue. This should never happen.
+            request, f = requests.get(frame.requestid, [None, None])
+
+            action, meta = reactor.onframerecv(frame)
+
+            if action == 'responsedata':
+                assert request.requestid == meta['request'].requestid
+
+                result = results[request.requestid]
+
+                if meta['cbor']:
+                    payload = util.bytesio(meta['data'])
+
+                    decoder = cbor.CBORDecoder(payload)
+                    while payload.tell() + 1 < len(meta['data']):
+                        try:
+                            result.append(decoder.decode())
+                        except Exception:
+                            _setfutureexception(f, sys.exc_info())
+                            # The payload position is unreliable after a
+                            # failed decode, so stop decoding this frame.
+                            break
+                else:
+                    result.append(meta['data'])
+
+                if meta['eos']:
+                    # A decoding failure may already have resolved the future.
+                    if not f.done():
+                        f.set_result(result)
+                    del results[request.requestid]
+
+            else:
+                e = error.ProgrammingError('unhandled action: %s' % action)
+
+                if f:
+                    f.set_exception(e)
+                else:
+                    raise e
+
 # TODO implement interface for version 2 peers
-@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
+@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
+                repository.ipeerrequests)
 class httpv2peer(object):
     def __init__(self, ui, repourl, apipath, opener, requestbuilder,
                  apidescriptor):
@@ -529,6 +829,7 @@
 
         self._url = repourl
         self._apipath = apipath
+        self._apiurl = '%s/%s' % (repourl, apipath)
         self._opener = opener
         self._requestbuilder = requestbuilder
         self._descriptor = apidescriptor
@@ -580,85 +881,13 @@
 
     # End of ipeercapabilities.
 
-    # TODO require to be part of a batched primitive, use futures.
     def _call(self, name, **args):
-        """Call a wire protocol command with arguments."""
-
-        # Having this early has a side-effect of importing wireprotov2server,
-        # which has the side-effect of ensuring commands are registered.
-
-        # TODO modify user-agent to reflect v2.
-        headers = {
-            r'Accept': wireprotov2server.FRAMINGTYPE,
-            r'Content-Type': wireprotov2server.FRAMINGTYPE,
-        }
-
-        # TODO permissions should come from capabilities results.
-        permission = wireproto.commandsv2[name].permission
-        if permission not in ('push', 'pull'):
-            raise error.ProgrammingError('unknown permission type: %s' %
-                                         permission)
-
-        permission = {
-            'push': 'rw',
-            'pull': 'ro',
-        }[permission]
-
-        url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
-
-        # TODO this should be part of a generic peer for the frame-based
-        # protocol.
-        reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
-                                                 buffersends=True)
-
-        request, action, meta = reactor.callcommand(name, args)
-        assert action == 'noop'
-
-        action, meta = reactor.flushcommands()
-        assert action == 'sendframes'
+        with self.commandexecutor() as e:
+            return e.callcommand(name, args).result()
 
-        body = b''.join(map(bytes, meta['framegen']))
-        req = self._requestbuilder(pycompat.strurl(url), body, headers)
-        req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
-
-        # TODO unify this code with httppeer.
-        try:
-            res = self._opener.open(req)
-        except urlerr.httperror as e:
-            if e.code == 401:
-                raise error.Abort(_('authorization failed'))
-
-            raise
-        except httplib.HTTPException as e:
-            self.ui.traceback()
-            raise IOError(None, e)
-
-        # TODO validate response type, wrap response to handle I/O errors.
-        # TODO more robust frame receiver.
-        results = []
-
-        while True:
-            frame = wireprotoframing.readframe(res)
-            if frame is None:
-                break
-
-            self.ui.note(_('received %r\n') % frame)
-
-            action, meta = reactor.onframerecv(frame)
-
-            if action == 'responsedata':
-                if meta['cbor']:
-                    payload = util.bytesio(meta['data'])
-
-                    decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(meta['data']):
-                        results.append(decoder.decode())
-                else:
-                    results.append(meta['data'])
-            else:
-                error.ProgrammingError('unhandled action: %s' % action)
-
-        return results
+    def commandexecutor(self):
+        return httpv2executor(self.ui, self._opener, self._requestbuilder,
+                              self._apiurl, self._descriptor)
 
 # Registry of API service names to metadata about peers that handle it.
 #