@zi.implementer(repository.ipeercommandexecutor)
class localcommandexecutor(object):
    """Command executor that dispatches to methods on a local peer.

    Every command is executed synchronously inside ``callcommand()`` by
    calling the peer method of the same name, so the future handed back
    to the caller is already resolved (with a result or an exception).

    Instances can be used as context managers; leaving the ``with`` block
    calls ``close()``.
    """

    def __init__(self, peer):
        # Object whose methods implement the commands.
        self._peer = peer
        # True once sendcommands() has been called.
        self._sent = False
        # True once close() has been called.
        self._closed = False

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, exctb):
        # Returns None, so an exception raised in the with-body propagates.
        self.close()

    def callcommand(self, command, args):
        """Run ``command`` on the peer and return a resolved future.

        ``command`` is the name of a method on the peer and ``args`` is a
        dict of keyword arguments for it.

        An exception raised by the peer method is captured in the
        returned future rather than raised here. ProgrammingError is
        raised if the executor was already sent or closed. A missing peer
        method surfaces as AttributeError from the lookup.
        """
        if self._sent:
            raise error.ProgrammingError('callcommand() cannot be used after '
                                         'sendcommands()')

        if self._closed:
            raise error.ProgrammingError('callcommand() cannot be used after '
                                         'close()')

        # We don't need to support anything fancy. Just call the named
        # method on the peer and return a resolved future.
        fn = getattr(self._peer, pycompat.sysstr(command))

        f = pycompat.futures.Future()

        try:
            # Normalize the keys the same way the wire peer executor does
            # so both executors accept the same argument dicts.
            result = fn(**pycompat.strkwargs(args))
        except Exception:
            exc, tb = sys.exc_info()[1:]
            # set_exception_info() only exists on the Python 2 "futures"
            # backport. The standard library Future only provides
            # set_exception(); the exception object carries its own
            # traceback there.
            setinfo = getattr(f, 'set_exception_info', None)
            if setinfo is not None:
                setinfo(exc, tb)
            else:
                f.set_exception(exc)
        else:
            f.set_result(result)

        return f

    def sendcommands(self):
        """Mark the executor as sent; commands already ran in callcommand()."""
        self._sent = True

    def close(self):
        """Mark the executor as closed; further callcommand() calls fail."""
        self._closed = True
@zi.implementer(repository.ipeercommandexecutor)
class peerexecutor(object):
    """Command executor that dispatches to methods on a wire peer.

    Commands are looked up as methods on the peer. Batching is not
    implemented yet: every command is sent as soon as it is requested, so
    an executor instance can effectively issue a single command.

    Instances can be used as context managers; leaving the ``with`` block
    calls ``close()``.
    """

    def __init__(self, peer):
        # Object whose methods implement the commands.
        self._peer = peer
        # True once queued commands have been sent.
        self._sent = False
        # True once close() has been called.
        self._closed = False
        # Queued (command, args, fn, future) tuples; set to None once sent.
        self._calls = []

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, exctb):
        # Returns None, so an exception raised in the with-body propagates.
        self.close()

    def callcommand(self, command, args):
        """Issue ``command`` with the ``args`` dict and return its future.

        The command is sent immediately, so the returned future is already
        resolved. An exception raised by the peer method is captured in
        the future rather than raised here.

        ProgrammingError is raised if commands were already sent, if the
        executor is closed, if the peer has no method named ``command``,
        or if a non-batchable command is requested while other commands
        are queued.
        """
        if self._sent:
            raise error.ProgrammingError('callcommand() cannot be used '
                                         'after commands are sent')

        if self._closed:
            raise error.ProgrammingError('callcommand() cannot be used '
                                         'after close()')

        # Commands are dispatched through methods on the peer.
        fn = getattr(self._peer, pycompat.sysstr(command), None)

        if not fn:
            raise error.ProgrammingError(
                'cannot call command %s: method of same name not available '
                'on peer' % command)

        # Commands are either batchable or they aren't. A non-batchable
        # command cannot share an executor with other queued commands.
        # Batchable commands need no extra validation here.
        if not getattr(fn, 'batchable', False) and self._calls:
            raise error.ProgrammingError(
                '%s is not batchable and cannot be called on a command '
                'executor along with other commands' % command)

        # We don't support batching yet. So resolve it immediately.
        f = pycompat.futures.Future()
        self._calls.append((command, args, fn, f))
        self.sendcommands()
        return f

    def sendcommands(self):
        """Send queued commands and resolve their futures.

        Does nothing if commands were already sent or none are queued.
        Raises ProgrammingError if more than one command is queued, since
        sending multiple commands is not implemented yet.
        """
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        calls = self._calls
        # Mainly to destroy references to futures.
        self._calls = None

        if len(calls) == 1:
            command, args, fn, f = calls[0]

            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                return

            try:
                result = fn(**pycompat.strkwargs(args))
            except Exception:
                exc, tb = sys.exc_info()[1:]
                # set_exception_info() only exists on the Python 2
                # "futures" backport. The standard library Future only
                # provides set_exception(); the exception object carries
                # its own traceback there.
                setinfo = getattr(f, 'set_exception_info', None)
                if setinfo is not None:
                    setinfo(exc, tb)
                else:
                    f.set_exception(exc)
            else:
                f.set_result(result)

            return

        raise error.ProgrammingError('support for multiple commands not '
                                     'yet implemented')

    def close(self):
        """Send any queued commands, then mark the executor as closed."""
        self.sendcommands()

        self._closed = True
def iterbatch(self): diff --git a/tests/test-check-interfaces.py b/tests/test-check-interfaces.py --- a/tests/test-check-interfaces.py +++ b/tests/test-check-interfaces.py @@ -23,6 +23,7 @@ vfs as vfsmod, wireprotoserver, wireprototypes, + wireprotov1peer, wireprotov2server, ) @@ -102,6 +103,14 @@ localrepo.localpeer) checkzobject(localrepo.localpeer(dummyrepo())) + ziverify.verifyClass(repository.ipeercommandexecutor, + localrepo.localcommandexecutor) + checkzobject(localrepo.localcommandexecutor(None)) + + ziverify.verifyClass(repository.ipeercommandexecutor, + wireprotov1peer.peerexecutor) + checkzobject(wireprotov1peer.peerexecutor(None)) + ziverify.verifyClass(repository.ipeerbaselegacycommands, sshpeer.sshv1peer) checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),