diff --git a/mercurial/setdiscovery.py b/mercurial/setdiscovery.py
--- a/mercurial/setdiscovery.py
+++ b/mercurial/setdiscovery.py
@@ -155,11 +155,14 @@
     sample = _limitsample(ownheads, initialsamplesize)
     # indices between sample and externalized version must match
     sample = list(sample)
-    batch = remote.iterbatch()
-    batch.heads()
-    batch.known(dag.externalizeall(sample))
-    batch.submit()
-    srvheadhashes, yesno = batch.results()
+
+    with remote.commandexecutor() as e:
+        fheads = e.callcommand('heads', {})
+        fknown = e.callcommand('known', {
+            'nodes': dag.externalizeall(sample),
+        })
+
+    srvheadhashes, yesno = fheads.result(), fknown.result()
 
     if cl.tip() == nullid:
         if srvheadhashes != [nullid]:
diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py
--- a/mercurial/wireprotov1peer.py
+++ b/mercurial/wireprotov1peer.py
@@ -10,6 +10,7 @@
 import contextlib
 import hashlib
 import sys
+import weakref
 
 from .i18n import _
 from .node import (
@@ -188,6 +189,9 @@
         self._sent = False
         self._closed = False
         self._calls = []
+        self._futures = weakref.WeakSet()
+        self._responseexecutor = None
+        self._responsef = None
 
     def callcommand(self, command, args):
         if self._sent:
@@ -217,6 +221,7 @@
                                          'non-batchable command')
 
         f = pycompat.futures.Future()
+        self._futures.add(f)
 
         self._calls.append((command, args, fn, f))
 
@@ -231,6 +236,7 @@
 
         self._sent = True
 
+        # A single command is easy. We call it synchronously.
         if len(self._calls) == 1:
             command, args, fn, f = self._calls[0]
 
@@ -247,14 +253,115 @@
 
             return
 
-        raise error.ProgrammingError('support for multiple commands not '
-                                     'yet implemented')
+        # Batch commands are a bit harder. First, we have to deal with the
+        # @batchable coroutine. That's a bit annoying. Furthermore, we also
+        # need to preserve streaming. i.e. it should be possible for the
+        # futures to resolve as data is coming in off the wire without having
+        # to wait for the final byte of the final response. We do this by
+        # spinning up a thread to read the responses.
+
+        requests = []
+        states = []
+
+        for command, args, fn, f in self._calls:
+            # Future was cancelled. Ignore it.
+            if not f.set_running_or_notify_cancel():
+                continue
+
+            try:
+                batchable = fn.batchable(fn.__self__,
+                                         **pycompat.strkwargs(args))
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+                # Abort the whole batch. close() marks the futures that
+                # are left unresolved as errored.
+                return
+
+            # Encoded arguments and future holding remote result.
+            try:
+                encodedargs, fremote = next(batchable)
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+                # Abort the whole batch, as above.
+                return
+
+            requests.append((command, encodedargs))
+            states.append((command, f, batchable, fremote))
+
+        if not requests:
+            return
+
+        # This will emit responses in the order they were executed.
+        wireresults = self._peer._submitbatch(requests)
+
+        # The use of a thread pool executor here is a bit weird for something
+        # that only spins up a single thread. However, thread management is
+        # hard and it is easy to encounter race conditions, deadlocks, etc.
+        # concurrent.futures already solves these problems and its thread pool
+        # executor has minimal overhead. So we use it.
+        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+        self._responsef = self._responseexecutor.submit(self._readbatchresponse,
+                                                        states, wireresults)
 
     def close(self):
+        """Send pending commands, wait for responses and release resources.
+
+        Every still-referenced future created by ``callcommand()`` that is
+        unresolved afterwards is failed with ``error.ResponseError`` so that
+        a later ``result()`` call cannot block forever. Calling this more
+        than once is a no-op apart from the ``sendcommands()`` call.
+        """
         self.sendcommands()
 
+        if self._closed:
+            return
+
         self._closed = True
 
+        try:
+            if self._responsef:
+                # We need to wait on our in-flight response and then shut
+                # down the executor once we have a result.
+                try:
+                    self._responsef.result()
+                finally:
+                    self._responseexecutor.shutdown(wait=True)
+                    self._responsef = None
+                    self._responseexecutor = None
+        finally:
+            # If any of our futures are still in progress, mark them as
+            # errored. Otherwise a result() could wait indefinitely. This
+            # also has to happen when no response thread was started:
+            # sendcommands() returns early when preparing a command fails,
+            # leaving the other futures of the batch unresolved.
+            for f in self._futures:
+                if not f.done():
+                    f.set_exception(error.ResponseError(
+                        _('unfulfilled batch command response')))
+
+            self._futures = None
+
+    def _readbatchresponse(self, states, wireresults):
+        """Resolve the futures of a batch as responses arrive.
+
+        Executes in a thread to read data off the wire. ``states`` holds a
+        ``(command, f, batchable, fremote)`` tuple per submitted command and
+        ``wireresults`` yields the raw responses in the same order.
+        """
+        for command, f, batchable, fremote in states:
+            # Grab raw result off the wire and teach the internal future
+            # about it.
+            remoteresult = next(wireresults)
+            fremote.set(remoteresult)
+
+            # And ask the coroutine to decode that value.
+            try:
+                result = next(batchable)
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+            else:
+                f.set_result(result)
+
 class wirepeer(repository.legacypeer):
     """Client-side interface for communicating with a peer repository.
 