diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -308,10 +308,24 @@ wantframe Indicates that nothing of interest happened and the server is waiting on more frames from the client before anything interesting can be done. + + noop + Indicates no additional action is required. """ - def __init__(self): + def __init__(self, deferoutput=False): + """Construct a new server reactor. + + ``deferoutput`` can be used to indicate that no output frames should be + instructed to be sent until input has been exhausted. In this mode, + events that would normally generate output frames (such as a command + response being ready) will instead defer instructing the consumer to + send those frames. This is useful for half-duplex transports where the + sender cannot receive until all data has been transmitted. + """ + self._deferoutput = deferoutput self._state = 'idle' + self._bufferedframegens = [] self._activecommand = None self._activeargs = None self._activedata = None @@ -344,8 +358,33 @@ The raw bytes response is passed as an argument. """ + framegen = createbytesresponseframesfrombytes(data) + + if self._deferoutput: + self._bufferedframegens.append(framegen) + return 'noop', {} + else: + return 'sendframes', { + 'framegen': framegen, + } + + def oninputeof(self): + """Signals that end of input has been received. + + No more frames will be received. All pending activity should be + completed. + """ + if not self._deferoutput or not self._bufferedframegens: + return 'noop', {} + + # If we buffered all our responses, emit those. + def makegen(): + for gen in self._bufferedframegens: + for frame in gen: + yield frame + return 'sendframes', { - 'framegen': createbytesresponseframesfrombytes(data), + 'framegen': makegen(), } def onapplicationerror(self, msg): diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py --- a/mercurial/wireprotoserver.py +++ b/mercurial/wireprotoserver.py @@ -401,6 +401,10 @@ states.append(json.dumps((action, meta), sort_keys=True, separators=(', ', ': '))) + action, meta = reactor.oninputeof() + meta['action'] = action + states.append(json.dumps(meta, sort_keys=True, separators=(', ',': '))) + res.status = b'200 OK' res.headers[b'Content-Type'] = b'text/plain' res.setbodybytes(b'\n'.join(states)) @@ -411,7 +415,10 @@ Called when the HTTP request contains unified frame-based protocol frames for evaluation. """ - reactor = wireprotoframing.serverreactor() + # TODO Some HTTP clients are full duplex and can receive data before + # the entire request is transmitted. Figure out a way to indicate support + # for that so we can opt into full duplex mode. + reactor = wireprotoframing.serverreactor(deferoutput=True) seencommand = False while True: @@ -448,6 +455,19 @@ raise error.ProgrammingError( 'unhandled action from frame processor: %s' % action) + action, meta = reactor.oninputeof() + if action == 'sendframes': + # We assume we haven't started sending the response yet. If we're + # wrong, the response type will raise an exception. + res.status = b'200 OK' + res.headers[b'Content-Type'] = FRAMINGTYPE + res.setbodygen(meta['framegen']) + elif action == 'noop': + pass + else: + raise error.ProgrammingError('unhandled action from frame processor: %s' + % action) + def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, command): """Dispatch a wire protocol command made from HTTPv2 requests. @@ -504,6 +524,8 @@ if action == 'sendframes': res.setbodygen(meta['framegen']) + elif action == 'noop': + pass else: raise error.ProgrammingError('unhandled event from reactor: %s' % action) diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t --- a/tests/test-http-api-httpv2.t +++ b/tests/test-http-api-httpv2.t @@ -401,7 +401,7 @@ s> Server: testing stub value\r\n s> Date: $HTTP_DATE$\r\n s> Content-Type: text/plain\r\n - s> Content-Length: 291\r\n + s> Content-Length: 310\r\n s> \r\n s> received: 1 2 command1\n s> ["wantframe", {"state": "command-receiving-args"}]\n @@ -409,6 +409,7 @@ s> ["wantframe", {"state": "command-receiving-args"}]\n s> received: 2 2 \x04\x00\x03\x00bar1val\n s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n - s> received: + s> received: \n + s> {"action": "noop"} $ cat error.log diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py --- a/tests/test-wireproto-serverreactor.py +++ b/tests/test-wireproto-serverreactor.py @@ -9,8 +9,8 @@ ffs = framing.makeframefromhumanstring -def makereactor(): - return framing.serverreactor() +def makereactor(deferoutput=False): + return framing.serverreactor(deferoutput=deferoutput) def sendframes(reactor, gen): """Send a generator of frame bytearray to a reactor. @@ -95,6 +95,9 @@ 'data': None, }) + result = reactor.oninputeof() + self.assertaction(result, 'noop') + def test1argument(self): reactor = makereactor() results = list(sendcommandframes(reactor, b'mycommand', @@ -310,6 +313,37 @@ b'error-response application some message', ]) + def test1commanddeferresponse(self): + """Responses when in deferred output mode are delayed until EOF.""" + reactor = makereactor(deferoutput=True) + results = list(sendcommandframes(reactor, b'mycommand', {})) + self.assertEqual(len(results), 1) + self.assertaction(results[0], 'runcommand') + + result = reactor.onbytesresponseready(b'response') + self.assertaction(result, 'noop') + result = reactor.oninputeof() + self.assertaction(result, 'sendframes') + self.assertframesequal(result[1]['framegen'], [ + b'bytes-response eos response', + ]) + + def testmultiplecommanddeferresponse(self): + reactor = makereactor(deferoutput=True) + list(sendcommandframes(reactor, b'command1', {})) + list(sendcommandframes(reactor, b'command2', {})) + + result = reactor.onbytesresponseready(b'response1') + self.assertaction(result, 'noop') + result = reactor.onbytesresponseready(b'response2') + self.assertaction(result, 'noop') + result = reactor.oninputeof() + self.assertaction(result, 'sendframes') + self.assertframesequal(result[1]['framegen'], [ + b'bytes-response eos response1', + b'bytes-response eos response2' + ]) + if __name__ == '__main__': import silenttestrunner silenttestrunner.main(__name__)