diff --git a/mercurial/localrepo.py b/mercurial/localrepo.py
--- a/mercurial/localrepo.py
+++ b/mercurial/localrepo.py
@@ -7,10 +7,12 @@
 
 from __future__ import absolute_import
 
+import contextlib
 import errno
 import hashlib
 import os
 import random
+import sys
 import time
 import weakref
 
@@ -167,6 +169,51 @@
             resref.set(getattr(self.local, name)(*args, **opts))
             yield resref.value
 
+@zi.implementer(repository.ipeercommandexecutor)
+class localcommandexecutor(object):
+    """Command executor that calls commands directly on a peer object.
+
+    ``callcommand()`` invokes the peer method right away, so the future
+    it returns is already resolved.
+    """
+    def __init__(self, peer):
+        self._peer = peer
+        self._sent = False
+        self._closed = False
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'sendcommands()')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'close()')
+
+        # We don't need to support anything fancy. Just call the named
+        # method on the peer and return a resolved future.
+        fn = getattr(self._peer, pycompat.sysstr(command))
+
+        f = pycompat.futures.Future()
+
+        try:
+            # Normalize keyword names with strkwargs(), matching how the
+            # wire protocol peer executor invokes peer methods.
+            result = fn(**pycompat.strkwargs(args))
+        except Exception:
+            e, tb = sys.exc_info()[1:]
+            f.set_exception_info(e, tb)
+        else:
+            f.set_result(result)
+
+        return f
+
+    def sendcommands(self):
+        self._sent = True
+
+    def close(self):
+        self._closed = True
+
 class localpeer(repository.peer):
     '''peer for a local repo; reflects only the most recent API'''
 
@@ -286,6 +333,15 @@
 
     # Begin of peer interface.
 
+    @contextlib.contextmanager
+    def commandexecutor(self):
+        """Yield a command executor; it is closed on context exit."""
+        executor = localcommandexecutor(self)
+        try:
+            yield executor
+        finally:
+            executor.close()
+
     def iterbatch(self):
         return localiterbatcher(self)
 
diff --git a/mercurial/repository.py b/mercurial/repository.py
--- a/mercurial/repository.py
+++ b/mercurial/repository.py
@@ -268,7 +268,8 @@
         being issued.
         """
 
-class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands):
+class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands,
+                ipeerrequests):
     """Unified interface for peer repositories.
 
     All peer instances must conform to this interface.
diff --git a/mercurial/setdiscovery.py b/mercurial/setdiscovery.py
--- a/mercurial/setdiscovery.py
+++ b/mercurial/setdiscovery.py
@@ -228,7 +228,14 @@
                  % (roundtrips, len(undecided), len(sample)))
         # indices between sample and externalized version must match
         sample = list(sample)
-        yesno = remote.known(dag.externalizeall(sample))
+
+        with remote.commandexecutor() as e:
+            fyesno = e.callcommand('known', {
+                'nodes': dag.externalizeall(sample),
+            })
+
+        yesno = fyesno.result()
+
         full = True
 
         if sample:
diff --git a/mercurial/wireprotov1peer.py b/mercurial/wireprotov1peer.py
--- a/mercurial/wireprotov1peer.py
+++ b/mercurial/wireprotov1peer.py
@@ -7,13 +7,17 @@
 
 from __future__ import absolute_import
 
+import contextlib
 import hashlib
+import sys
 
 from .i18n import _
 from .node import (
     bin,
 )
-
+from .thirdparty.zope import (
+    interface as zi,
+)
 from . import (
     bundle2,
     changegroup as changegroupmod,
@@ -177,6 +181,91 @@
 
     return ';'.join(cmds)
 
+@zi.implementer(repository.ipeercommandexecutor)
+class peerexecutor(object):
+    """Command executor for wire protocol version 1 peers.
+
+    ``callcommand()`` queues a command and returns an unresolved future.
+    ``sendcommands()`` dispatches the queue and ``close()`` calls it
+    implicitly. Only a single queued command can currently be dispatched.
+    """
+    def __init__(self, peer):
+        self._peer = peer
+        self._sent = False
+        self._closed = False
+        self._calls = []
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used '
+                                         'after sendcommands()')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used '
+                                         'after close()')
+
+        # Commands typically have methods on the peer
+        fn = getattr(self._peer, pycompat.sysstr(command), None)
+
+        if fn:
+            # Not all commands are batchable. So verify we don't attempt
+            # to batch non-batchable commands.
+            isbatchable = getattr(fn, 'batchable', False)
+
+            if not isbatchable and self._calls:
+                raise error.ProgrammingError(
+                    '%s is not batchable and cannot be called on a command '
+                    'executor along with other commands' % command)
+
+            # Entries are (command, args, fn, future) tuples, so the
+            # batchability of the previous command is read off its method.
+            if self._calls:
+                prevfn = self._calls[-1][2]
+
+                if not getattr(prevfn, 'batchable', False):
+                    raise error.ProgrammingError(
+                        '%s cannot be called on a command executor after '
+                        'a non-batchable command' % command)
+
+        f = pycompat.futures.Future()
+
+        self._calls.append((command, args, fn, f))
+
+        return f
+
+    def sendcommands(self):
+        if self._sent:
+            return
+
+        if not self._calls:
+            return
+
+        self._sent = True
+
+        if len(self._calls) == 1:
+            command, args, fn, f = self._calls[0]
+
+            # Future was cancelled. Ignore it.
+            if not f.set_running_or_notify_cancel():
+                return
+
+            try:
+                result = fn(**pycompat.strkwargs(args))
+            except Exception:
+                f.set_exception_info(*sys.exc_info()[1:])
+            else:
+                f.set_result(result)
+
+            return
+
+        raise error.ProgrammingError('support for multiple commands not '
+                                     'yet implemented')
+
+    def close(self):
+        self.sendcommands()
+
+        self._closed = True
+
 class wirepeer(repository.legacypeer):
     """Client-side interface for communicating with a peer repository.
 
@@ -185,6 +274,15 @@
     See also httppeer.py and sshpeer.py for protocol-specific
     implementations of this interface.
     """
+    @contextlib.contextmanager
+    def commandexecutor(self):
+        """Yield a command executor; it is closed on context exit."""
+        executor = peerexecutor(self)
+        try:
+            yield executor
+        finally:
+            executor.close()
+
     # Begin of ipeercommands interface.
 
     def iterbatch(self):
diff --git a/tests/test-check-interfaces.py b/tests/test-check-interfaces.py
--- a/tests/test-check-interfaces.py
+++ b/tests/test-check-interfaces.py
@@ -23,6 +23,7 @@
     vfs as vfsmod,
     wireprotoserver,
     wireprototypes,
+    wireprotov1peer,
     wireprotov2server,
 )
 
@@ -102,6 +103,14 @@
                          localrepo.localpeer)
     checkzobject(localrepo.localpeer(dummyrepo()))
 
+    ziverify.verifyClass(repository.ipeercommandexecutor,
+                         localrepo.localcommandexecutor)
+    checkzobject(localrepo.localcommandexecutor(None))
+
+    ziverify.verifyClass(repository.ipeercommandexecutor,
+                         wireprotov1peer.peerexecutor)
+    checkzobject(wireprotov1peer.peerexecutor(None))
+
     ziverify.verifyClass(repository.ipeerbaselegacycommands,
                          sshpeer.sshv1peer)
     checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),