diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py --- a/mercurial/httppeer.py +++ b/mercurial/httppeer.py @@ -513,7 +513,9 @@ reactor = wireprotoframing.clientreactor(hasmultiplesend=False, buffersends=True) - handler = wireprotov2peer.clienthandler(ui, reactor) + handler = wireprotov2peer.clienthandler(ui, reactor, + opener=opener, + requestbuilder=requestbuilder) url = '%s/%s' % (apiurl, permission) diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py --- a/mercurial/wireprotov2peer.py +++ b/mercurial/wireprotov2peer.py @@ -13,8 +13,12 @@ from . import ( encoding, error, + pycompat, sslutil, + url as urlmod, + util, wireprotoframing, + wireprototypes, ) from .utils import ( cborutil, @@ -112,9 +116,10 @@ events occur. """ - def __init__(self, requestid, command): + def __init__(self, requestid, command, fromredirect=False): self.requestid = requestid self.command = command + self.fromredirect = fromredirect # Whether all remote input related to this command has been # received. @@ -132,6 +137,7 @@ self._pendingevents = [] self._decoder = cborutil.bufferingdecoder() self._seeninitial = False + self._redirect = None def _oninputcomplete(self): with self._lock: @@ -146,10 +152,19 @@ with self._lock: for o in self._decoder.getavailable(): - if not self._seeninitial: + if not self._seeninitial and not self.fromredirect: self._handleinitial(o) continue + # We should never see an object after a content redirect, + # as the spec says the main status object containing the + # content redirect is the only object in the stream. Fail + # if we see a misbehaving server.
+ if self._redirect: + raise error.Abort(_('received unexpected response data ' + 'after content redirect; the remote is ' + 'buggy')) + self._pendingevents.append(o) self._serviceable.set() @@ -160,7 +175,16 @@ return elif o[b'status'] == b'redirect': - raise error.Abort(_('redirect responses not yet supported')) + l = o[b'location'] + self._redirect = wireprototypes.alternatelocationresponse( + url=l[b'url'], + mediatype=l[b'mediatype'], + size=l.get(b'size'), + fullhashes=l.get(b'fullhashes'), + fullhashseed=l.get(b'fullhashseed'), + serverdercerts=l.get(b'serverdercerts'), + servercadercerts=l.get(b'servercadercerts')) + return atoms = [{'msg': o[b'error'][b'message']}] if b'args' in o[b'error']: @@ -214,13 +238,17 @@ with the higher-level peer API. """ - def __init__(self, ui, clientreactor): + def __init__(self, ui, clientreactor, opener=None, + requestbuilder=util.urlreq.request): self._ui = ui self._reactor = clientreactor self._requests = {} self._futures = {} self._responses = {} + self._redirects = [] self._frameseof = False + self._opener = opener or urlmod.opener(ui) + self._requestbuilder = requestbuilder def callcommand(self, command, args, f, redirect=None): """Register a request to call a command. @@ -269,7 +297,12 @@ self._ui.note(_('received %r\n') % frame) self._processframe(frame) - if self._frameseof: + # Also try to read the first redirect. + if self._redirects: + if not self._processredirect(*self._redirects[0]): + self._redirects.pop(0) + + if self._frameseof and not self._redirects: return None return True @@ -318,10 +351,27 @@ # This can raise. The caller can handle it. response._onresponsedata(meta['data']) + # If we got a content redirect response, we want to fetch it and + # expose the data as if we received it inline. But we also want to + # keep our internal request accounting in order. Our strategy is to + # basically put meaningful response handling on pause until EOS occurs + # and the stream accounting is in a good state.
At that point, we follow + # the redirect and replace the response object with its data. + + redirect = response._redirect + handlefuture = False if redirect else True + if meta['eos']: response._oninputcomplete() del self._requests[frame.requestid] + if redirect: + self._followredirect(frame.requestid, redirect) + return + + if not handlefuture: + return + # If the command has a decoder, we wait until all input has been # received before resolving the future. Otherwise we resolve the # future immediately. @@ -336,6 +386,96 @@ self._futures[frame.requestid].set_result(decoded) del self._futures[frame.requestid] + def _followredirect(self, requestid, redirect): + """Called to initiate redirect following for a request.""" + self._ui.note(_('(following redirect to %s)\n') % redirect.url) + + # TODO handle framed responses. + if redirect.mediatype != b'application/mercurial-cbor': + raise error.Abort(_('cannot handle redirects for the %s media type') + % redirect.mediatype) + + if redirect.fullhashes: + self._ui.warn(_('(support for validating hashes on content ' + 'redirects not supported)\n')) + + if redirect.serverdercerts or redirect.servercadercerts: + self._ui.warn(_('(support for pinning server certificates on ' + 'content redirects not supported)\n')) + + headers = { + r'Accept': redirect.mediatype, + } + + req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers) + + try: + res = self._opener.open(req) + except util.urlerr.httperror as e: + if e.code == 401: + raise error.Abort(_('authorization failed')) + raise + except util.httplib.HTTPException as e: + self._ui.debug('http error requesting %s\n' % req.get_full_url()) + self._ui.traceback() + raise IOError(None, e) + + urlmod.wrapresponse(res) + + # The existing response object is associated with frame data. Rather + # than try to normalize its state, just create a new object.
+ oldresponse = self._responses[requestid] + self._responses[requestid] = commandresponse(requestid, + oldresponse.command, + fromredirect=True) + + self._redirects.append((requestid, res)) + + def _processredirect(self, rid, res): + """Called to continue processing a response from a redirect. + + Returns a bool indicating whether the redirect is still + serviceable, i.e. whether more data may remain to be read. + """ + response = self._responses[rid] + + try: + data = res.read(32768) + response._onresponsedata(data) + + # We're at end of stream. + if not data: + response._oninputcomplete() + + # The future may already be resolved (commands without a + # decoder receive response.objects() immediately). Keep reading + # until EOF so the response receives all of its data. + if rid not in self._futures: + return bool(data) + + if response.command not in COMMAND_DECODERS: + self._futures[rid].set_result(response.objects()) + del self._futures[rid] + elif response._inputcomplete: + decoded = COMMAND_DECODERS[response.command](response.objects()) + self._futures[rid].set_result(decoded) + del self._futures[rid] + + return bool(data) + + except BaseException as e: + # If the future was already resolved there is nowhere to + # deliver the error: mark the response's input as complete, + # then let the original exception propagate to our caller. + if rid not in self._futures: + response._oninputcomplete() + raise + + self._futures[rid].set_exception(e) + del self._futures[rid] + response._oninputcomplete() + return False + def decodebranchmap(objs): # Response should be a single CBOR map of branch name to array of nodes.
bm = next(objs) diff --git a/tests/test-wireproto-content-redirects.t b/tests/test-wireproto-content-redirects.t --- a/tests/test-wireproto-content-redirects.t +++ b/tests/test-wireproto-content-redirects.t @@ -1354,8 +1354,33 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - abort: redirect responses not yet supported - [255] + (following redirect to http://*:$HGPORT/api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0) (glob) + s> GET /api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0 HTTP/1.1\r\n + s> Accept-Encoding: identity\r\n + s> accept: application/mercurial-cbor\r\n + s> host: *:$HGPORT\r\n (glob) + s> user-agent: Mercurial debugwireproto\r\n + s> \r\n + s> makefile('rb', None) + s> HTTP/1.1 200 OK\r\n + s> Server: testing stub value\r\n + s> Date: $HTTP_DATE$\r\n + s> Content-Type: application/mercurial-cbor\r\n + s> Content-Length: 91\r\n + s> \r\n + s> \xa1Jtotalitems\x01\xa2DnodeT\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&AGparents\x82T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 + response: gen[ + { + b'totalitems': 1 + }, + { + b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A', + b'parents': [ + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ] + } + ] $ cat error.log $ killdaemons.py