diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -386,6 +386,72 @@
         if done:
             break
 
+def createbytesresponseframesfromgen(stream, requestid, gen,
+                                     maxframesize=DEFAULT_MAX_FRAME_SIZE):
+    """Generate command response frames from a generator of byte chunks.
+
+    A CBOR ``{b'status': b'ok'}`` header frame is emitted first, the data
+    from ``gen`` is then read ``maxframesize`` bytes at a time into data
+    frames, and an empty end-of-stream frame terminates the response.
+    """
+    overall = cbor.dumps({b'status': b'ok'}, canonical=True)
+
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                           payload=overall)
+
+    cb = util.chunkbuffer(gen)
+
+    # NOTE(review): the first data frame is sent with flags == 0; confirm
+    # that omitting the continuation bit there is intended.
+    flags = 0
+
+    while True:
+        chunk = cb.read(maxframesize)
+        if not chunk:
+            break
+
+        yield stream.makeframe(requestid=requestid,
+                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                               flags=flags,
+                               payload=chunk)
+
+        flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
+
+    # Clear the continuation bit with a mask rather than XOR: if ``gen``
+    # produced no data the bit was never set, and XOR would turn it on
+    # for the final end-of-stream frame.
+    flags &= ~FLAG_COMMAND_RESPONSE_CONTINUATION
+    flags |= FLAG_COMMAND_RESPONSE_EOS
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=flags,
+                           payload=b'')
+
+def createcommanderrorresponse(stream, requestid, message, args=None):
+    """Generate the frame describing a command-level error.
+
+    The payload is a CBOR map with ``status`` set to ``error`` and an
+    ``error`` map holding ``message`` plus, when truthy, ``args``.
+    """
+    m = {
+        b'status': b'error',
+        b'error': {
+            b'message': message,
+        }
+    }
+
+    if args:
+        m[b'error'][b'args'] = args
+
+    overall = cbor.dumps(m, canonical=True)
+
+    yield stream.makeframe(requestid=requestid,
+                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                           flags=FLAG_COMMAND_RESPONSE_EOS,
+                           payload=overall)
+
 def createerrorframe(stream, requestid, msg, errtype):
     # TODO properly handle frame size limits.
     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +700,19 @@
                 'framegen': result,
             }
 
+    def oncommandresponsereadygen(self, stream, requestid, gen):
+        """Signal that a bytes response is ready, with data as a generator."""
+        ensureserverstream(stream)
+
+        def sendframes():
+            for frame in createbytesresponseframesfromgen(stream, requestid,
+                                                          gen):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
+
     def oninputeof(self):
         """Signals that end of input has been received.
 
@@ -655,13 +734,45 @@
             'framegen': makegen(),
         }
 
+    def _handlesendframes(self, framegen):
+        """Return the action for emitting ``framegen``.
+
+        When output is deferred the generator is buffered and a ``noop``
+        action is returned; otherwise a ``sendframes`` action carrying
+        the generator is returned.
+        """
+        if self._deferoutput:
+            self._bufferedframegens.append(framegen)
+            return 'noop', {}
+        else:
+            return 'sendframes', {
+                'framegen': framegen,
+            }
+
     def onservererror(self, stream, requestid, msg):
         ensureserverstream(stream)
 
-        return 'sendframes', {
-            'framegen': createerrorframe(stream, requestid, msg,
-                                         errtype='server'),
-        }
+        def sendframes():
+            for frame in createerrorframe(stream, requestid, msg,
+                                          errtype='server'):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
+
+    def oncommanderror(self, stream, requestid, message, args=None):
+        """Called when a command encountered an error before sending output."""
+        ensureserverstream(stream)
+
+        def sendframes():
+            for frame in createcommanderrorresponse(stream, requestid, message,
+                                                    args):
+                yield frame
+
+            self._activecommands.remove(requestid)
+
+        return self._handlesendframes(sendframes())
 
     def makeoutputstream(self):
         """Create a stream to be used for sending data to the client."""
diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py
--- a/mercurial/wireprototypes.py
+++ b/mercurial/wireprototypes.py
@@ -106,6 +106,22 @@
     def __init__(self, v):
         self.value = v
 
+class v2errorresponse(object):
+    """Represents a command error for version 2 transports."""
+    def __init__(self, message, args=None):
+        self.message = message
+        self.args = args
+
+class v2streamingresponse(object):
+    """A response whose data is supplied by a generator.
+
+    The generator can either consist of data structures to CBOR
+    encode or a stream of already-encoded bytes.
+    """
+    def __init__(self, gen, compressible=True):
+        self.gen = gen
+        self.compressible = compressible
+
 # list of nodes encoding / decoding
 def decodelist(l, sep=' '):
     if l:
diff --git a/mercurial/wireprotov2server.py b/mercurial/wireprotov2server.py
--- a/mercurial/wireprotov2server.py
+++ b/mercurial/wireprotov2server.py
@@ -306,6 +306,15 @@
         action, meta = reactor.oncommandresponseready(outstream,
                                                       command['requestid'],
                                                       encoded)
+    elif isinstance(rsp, wireprototypes.v2streamingresponse):
+        action, meta = reactor.oncommandresponsereadygen(outstream,
+                                                         command['requestid'],
+                                                         rsp.gen)
+    elif isinstance(rsp, wireprototypes.v2errorresponse):
+        action, meta = reactor.oncommanderror(outstream,
+                                              command['requestid'],
+                                              rsp.message,
+                                              rsp.args)
     else:
         action, meta = reactor.onservererror(
             _('unhandled response type from wire proto command'))