diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -365,75 +365,6 @@ if done: break -def createcommandresponseframesfrombytes(stream, requestid, data, - maxframesize=DEFAULT_MAX_FRAME_SIZE): - """Create a raw frame to send a bytes response from static bytes input. - - Returns a generator of bytearrays. - """ - # Automatically send the overall CBOR response map. - overall = b''.join(cborutil.streamencode({b'status': b'ok'})) - if len(overall) > maxframesize: - raise error.ProgrammingError('not yet implemented') - - # Simple case where we can fit the full response in a single frame. - if len(overall) + len(data) <= maxframesize: - flags = FLAG_COMMAND_RESPONSE_EOS - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=flags, - payload=overall + data) - return - - # It's easier to send the overall CBOR map in its own frame than to track - # offsets. - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=FLAG_COMMAND_RESPONSE_CONTINUATION, - payload=overall) - - offset = 0 - while True: - chunk = data[offset:offset + maxframesize] - offset += len(chunk) - done = offset == len(data) - - if done: - flags = FLAG_COMMAND_RESPONSE_EOS - else: - flags = FLAG_COMMAND_RESPONSE_CONTINUATION - - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=flags, - payload=chunk) - - if done: - break - -def createbytesresponseframesfromgen(stream, requestid, gen, - maxframesize=DEFAULT_MAX_FRAME_SIZE): - """Generator of frames from a generator of byte chunks. - - This assumes that another frame will follow whatever this emits. i.e. - this always emits the continuation flag and never emits the end-of-stream - flag. - """ - cb = util.chunkbuffer(gen) - flags = FLAG_COMMAND_RESPONSE_CONTINUATION - - while True: - chunk = cb.read(maxframesize) - if not chunk: - break - - yield stream.makeframe(requestid=requestid, - typeid=FRAME_TYPE_COMMAND_RESPONSE, - flags=flags, - payload=chunk) - - flags |= FLAG_COMMAND_RESPONSE_CONTINUATION - def createcommandresponseokframe(stream, requestid): overall = b''.join(cborutil.streamencode({b'status': b'ok'})) @@ -1020,30 +951,6 @@ return meth(frame) - def oncommandresponseready(self, stream, requestid, data): - """Signal that a bytes response is ready to be sent to the client. - - The raw bytes response is passed as an argument. - """ - ensureserverstream(stream) - - def sendframes(): - for frame in createcommandresponseframesfrombytes(stream, requestid, - data): - yield frame - - self._activecommands.remove(requestid) - - result = sendframes() - - if self._deferoutput: - self._bufferedframegens.append(result) - return 'noop', {} - else: - return 'sendframes', { - 'framegen': result, - } - def oncommandresponsereadyobjects(self, stream, requestid, objs): """Signal that objects are ready to be sent to the client. @@ -1053,6 +960,10 @@ """ ensureserverstream(stream) + # A more robust solution would be to check for objs.{next,__next__}. + if isinstance(objs, list): + objs = iter(objs) + # We need to take care over exception handling. Uncaught exceptions # when generating frames could lead to premature end of the frame # stream and the possibility of the server or client process getting diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py --- a/tests/test-wireproto-serverreactor.py +++ b/tests/test-wireproto-serverreactor.py @@ -225,19 +225,22 @@ results.append(self._sendsingleframe( reactor, ffs(b'1 1 stream-begin command-request new ' b"cbor:{b'name': b'command'}"))) - result = reactor.oncommandresponseready(outstream, 1, b'response1') + result = reactor.oncommandresponsereadyobjects( + outstream, 1, [b'response1']) self.assertaction(result, b'sendframes') list(result[1][b'framegen']) results.append(self._sendsingleframe( reactor, ffs(b'1 1 stream-begin command-request new ' b"cbor:{b'name': b'command'}"))) - result = reactor.oncommandresponseready(outstream, 1, b'response2') + result = reactor.oncommandresponsereadyobjects( + outstream, 1, [b'response2']) self.assertaction(result, b'sendframes') list(result[1][b'framegen']) results.append(self._sendsingleframe( reactor, ffs(b'1 1 stream-begin command-request new ' b"cbor:{b'name': b'command'}"))) - result = reactor.oncommandresponseready(outstream, 1, b'response3') + result = reactor.oncommandresponsereadyobjects( + outstream, 1, [b'response3']) self.assertaction(result, b'sendframes') list(result[1][b'framegen']) @@ -364,10 +367,13 @@ list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) outstream = reactor.makeoutputstream() - result = reactor.oncommandresponseready(outstream, 1, b'response') + result = reactor.oncommandresponsereadyobjects( + outstream, 1, [b'response']) self.assertaction(result, b'sendframes') self.assertframesequal(result[1][b'framegen'], [ - b'1 2 stream-begin command-response eos %sresponse' % OK, + b'1 2 stream-begin command-response continuation %s' % OK, + b'1 2 0 command-response continuation cbor:b"response"', + b'1 2 0 command-response eos ', ]) def testmultiframeresponse(self): @@ -380,12 +386,16 @@ list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) outstream = reactor.makeoutputstream() - result = reactor.oncommandresponseready(outstream, 1, first + second) + result = reactor.oncommandresponsereadyobjects( + outstream, 1, [first + second]) self.assertaction(result, b'sendframes') self.assertframesequal(result[1][b'framegen'], [ b'1 2 stream-begin command-response continuation %s' % OK, + b'1 2 0 command-response continuation Y\x80d', b'1 2 0 command-response continuation %s' % first, - b'1 2 0 command-response eos %s' % second, + b'1 2 0 command-response continuation %s' % second, + b'1 2 0 command-response continuation ', + b'1 2 0 command-response eos ' ]) def testservererror(self): @@ -412,12 +422,15 @@ self.assertaction(results[0], b'runcommand') outstream = reactor.makeoutputstream() - result = reactor.oncommandresponseready(outstream, 1, b'response') + result = reactor.oncommandresponsereadyobjects( + outstream, 1, [b'response']) self.assertaction(result, b'noop') result = reactor.oninputeof() self.assertaction(result, b'sendframes') self.assertframesequal(result[1][b'framegen'], [ - b'1 2 stream-begin command-response eos %sresponse' % OK, + b'1 2 stream-begin command-response continuation %s' % OK, + b'1 2 0 command-response continuation cbor:b"response"', + b'1 2 0 command-response eos ', ]) def testmultiplecommanddeferresponse(self): @@ -427,15 +440,21 @@ list(sendcommandframes(reactor, instream, 3, b'command2', {})) outstream = reactor.makeoutputstream() - result = reactor.oncommandresponseready(outstream, 1, b'response1') + result = reactor.oncommandresponsereadyobjects( + outstream, 1, [b'response1']) self.assertaction(result, b'noop') - result = reactor.oncommandresponseready(outstream, 3, b'response2') + result = reactor.oncommandresponsereadyobjects( + outstream, 3, [b'response2']) self.assertaction(result, b'noop') result = reactor.oninputeof() self.assertaction(result, b'sendframes') self.assertframesequal(result[1][b'framegen'], [ - b'1 2 stream-begin command-response eos %sresponse1' % OK, - b'3 2 0 command-response eos %sresponse2' % OK, + b'1 2 stream-begin command-response continuation %s' % OK, + b'1 2 0 command-response continuation cbor:b"response1"', + b'1 2 0 command-response eos ', + b'3 2 0 command-response continuation %s' % OK, + b'3 2 0 command-response continuation cbor:b"response2"', + b'3 2 0 command-response eos ', ]) def testrequestidtracking(self): @@ -447,16 +466,22 @@ # Register results for commands out of order. outstream = reactor.makeoutputstream() - reactor.oncommandresponseready(outstream, 3, b'response3') - reactor.oncommandresponseready(outstream, 1, b'response1') - reactor.oncommandresponseready(outstream, 5, b'response5') + reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3']) + reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1']) + reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5']) result = reactor.oninputeof() self.assertaction(result, b'sendframes') self.assertframesequal(result[1][b'framegen'], [ - b'3 2 stream-begin command-response eos %sresponse3' % OK, - b'1 2 0 command-response eos %sresponse1' % OK, - b'5 2 0 command-response eos %sresponse5' % OK, + b'3 2 stream-begin command-response continuation %s' % OK, + b'3 2 0 command-response continuation cbor:b"response3"', + b'3 2 0 command-response eos ', + b'1 2 0 command-response continuation %s' % OK, + b'1 2 0 command-response continuation cbor:b"response1"', + b'1 2 0 command-response eos ', + b'5 2 0 command-response continuation %s' % OK, + b'5 2 0 command-response continuation cbor:b"response5"', + b'5 2 0 command-response eos ', ]) def testduplicaterequestonactivecommand(self): @@ -477,7 +502,7 @@ instream = framing.stream(1) list(sendcommandframes(reactor, instream, 1, b'command1', {})) outstream = reactor.makeoutputstream() - reactor.oncommandresponseready(outstream, 1, b'response') + reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) # We've registered the response but haven't sent it. From the # perspective of the reactor, the command is still active. @@ -494,7 +519,7 @@ instream = framing.stream(1) list(sendcommandframes(reactor, instream, 1, b'command1', {})) outstream = reactor.makeoutputstream() - res = reactor.oncommandresponseready(outstream, 1, b'response') + res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) list(res[1][b'framegen']) results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))