diff --git a/mercurial/setdiscovery.py b/mercurial/setdiscovery.py --- a/mercurial/setdiscovery.py +++ b/mercurial/setdiscovery.py @@ -155,11 +155,14 @@ sample = _limitsample(ownheads, initialsamplesize) # indices between sample and externalized version must match sample = list(sample) - batch = remote.iterbatch() - batch.heads() - batch.known(dag.externalizeall(sample)) - batch.submit() - srvheadhashes, yesno = batch.results() + + with remote.commandexecutor() as e: + fheads = e.callcommand('heads', {}) + fknown = e.callcommand('known', { + 'nodes': dag.externalizeall(sample), + }) + + srvheadhashes, yesno = fheads.result(), fknown.result() if cl.tip() == nullid: if srvheadhashes != [nullid]: diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py --- a/mercurial/wireprotov1peer.py +++ b/mercurial/wireprotov1peer.py @@ -9,6 +9,7 @@ import hashlib import sys +import weakref from .i18n import _ from .node import ( @@ -180,6 +181,26 @@ return ';'.join(cmds) +class unsentfuture(pycompat.futures.Future): + """A Future variation to represent an unsent command. + + Because we buffer commands and don't submit them immediately, calling + ``result()`` on an unsent future could deadlock. Futures for buffered + commands are represented by this type, which wraps ``result()`` to + call ``sendcommands()``. + """ + + def result(self, timeout=None): + if self.done(): + return pycompat.futures.Future.result(self, timeout) + + self._peerexecutor.sendcommands() + + # This looks like it will infinitely recurse. However, + # sendcommands() should modify __class__. This call serves as a check + # on that. + return self.result(timeout) + @zi.implementer(repository.ipeercommandexecutor) class peerexecutor(object): def __init__(self, peer): @@ -187,6 +208,9 @@ self._sent = False self._closed = False self._calls = [] + self._futures = weakref.WeakSet() + self._responseexecutor = None + self._responsef = None def __enter__(self): return self @@ -214,20 +238,35 @@ # Commands are either batchable or they aren't. If a command # isn't batchable, we send it immediately because the executor # can no longer accept new commands after a non-batchable command. - # If a command is batchable, we queue it for later. + # If a command is batchable, we queue it for later. But we have + # to account for the case of a non-batchable command arriving after + # a batchable one and refuse to service it. + + def addcall(): + f = pycompat.futures.Future() + self._futures.add(f) + self._calls.append((command, args, fn, f)) + return f if getattr(fn, 'batchable', False): - pass + f = addcall() + + # But since we don't issue it immediately, we wrap its result() + # to trigger sending so we avoid deadlocks. + f.__class__ = unsentfuture + f._peerexecutor = self else: if self._calls: raise error.ProgrammingError( '%s is not batchable and cannot be called on a command ' 'executor along with other commands' % command) - # We don't support batching yet. So resolve it immediately. - f = pycompat.futures.Future() - self._calls.append((command, args, fn, f)) - self.sendcommands() + f = addcall() + + # Non-batchable commands can never coexist with another command + # in this executor. So send the command immediately. + self.sendcommands() + return f def sendcommands(self): @@ -239,10 +278,18 @@ self._sent = True + # Unhack any future types so caller seens a clean type and to break + # cycle between us and futures. + for f in self._futures: + if isinstance(f, unsentfuture): + f.__class__ = pycompat.futures.Future + f._peerexecutor = None + calls = self._calls # Mainly to destroy references to futures. self._calls = None + # Simple case of a single command. We call it synchronously. if len(calls) == 1: command, args, fn, f = calls[0] @@ -259,14 +306,99 @@ return - raise error.ProgrammingError('support for multiple commands not ' - 'yet implemented') + # Batch commands are a bit harder. First, we have to deal with the + # @batchable coroutine. That's a bit annoying. Furthermore, we also + # need to preserve streaming. i.e. it should be possible for the + # futures to resolve as data is coming in off the wire without having + # to wait for the final byte of the final response. We do this by + # spinning up a thread to read the responses. + + requests = [] + states = [] + + for command, args, fn, f in calls: + # Future was cancelled. Ignore it. + if not f.set_running_or_notify_cancel(): + continue + + try: + batchable = fn.batchable(fn.__self__, + **pycompat.strkwargs(args)) + except Exception: + f.set_exception_info(*sys.exc_info()[1:]) + return + + # Encoded arguments and future holding remote result. + try: + encodedargs, fremote = next(batchable) + except Exception: + f.set_exception_info(*sys.exc_info()[1:]) + return + + requests.append((command, encodedargs)) + states.append((command, f, batchable, fremote)) + + if not requests: + return + + # This will emit responses in order they were executed. + wireresults = self._peer._submitbatch(requests) + + # The use of a thread pool executor here is a bit weird for something + # that only spins up a single thread. However, thread management is + # hard and it is easy to encounter race conditions, deadlocks, etc. + # concurrent.futures already solves these problems and its thread pool + # executor has minimal overhead. So we use it. + self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) + self._responsef = self._responseexecutor.submit(self._readbatchresponse, + states, wireresults) def close(self): self.sendcommands() + if self._closed: + return + self._closed = True + if not self._responsef: + return + + # We need to wait on our in-flight response and then shut down the + # executor once we have a result. + try: + self._responsef.result() + finally: + self._responseexecutor.shutdown(wait=True) + self._responsef = None + self._responseexecutor = None + + # If any of our futures are still in progress, mark them as + # errored. Otherwise a result() could wait indefinitely. + for f in self._futures: + if not f.done(): + f.set_exception(error.ResponseError( + _('unfulfilled batch command response'))) + + self._futures = None + + def _readbatchresponse(self, states, wireresults): + # Executes in a thread to read data off the wire. + + for command, f, batchable, fremote in states: + # Grab raw result off the wire and teach the internal future + # about it. + remoteresult = next(wireresults) + fremote.set(remoteresult) + + # And ask the coroutine to decode that value. + try: + result = next(batchable) + except Exception: + f.set_exception_info(*sys.exc_info()[1:]) + else: + f.set_result(result) + class wirepeer(repository.legacypeer): """Client-side interface for communicating with a peer repository.