peer.py hardly contained any code. The code it did contain was
generic to the version 1 peer interface or specific to the
local repository peer.
So code has been moved to wireprotov1peer and localrepo, as
appropriate.
| durin42 |
| hg-reviewers |
peer.py hardly contained any code. The code it did contain was
generic to the version 1 peer interface or specific to the
local repository peer.
So code has been moved to wireprotov1peer and localrepo, as
appropriate.
| Lint Skipped |
| Unit Tests Skipped |
| Path | Packages | |||
|---|---|---|---|---|
| M | hgext/infinitepush/__init__.py (5 lines) | |||
| M | mercurial/localrepo.py (18 lines) | |||
| D | M | mercurial/peer.py (100 lines) | ||
| M | mercurial/wireprotov1peer.py (77 lines) | |||
| M | tests/test-batching.py (15 lines) |
| Status | Author | Revision | |
|---|---|---|---|
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg | ||
| Closed | indygreg |
| commands, | commands, | ||||
| discovery, | discovery, | ||||
| encoding, | encoding, | ||||
| error, | error, | ||||
| exchange, | exchange, | ||||
| extensions, | extensions, | ||||
| hg, | hg, | ||||
| localrepo, | localrepo, | ||||
| peer, | |||||
| phases, | phases, | ||||
| pushkey, | pushkey, | ||||
| registrar, | registrar, | ||||
| util, | util, | ||||
| wireproto, | wireproto, | ||||
| wireprototypes, | wireprototypes, | ||||
| wireprotov1peer, | wireprotov1peer, | ||||
| ) | ) | ||||
| kind, pat, matcher = stringutil.stringmatcher(pattern) | kind, pat, matcher = stringutil.stringmatcher(pattern) | ||||
| for bookmark, node in bookmarks.iteritems(): | for bookmark, node in bookmarks.iteritems(): | ||||
| if matcher(bookmark): | if matcher(bookmark): | ||||
| results[bookmark] = node | results[bookmark] = node | ||||
| return results | return results | ||||
| else: | else: | ||||
| return orig(self, namespace) | return orig(self, namespace) | ||||
| @peer.batchable | @wireprotov1peer.batchable | ||||
| def listkeyspatterns(self, namespace, patterns): | def listkeyspatterns(self, namespace, patterns): | ||||
| if not self.capable('pushkey'): | if not self.capable('pushkey'): | ||||
| yield {}, None | yield {}, None | ||||
| f = peer.future() | f = wireprotov1peer.future() | ||||
| self.ui.debug('preparing listkeys for "%s" with pattern "%s"\n' % | self.ui.debug('preparing listkeys for "%s" with pattern "%s"\n' % | ||||
| (namespace, patterns)) | (namespace, patterns)) | ||||
| yield { | yield { | ||||
| 'namespace': encoding.fromlocal(namespace), | 'namespace': encoding.fromlocal(namespace), | ||||
| 'patterns': wireprototypes.encodelist(patterns) | 'patterns': wireprototypes.encodelist(patterns) | ||||
| }, f | }, f | ||||
| d = f.value | d = f.value | ||||
| self.ui.debug('received listkey for "%s": %i bytes\n' | self.ui.debug('received listkey for "%s": %i bytes\n' | ||||
| manifest, | manifest, | ||||
| match as matchmod, | match as matchmod, | ||||
| merge as mergemod, | merge as mergemod, | ||||
| mergeutil, | mergeutil, | ||||
| namespaces, | namespaces, | ||||
| narrowspec, | narrowspec, | ||||
| obsolete, | obsolete, | ||||
| pathutil, | pathutil, | ||||
| peer, | |||||
| phases, | phases, | ||||
| pushkey, | pushkey, | ||||
| pycompat, | pycompat, | ||||
| repository, | repository, | ||||
| repoview, | repoview, | ||||
| revset, | revset, | ||||
| revsetlang, | revsetlang, | ||||
| scmutil, | scmutil, | ||||
| sparse, | sparse, | ||||
| store, | store, | ||||
| subrepoutil, | subrepoutil, | ||||
| tags as tagsmod, | tags as tagsmod, | ||||
| transaction, | transaction, | ||||
| txnutil, | txnutil, | ||||
| util, | util, | ||||
| vfs as vfsmod, | vfs as vfsmod, | ||||
| wireprotov1peer, | |||||
| ) | ) | ||||
| from .utils import ( | from .utils import ( | ||||
| procutil, | procutil, | ||||
| stringutil, | stringutil, | ||||
| ) | ) | ||||
| release = lockmod.release | release = lockmod.release | ||||
| urlerr = util.urlerr | urlerr = util.urlerr | ||||
| def wrapper(repo, *args, **kwargs): | def wrapper(repo, *args, **kwargs): | ||||
| return orig(repo.unfiltered(), *args, **kwargs) | return orig(repo.unfiltered(), *args, **kwargs) | ||||
| return wrapper | return wrapper | ||||
| moderncaps = {'lookup', 'branchmap', 'pushkey', 'known', 'getbundle', | moderncaps = {'lookup', 'branchmap', 'pushkey', 'known', 'getbundle', | ||||
| 'unbundle'} | 'unbundle'} | ||||
| legacycaps = moderncaps.union({'changegroupsubset'}) | legacycaps = moderncaps.union({'changegroupsubset'}) | ||||
| class localiterbatcher(wireprotov1peer.iterbatcher): | |||||
| def __init__(self, local): | |||||
| super(localiterbatcher, self).__init__() | |||||
| self.local = local | |||||
| def submit(self): | |||||
| # submit for a local iter batcher is a noop | |||||
| pass | |||||
| def results(self): | |||||
| for name, args, opts, resref in self.calls: | |||||
| resref.set(getattr(self.local, name)(*args, **opts)) | |||||
| yield resref.value | |||||
| class localpeer(repository.peer): | class localpeer(repository.peer): | ||||
| '''peer for a local repo; reflects only the most recent API''' | '''peer for a local repo; reflects only the most recent API''' | ||||
| def __init__(self, repo, caps=None): | def __init__(self, repo, caps=None): | ||||
| super(localpeer, self).__init__() | super(localpeer, self).__init__() | ||||
| if caps is None: | if caps is None: | ||||
| caps = moderncaps.copy() | caps = moderncaps.copy() | ||||
| raise error.ResponseError(_('push failed:'), | raise error.ResponseError(_('push failed:'), | ||||
| stringutil.forcebytestr(exc)) | stringutil.forcebytestr(exc)) | ||||
| # End of _basewirecommands interface. | # End of _basewirecommands interface. | ||||
| # Begin of peer interface. | # Begin of peer interface. | ||||
| def iterbatch(self): | def iterbatch(self): | ||||
| return peer.localiterbatcher(self) | return localiterbatcher(self) | ||||
| # End of peer interface. | # End of peer interface. | ||||
| class locallegacypeer(repository.legacypeer, localpeer): | class locallegacypeer(repository.legacypeer, localpeer): | ||||
| '''peer extension which implements legacy methods too; used for tests with | '''peer extension which implements legacy methods too; used for tests with | ||||
| restricted capabilities''' | restricted capabilities''' | ||||
| def __init__(self, repo): | def __init__(self, repo): | ||||
| # peer.py - repository base classes for mercurial | |||||
| # | |||||
| # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com> | |||||
| # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com> | |||||
| # | |||||
| # This software may be used and distributed according to the terms of the | |||||
| # GNU General Public License version 2 or any later version. | |||||
| from __future__ import absolute_import | |||||
| from . import ( | |||||
| error, | |||||
| pycompat, | |||||
| util, | |||||
| ) | |||||
| # abstract batching support | |||||
| class future(object): | |||||
| '''placeholder for a value to be set later''' | |||||
| def set(self, value): | |||||
| if util.safehasattr(self, 'value'): | |||||
| raise error.RepoError("future is already set") | |||||
| self.value = value | |||||
| class batcher(object): | |||||
| '''base class for batches of commands submittable in a single request | |||||
| All methods invoked on instances of this class are simply queued and | |||||
| return a a future for the result. Once you call submit(), all the queued | |||||
| calls are performed and the results set in their respective futures. | |||||
| ''' | |||||
| def __init__(self): | |||||
| self.calls = [] | |||||
| def __getattr__(self, name): | |||||
| def call(*args, **opts): | |||||
| resref = future() | |||||
| # Please don't invent non-ascii method names, or you will | |||||
| # give core hg a very sad time. | |||||
| self.calls.append((name.encode('ascii'), args, opts, resref,)) | |||||
| return resref | |||||
| return call | |||||
| def submit(self): | |||||
| raise NotImplementedError() | |||||
| class iterbatcher(batcher): | |||||
| def submit(self): | |||||
| raise NotImplementedError() | |||||
| def results(self): | |||||
| raise NotImplementedError() | |||||
| class localiterbatcher(iterbatcher): | |||||
| def __init__(self, local): | |||||
| super(iterbatcher, self).__init__() | |||||
| self.local = local | |||||
| def submit(self): | |||||
| # submit for a local iter batcher is a noop | |||||
| pass | |||||
| def results(self): | |||||
| for name, args, opts, resref in self.calls: | |||||
| resref.set(getattr(self.local, name)(*args, **opts)) | |||||
| yield resref.value | |||||
| def batchable(f): | |||||
| '''annotation for batchable methods | |||||
| Such methods must implement a coroutine as follows: | |||||
| @batchable | |||||
| def sample(self, one, two=None): | |||||
| # Build list of encoded arguments suitable for your wire protocol: | |||||
| encargs = [('one', encode(one),), ('two', encode(two),)] | |||||
| # Create future for injection of encoded result: | |||||
| encresref = future() | |||||
| # Return encoded arguments and future: | |||||
| yield encargs, encresref | |||||
| # Assuming the future to be filled with the result from the batched | |||||
| # request now. Decode it: | |||||
| yield decode(encresref.value) | |||||
| The decorator returns a function which wraps this coroutine as a plain | |||||
| method, but adds the original method as an attribute called "batchable", | |||||
| which is used by remotebatch to split the call into separate encoding and | |||||
| decoding phases. | |||||
| ''' | |||||
| def plain(*args, **opts): | |||||
| batchable = f(*args, **opts) | |||||
| encargsorres, encresref = next(batchable) | |||||
| if not encresref: | |||||
| return encargsorres # a local result in this case | |||||
| self = args[0] | |||||
| cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr | |||||
| encresref.set(self._submitone(cmd, encargsorres)) | |||||
| return next(batchable) | |||||
| setattr(plain, 'batchable', f) | |||||
| return plain | |||||
| bin, | bin, | ||||
| ) | ) | ||||
| from . import ( | from . import ( | ||||
| bundle2, | bundle2, | ||||
| changegroup as changegroupmod, | changegroup as changegroupmod, | ||||
| encoding, | encoding, | ||||
| error, | error, | ||||
| peer, | |||||
| pushkey as pushkeymod, | pushkey as pushkeymod, | ||||
| pycompat, | pycompat, | ||||
| repository, | repository, | ||||
| util, | util, | ||||
| wireprototypes, | wireprototypes, | ||||
| ) | ) | ||||
| urlreq = util.urlreq | urlreq = util.urlreq | ||||
| class remoteiterbatcher(peer.iterbatcher): | def batchable(f): | ||||
| '''annotation for batchable methods | |||||
| Such methods must implement a coroutine as follows: | |||||
| @batchable | |||||
| def sample(self, one, two=None): | |||||
| # Build list of encoded arguments suitable for your wire protocol: | |||||
| encargs = [('one', encode(one),), ('two', encode(two),)] | |||||
| # Create future for injection of encoded result: | |||||
| encresref = future() | |||||
| # Return encoded arguments and future: | |||||
| yield encargs, encresref | |||||
| # Assuming the future to be filled with the result from the batched | |||||
| # request now. Decode it: | |||||
| yield decode(encresref.value) | |||||
| The decorator returns a function which wraps this coroutine as a plain | |||||
| method, but adds the original method as an attribute called "batchable", | |||||
| which is used by remotebatch to split the call into separate encoding and | |||||
| decoding phases. | |||||
| ''' | |||||
| def plain(*args, **opts): | |||||
| batchable = f(*args, **opts) | |||||
| encargsorres, encresref = next(batchable) | |||||
| if not encresref: | |||||
| return encargsorres # a local result in this case | |||||
| self = args[0] | |||||
| cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr | |||||
| encresref.set(self._submitone(cmd, encargsorres)) | |||||
| return next(batchable) | |||||
| setattr(plain, 'batchable', f) | |||||
| return plain | |||||
| class future(object): | |||||
| '''placeholder for a value to be set later''' | |||||
| def set(self, value): | |||||
| if util.safehasattr(self, 'value'): | |||||
| raise error.RepoError("future is already set") | |||||
| self.value = value | |||||
| class batcher(object): | |||||
| '''base class for batches of commands submittable in a single request | |||||
| All methods invoked on instances of this class are simply queued and | |||||
| return a a future for the result. Once you call submit(), all the queued | |||||
| calls are performed and the results set in their respective futures. | |||||
| ''' | |||||
| def __init__(self): | |||||
| self.calls = [] | |||||
| def __getattr__(self, name): | |||||
| def call(*args, **opts): | |||||
| resref = future() | |||||
| # Please don't invent non-ascii method names, or you will | |||||
| # give core hg a very sad time. | |||||
| self.calls.append((name.encode('ascii'), args, opts, resref,)) | |||||
| return resref | |||||
| return call | |||||
| def submit(self): | |||||
| raise NotImplementedError() | |||||
| class iterbatcher(batcher): | |||||
| def submit(self): | |||||
| raise NotImplementedError() | |||||
| def results(self): | |||||
| raise NotImplementedError() | |||||
| class remoteiterbatcher(iterbatcher): | |||||
| def __init__(self, remote): | def __init__(self, remote): | ||||
| super(remoteiterbatcher, self).__init__() | super(remoteiterbatcher, self).__init__() | ||||
| self._remote = remote | self._remote = remote | ||||
| def __getattr__(self, name): | def __getattr__(self, name): | ||||
| # Validate this method is batchable, since submit() only supports | # Validate this method is batchable, since submit() only supports | ||||
| # batchable methods. | # batchable methods. | ||||
| fn = getattr(self._remote, name) | fn = getattr(self._remote, name) | ||||
| except StopIteration: | except StopIteration: | ||||
| pass | pass | ||||
| else: | else: | ||||
| raise error.ProgrammingError('%s @batchable generator emitted ' | raise error.ProgrammingError('%s @batchable generator emitted ' | ||||
| 'unexpected value count' % command) | 'unexpected value count' % command) | ||||
| yield finalfuture.value | yield finalfuture.value | ||||
| # Forward a couple of names from peer to make wireproto interactions | |||||
| # slightly more sensible. | |||||
| batchable = peer.batchable | |||||
| future = peer.future | |||||
| def encodebatchcmds(req): | def encodebatchcmds(req): | ||||
| """Return a ``cmds`` argument value for the ``batch`` command.""" | """Return a ``cmds`` argument value for the ``batch`` command.""" | ||||
| escapearg = wireprototypes.escapebatcharg | escapearg = wireprototypes.escapebatcharg | ||||
| cmds = [] | cmds = [] | ||||
| for op, argsdict in req: | for op, argsdict in req: | ||||
| # Old servers didn't properly unescape argument names. So prevent | # Old servers didn't properly unescape argument names. So prevent | ||||
| # the sending of argument names that may not be decoded properly by | # the sending of argument names that may not be decoded properly by | ||||
| # test-batching.py - tests for transparent command batching | # test-batching.py - tests for transparent command batching | ||||
| # | # | ||||
| # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch> | # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch> | ||||
| # | # | ||||
| # This software may be used and distributed according to the terms of the | # This software may be used and distributed according to the terms of the | ||||
| # GNU General Public License version 2 or any later version. | # GNU General Public License version 2 or any later version. | ||||
| from __future__ import absolute_import, print_function | from __future__ import absolute_import, print_function | ||||
| from mercurial import ( | from mercurial import ( | ||||
| error, | error, | ||||
| peer, | localrepo, | ||||
| util, | util, | ||||
| wireprotov1peer, | wireprotov1peer, | ||||
| ) | ) | ||||
| # equivalent of repo.repository | # equivalent of repo.repository | ||||
| class thing(object): | class thing(object): | ||||
| def hello(self): | def hello(self): | ||||
| return "Ready." | return "Ready." | ||||
| # equivalent of localrepo.localrepository | # equivalent of localrepo.localrepository | ||||
| class localthing(thing): | class localthing(thing): | ||||
| def foo(self, one, two=None): | def foo(self, one, two=None): | ||||
| if one: | if one: | ||||
| return "%s and %s" % (one, two,) | return "%s and %s" % (one, two,) | ||||
| return "Nope" | return "Nope" | ||||
| def bar(self, b, a): | def bar(self, b, a): | ||||
| return "%s und %s" % (b, a,) | return "%s und %s" % (b, a,) | ||||
| def greet(self, name=None): | def greet(self, name=None): | ||||
| return "Hello, %s" % name | return "Hello, %s" % name | ||||
| def batchiter(self): | def batchiter(self): | ||||
| '''Support for local batching.''' | '''Support for local batching.''' | ||||
| return peer.localiterbatcher(self) | return localrepo.localiterbatcher(self) | ||||
| # usage of "thing" interface | # usage of "thing" interface | ||||
| def use(it): | def use(it): | ||||
| # Direct call to base method shared between client and server. | # Direct call to base method shared between client and server. | ||||
| print(it.hello()) | print(it.hello()) | ||||
| # Direct calls to proxied methods. They cause individual roundtrips. | # Direct calls to proxied methods. They cause individual roundtrips. | ||||
| print(it.foo("Un", two="Deux")) | print(it.foo("Un", two="Deux")) | ||||
| print(it.bar("Eins", "Zwei")) | print(it.bar("Eins", "Zwei")) | ||||
| # Batched call to a couple of proxied methods. | # Batched call to a couple of proxied methods. | ||||
| batch = it.batchiter() | batch = it.batchiter() | ||||
| # The calls return futures to eventually hold results. | # The calls return futures to eventually hold results. | ||||
| foo = batch.foo(one="One", two="Two") | foo = batch.foo(one="One", two="Two") | ||||
| bar = batch.bar("Eins", "Zwei") | bar = batch.bar("Eins", "Zwei") | ||||
| bar2 = batch.bar(b="Uno", a="Due") | bar2 = batch.bar(b="Uno", a="Due") | ||||
| # Future shouldn't be set until we submit(). | # Future shouldn't be set until we submit(). | ||||
| assert isinstance(foo, peer.future) | assert isinstance(foo, wireprotov1peer.future) | ||||
| assert not util.safehasattr(foo, 'value') | assert not util.safehasattr(foo, 'value') | ||||
| assert not util.safehasattr(bar, 'value') | assert not util.safehasattr(bar, 'value') | ||||
| batch.submit() | batch.submit() | ||||
| # Call results() to obtain results as a generator. | # Call results() to obtain results as a generator. | ||||
| results = batch.results() | results = batch.results() | ||||
| # Future results shouldn't be set until we consume a value. | # Future results shouldn't be set until we consume a value. | ||||
| assert not util.safehasattr(foo, 'value') | assert not util.safehasattr(foo, 'value') | ||||
| req = ';'.join(req) | req = ';'.join(req) | ||||
| res = self._submitone('batch', [('cmds', req,)]) | res = self._submitone('batch', [('cmds', req,)]) | ||||
| for r in res.split(';'): | for r in res.split(';'): | ||||
| yield r | yield r | ||||
| def batchiter(self): | def batchiter(self): | ||||
| return wireprotov1peer.remoteiterbatcher(self) | return wireprotov1peer.remoteiterbatcher(self) | ||||
| @peer.batchable | @wireprotov1peer.batchable | ||||
| def foo(self, one, two=None): | def foo(self, one, two=None): | ||||
| encargs = [('one', mangle(one),), ('two', mangle(two),)] | encargs = [('one', mangle(one),), ('two', mangle(two),)] | ||||
| encresref = peer.future() | encresref = wireprotov1peer.future() | ||||
| yield encargs, encresref | yield encargs, encresref | ||||
| yield unmangle(encresref.value) | yield unmangle(encresref.value) | ||||
| @peer.batchable | @wireprotov1peer.batchable | ||||
| def bar(self, b, a): | def bar(self, b, a): | ||||
| encresref = peer.future() | encresref = wireprotov1peer.future() | ||||
| yield [('b', mangle(b),), ('a', mangle(a),)], encresref | yield [('b', mangle(b),), ('a', mangle(a),)], encresref | ||||
| yield unmangle(encresref.value) | yield unmangle(encresref.value) | ||||
| # greet is coded directly. It therefore does not support batching. If it | # greet is coded directly. It therefore does not support batching. If it | ||||
| # does appear in a batch, the batch is split around greet, and the call to | # does appear in a batch, the batch is split around greet, and the call to | ||||
| # greet is done in its own roundtrip. | # greet is done in its own roundtrip. | ||||
| def greet(self, name=None): | def greet(self, name=None): | ||||
| return unmangle(self._submitone('greet', [('name', mangle(name),)])) | return unmangle(self._submitone('greet', [('name', mangle(name),)])) | ||||
| # demo remote usage | # demo remote usage | ||||
| myproxy = remotething(myserver) | myproxy = remotething(myserver) | ||||
| print() | print() | ||||
| print("== Remote") | print("== Remote") | ||||
| use(myproxy) | use(myproxy) | ||||