diff --git a/mercurial/debugcommands.py b/mercurial/debugcommands.py --- a/mercurial/debugcommands.py +++ b/mercurial/debugcommands.py @@ -3240,7 +3240,7 @@ res = e.callcommand(command, args).result() if isinstance(res, wireprotov2peer.commandresponse): - val = list(res.cborobjects()) + val = res.objects() ui.status(_('response: %s\n') % stringutil.pprint(val, bprefix=True, indent=2)) else: diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py --- a/mercurial/wireprotov2peer.py +++ b/mercurial/wireprotov2peer.py @@ -7,11 +7,12 @@ from __future__ import absolute_import +import threading + from .i18n import _ from . import ( encoding, error, - util, wireprotoframing, ) from .utils import ( @@ -34,20 +35,101 @@ return b''.join(chunks) class commandresponse(object): - """Represents the response to a command request.""" + """Represents the response to a command request. + + Instances track the state of the command and hold its results. + + An external entity is required to update the state of the object when + events occur. + """ def __init__(self, requestid, command): self.requestid = requestid self.command = command - self.b = util.bytesio() + # Whether all remote input related to this command has been + # received. + self._inputcomplete = False + + # We have a lock that is acquired when important object state is + # mutated. This is to prevent race conditions between 1 thread + # sending us new data and another consuming it. + self._lock = threading.RLock() + + # An event is set when state of the object changes. This event + # is waited on by the generator emitting objects. + self._serviceable = threading.Event() + + self._pendingevents = [] + self._decoder = cborutil.bufferingdecoder() + self._seeninitial = False + + def _oninputcomplete(self): + with self._lock: + self._inputcomplete = True + self._serviceable.set() + + def _onresponsedata(self, data): + available, readcount, wanted = self._decoder.decode(data) + + if not available: + return + + with self._lock: + for o in self._decoder.getavailable(): + if not self._seeninitial: + self._handleinitial(o) + continue + + self._pendingevents.append(o) + + self._serviceable.set() - def cborobjects(self): - """Obtain decoded CBOR objects from this response.""" - self.b.seek(0) + def _handleinitial(self, o): + self._seeninitial = True + if o[b'status'] == 'ok': + return + + atoms = [{'msg': o[b'error'][b'message']}] + if b'args' in o[b'error']: + atoms[0]['args'] = o[b'error'][b'args'] + + raise error.RepoError(formatrichmessage(atoms)) + + def objects(self): + """Obtained decoded objects from this response. + + This is a generator of data structures that were decoded from the + command response. + + Obtaining the next member of the generator may block due to waiting + on external data to become available. - for v in cborutil.decodeall(self.b.getvalue()): - yield v + If the server encountered an error in the middle of serving the data + or if another error occurred, an exception may be raised when + advancing the generator. + """ + while True: + # TODO this can infinite loop if self._inputcomplete is never + # set. We likely want to tie the lifetime of this object/state + # to that of the background thread receiving frames and updating + # our state. + self._serviceable.wait(1.0) + + with self._lock: + self._serviceable.clear() + + # Make copies because objects could be mutated during + # iteration. + stop = self._inputcomplete + pending = list(self._pendingevents) + self._pendingevents[:] = [] + + for o in pending: + yield o + + if stop: + break class clienthandler(object): """Object to handle higher-level client activities. @@ -80,6 +162,8 @@ rid = request.requestid self._requests[rid] = request self._futures[rid] = f + # TODO we need some kind of lifetime on response instances otherwise + # objects() may deadlock. self._responses[rid] = commandresponse(rid, command) return iter(()) @@ -119,8 +203,12 @@ if action == 'error': e = error.RepoError(meta['message']) + if frame.requestid in self._responses: + self._responses[frame.requestid]._oninputcomplete() + if frame.requestid in self._futures: self._futures[frame.requestid].set_exception(e) + del self._futures[frame.requestid] else: raise e @@ -141,39 +229,32 @@ self._processresponsedata(frame, meta, response) except BaseException as e: self._futures[frame.requestid].set_exception(e) + del self._futures[frame.requestid] + response._oninputcomplete() else: raise error.ProgrammingError( 'unhandled action from clientreactor: %s' % action) def _processresponsedata(self, frame, meta, response): - # This buffers all data until end of stream is received. This - # is bad for performance. - # TODO make response data streamable - response.b.write(meta['data']) + # This can raise. The caller can handle it. + response._onresponsedata(meta['data']) if meta['eos']: - # If the command has a decoder, resolve the future to the - # decoded value. Otherwise resolve to the rich response object. - decoder = COMMAND_DECODERS.get(response.command) - - # TODO consider always resolving the overall status map. - if decoder: - objs = response.cborobjects() - - overall = next(objs) + response._oninputcomplete() + del self._requests[frame.requestid] - if overall['status'] == 'ok': - self._futures[frame.requestid].set_result(decoder(objs)) - else: - atoms = [{'msg': overall['error']['message']}] - if 'args' in overall['error']: - atoms[0]['args'] = overall['error']['args'] - e = error.RepoError(formatrichmessage(atoms)) - self._futures[frame.requestid].set_exception(e) - else: - self._futures[frame.requestid].set_result(response) + # If the command has a decoder, we wait until all input has been + # received before resolving the future. Otherwise we resolve the + # future immediately. + if frame.requestid not in self._futures: + return - del self._requests[frame.requestid] + if response.command not in COMMAND_DECODERS: + self._futures[frame.requestid].set_result(response.objects()) + del self._futures[frame.requestid] + elif response._inputcomplete: + decoded = COMMAND_DECODERS[response.command](response.objects()) + self._futures[frame.requestid].set_result(decoded) del self._futures[frame.requestid] def decodebranchmap(objs): diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t --- a/tests/test-http-api-httpv2.t +++ b/tests/test-http-api-httpv2.t @@ -225,10 +225,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ - { - b'status': b'ok' - }, + response: gen[ b'customreadonly bytes response' ] diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t --- a/tests/test-wireproto-command-capabilities.t +++ b/tests/test-wireproto-command-capabilities.t @@ -349,10 +349,7 @@ s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - response: [ - { - b'status': b'ok' - }, + response: gen[ { b'commands': { b'branchmap': {