diff --git a/mercurial/exchangev2.py b/mercurial/exchangev2.py
--- a/mercurial/exchangev2.py
+++ b/mercurial/exchangev2.py
@@ -7,6 +7,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import weakref
 
 from .i18n import _
@@ -58,7 +59,12 @@
                                remote.url(), pullop.gettransaction,
                                explicit=pullop.explicitbookmarks)
 
-    _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+
+    # Find all file nodes referenced by added manifests and fetch those
+    # revisions.
+    fnodes = _derivefilesfrommanifests(repo, manres['added'])
+    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
@@ -291,4 +297,112 @@
 
     return {
         'added': added,
+        'linkrevs': linkrevs,
     }
+
+def _derivefilesfrommanifests(repo, manifestnodes):
+    """Determine what file nodes are relevant given a set of manifest nodes.
+
+    Returns a dict mapping file paths to dicts of file node to first manifest
+    node.
+    """
+    ml = repo.manifestlog
+    fnodes = collections.defaultdict(dict)
+
+    for manifestnode in manifestnodes:
+        m = ml.get(b'', manifestnode)
+
+        # TODO this will pull in unwanted nodes because it takes the storage
+        # delta into consideration. What we really want is something that takes
+        # the delta between the manifest's parents. And ideally we would
+        # ignore file nodes that are known locally. For now, ignore both
+        # these limitations. This will result in incremental fetches requesting
+        # data we already have. So this is far from ideal.
+        md = m.readfast()
+
+        for path, fnode in md.items():
+            fnodes[path].setdefault(fnode, manifestnode)
+
+    return fnodes
+
+def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
+    """Fetch file revisions from the remote and add them to local storage.
+
+    ``fnodes`` maps file paths to dicts of file node to manifest node, as
+    returned by ``_derivefilesfrommanifests()``. ``linkrevs`` is indexed by
+    manifest node; the value found is what ``addgroup()`` receives as the
+    link for each file node first seen in that manifest.
+
+    Paths are requested in sorted order, in batches, via the ``filedata``
+    command. Each response is fed to ``addgroup()`` on ``repo.file(path)``.
+    """
+    def iterrevisions(objs, progress):
+        # Each metadata map announcing a delta or a full revision is followed
+        # by a separate object holding the payload; consume it with next().
+        for filerevision in objs:
+            node = filerevision[b'node']
+
+            if b'deltasize' in filerevision:
+                basenode = filerevision[b'deltabasenode']
+                delta = next(objs)
+            elif b'revisionsize' in filerevision:
+                basenode = nullid
+                revision = next(objs)
+                # Express the full revision as a delta against nullid.
+                delta = mdiff.trivialdiffheader(len(revision)) + revision
+            else:
+                # Neither a delta nor a full revision follows; skip.
+                continue
+
+            yield (
+                node,
+                filerevision[b'parents'][0],
+                filerevision[b'parents'][1],
+                node,
+                basenode,
+                delta,
+                # Flags not yet supported.
+                0,
+            )
+
+            progress.increment()
+
+    progress = repo.ui.makeprogress(
+        _('files'), unit=_('chunks'),
+        total=sum(len(v) for v in fnodes.itervalues()))
+
+    # TODO make batch size configurable
+    batchsize = 10000
+    fnodeslist = sorted(fnodes.items())
+
+    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
+        batch = fnodeslist[i:i + batchsize]
+
+        with remote.commandexecutor() as e:
+            fs = []
+            locallinkrevs = {}
+
+            for path, nodes in batch:
+                fs.append((path, e.callcommand(b'filedata', {
+                    b'path': path,
+                    b'nodes': sorted(nodes),
+                    b'fields': {b'parents', b'revision'}
+                })))
+
+                locallinkrevs[path] = {
+                    node: linkrevs[manifestnode]
+                    for node, manifestnode in nodes.iteritems()}
+
+            for path, f in fs:
+                objs = f.result()
+
+                # Chomp off header objects.
+                next(objs)
+
+                store = repo.file(path)
+                store.addgroup(
+                    iterrevisions(objs, progress),
+                    locallinkrevs[path].__getitem__,
+                    weakref.proxy(tr))
+
+    progress.complete()
diff --git a/tests/test-wireproto-exchangev2.t b/tests/test-wireproto-exchangev2.t
--- a/tests/test-wireproto-exchangev2.t
+++ b/tests/test-wireproto-exchangev2.t
@@ -94,6 +94,37 @@
   received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
   received frame(size=922; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 2 commands
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
+      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
+      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
+    ],
+    'path': 'a'
+  }
+  sending command filedata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'nodes': [
+      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
+      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
+      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
+    ],
+    'path': 'b'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=389; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:caa2a465451d (3 drafts)
 
@@ -189,6 +220,34 @@
received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation) received frame(size=376; request=1; stream=2; streamflags=; type=command-response; flags=continuation) received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + sending 2 commands + sending command filedata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'nodes': [ + '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda', + '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc' + ], + 'path': 'a' + } + sending command filedata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'nodes': [ + '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16' + ], + 'path': 'b' + } + received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation) + received frame(size=249; request=1; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=109; request=3; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos) updating the branch cache new changesets 3390ef850073:4432d83626e8 @@ -268,6 +327,36 @@ received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation) received frame(size=559; request=1; stream=2; streamflags=; type=command-response; flags=continuation) received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + sending 2 commands + sending command filedata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'nodes': [ + '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda', + 
'\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda' + ], + 'path': 'a' + } + sending command filedata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'nodes': [ + '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16', + '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx', + '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3' + ], + 'path': 'b' + } + received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation) + received frame(size=249; request=1; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos) updating the branch cache new changesets cd2534766bec:caa2a465451d (3 drafts) (run 'hg update' to get a working copy) @@ -421,6 +510,37 @@ received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation) received frame(size=922; request=1; stream=2; streamflags=; type=command-response; flags=continuation) received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + sending 2 commands + sending command filedata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'nodes': [ + '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda', + '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc', + '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda' + ], + 'path': 'a' + } + sending command filedata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'nodes': [ + '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16', + 
'\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx', + '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3' + ], + 'path': 'b' + } + received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation) + received frame(size=389; request=1; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=389; request=3; stream=2; streamflags=; type=command-response; flags=continuation) + received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos) updating the branch cache new changesets 3390ef850073:caa2a465451d (1 drafts)