diff --git a/mercurial/exchangev2.py b/mercurial/exchangev2.py --- a/mercurial/exchangev2.py +++ b/mercurial/exchangev2.py @@ -29,6 +29,18 @@ """Pull using wire protocol version 2.""" repo = pullop.repo remote = pullop.remote + + usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop) + + # If this is a clone and it was requested to perform a "stream clone", + # we obtain the raw files data from the remote then fall back to an + # incremental pull. This is somewhat hacky and is not nearly robust enough + # for long-term usage. + if usingrawchangelogandmanifest: + with repo.transaction('clone'): + _fetchrawstorefiles(repo, remote) + repo.invalidate(clearfilecache=True) + tr = pullop.trmanager.transaction() # We don't use the repo's narrow matcher here because the patterns passed @@ -79,11 +91,123 @@ manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes']) + # If obtaining the raw store files, we need to scan the full repo to + # derive all the changesets, manifests, and linkrevs. + if usingrawchangelogandmanifest: + csetsforfiles = [] + mnodesforfiles = [] + manifestlinkrevs = {} + + for rev in repo: + ctx = repo[rev] + mnode = ctx.manifestnode() + + csetsforfiles.append(ctx.node()) + mnodesforfiles.append(mnode) + manifestlinkrevs[mnode] = rev + + else: + csetsforfiles = csetres['added'] + mnodesforfiles = manres['added'] + manifestlinkrevs = manres['linkrevs'] + # Find all file nodes referenced by added manifests and fetch those # revisions.
- fnodes = _derivefilesfrommanifests(repo, narrowmatcher, manres['added']) - _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetres['added'], - manres['linkrevs']) + fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles) + _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles, + manifestlinkrevs) + +def _checkuserawstorefiledata(pullop): + """Check whether we should use rawstorefiledata command to retrieve data.""" + + repo = pullop.repo + remote = pullop.remote + + # Command to obtain raw store data isn't available. + if b'rawstorefiledata' not in remote.apidescriptor[b'commands']: + return False + + # Only honor if user requested stream clone operation. + if not pullop.streamclonerequested: + return False + + # Only works on empty repos. + if len(repo): + return False + + # TODO This is super hacky. There needs to be a storage API for this. We + # also need to check for compatibility with the remote. + if b'revlogv1' not in repo.requirements: + return False + + return True + +def _fetchrawstorefiles(repo, remote): + """Fetch raw changelog and manifestlog files from remote into store.""" + with remote.commandexecutor() as e: + objs = e.callcommand(b'rawstorefiledata', { + b'files': [b'changelog', b'manifestlog'], + }).result() + + # First object is a summary of files data that follows. + overall = next(objs) + + progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'], + unit=_('bytes')) + with progress: + progress.update(0) + + # Next, for each file: a metadata map followed by its data chunks.
+ while True: + try: + filemeta = next(objs) + except StopIteration: + break + + for k in (b'location', b'path', b'size'): + if k not in filemeta: + raise error.Abort(_(b'remote file data missing key: %s') + % k) + + if filemeta[b'location'] == b'store': + vfs = repo.svfs + else: + raise error.Abort(_(b'invalid location for raw file data: ' + b'%s') % filemeta[b'location']) + + bytesremaining = filemeta[b'size'] + + with vfs.open(filemeta[b'path'], b'wb') as fh: + while True: + try: + chunk = next(objs) + except StopIteration: + break + + bytesremaining -= len(chunk) + + if bytesremaining < 0: + raise error.Abort(_( + b'received invalid number of bytes for file ' + b'data; expected %d, got extra') % + filemeta[b'size']) + + progress.increment(step=len(chunk)) + fh.write(chunk) + + try: + if chunk.islast: + break + except AttributeError: + raise error.Abort(_( + b'did not receive indefinite length bytestring ' + b'for file data')) + + if bytesremaining: + raise error.Abort(_(b'received invalid number of bytes for ' + b'file data; expected %d, got %d') % + (filemeta[b'size'], + filemeta[b'size'] - bytesremaining)) def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): """Determine which changesets need to be pulled.""" diff --git a/tests/test-wireproto-exchangev2.t b/tests/test-wireproto-exchangev2.t --- a/tests/test-wireproto-exchangev2.t +++ b/tests/test-wireproto-exchangev2.t @@ -963,3 +963,185 @@ client-narrow-2/.hg/store/00manifest.i client-narrow-2/.hg/store/data/dir0/d.i #endif + +--stream will use rawstorefiledata to transfer changelog and manifestlog, then +fall through to get files data + + $ hg --debug clone --stream -U http://localhost:$HGPORT client-stream-0 + using http://localhost:$HGPORT/ + sending capabilities command + sending 1 commands + sending command rawstorefiledata: { + 'files': [ + 'changelog', + 'manifestlog' + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) +
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=1275; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + updating the branch cache + query 1; heads + sending 2 commands + sending command heads: {} + sending command known: { + 'nodes': [ + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) + received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=22; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=2; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos) + searching for changes + all remote heads known locally + sending 1 commands + sending command changesetdata: { + 'fields': set([ + 'bookmarks', + 'parents', + 'phase', + 'revision' + ]), + 'revisions': [ + { + 'heads': [ + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' + ], + 'roots': [ + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' 
+ ], + 'type': 'changesetdagrange' + } + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) + received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=13; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + checking for updated bookmarks + sending 1 commands + sending command filesdata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'haveparents': True, + 'revisions': [ + { + 'nodes': [ + '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:', + '\xb7\t8\x08\x92\xb1\x93\xc1\t\x1d:\x81\x7fp`R\xe3F\x82\x1b', + 'G\xfe\x01*\xb27\xa8\xc7\xfc\x0cx\xf9\xf2mXf\xee\xf3\xf8%', + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' + ], + 'type': 'changesetexplicit' + } + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) + received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=1133; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob) + +--stream + --include/--exclude will only obtain some files + + $ hg --debug --config extensions.pullext=$TESTDIR/pullext.py clone --stream --include dir0/ -U http://localhost:$HGPORT client-stream-2 + using http://localhost:$HGPORT/ + sending capabilities command + sending 1 commands + sending command rawstorefiledata: { + 'files': [ + 'changelog', + 'manifestlog' + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) + received frame(size=11; request=1; stream=2; streamflags=encoded; 
type=command-response; flags=continuation) + received frame(size=1275; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + updating the branch cache + query 1; heads + sending 2 commands + sending command heads: {} + sending command known: { + 'nodes': [ + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) + received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=22; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=2; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos) + searching for changes + all remote heads known locally + sending 1 commands + sending command changesetdata: { + 'fields': set([ + 'bookmarks', + 'parents', + 'phase', + 'revision' + ]), + 'revisions': [ + { + 'heads': [ + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' + ], + 'roots': [ + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' 
+ ], + 'type': 'changesetdagrange' + } + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) + received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=13; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + checking for updated bookmarks + sending 1 commands + sending command filesdata: { + 'fields': set([ + 'parents', + 'revision' + ]), + 'haveparents': True, + 'pathfilter': { + 'include': [ + 'path:dir0' + ] + }, + 'revisions': [ + { + 'nodes': [ + '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:', + '\xb7\t8\x08\x92\xb1\x93\xc1\t\x1d:\x81\x7fp`R\xe3F\x82\x1b', + 'G\xfe\x01*\xb27\xa8\xc7\xfc\x0cx\xf9\xf2mXf\xee\xf3\xf8%', + '\x97v_\xc3\xcdbO\xd1\xfa\x01v\x93,!\xff\xd1j\xdfC.' + ], + 'type': 'changesetexplicit' + } + ] + } + received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos) + received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=449; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation) + received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) + (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob) + +#if reporevlogstore + $ find client-stream-2/.hg/store -type f -name '*.i' | sort + client-stream-2/.hg/store/00changelog.i + client-stream-2/.hg/store/00manifest.i + client-stream-2/.hg/store/data/dir0/c.i + client-stream-2/.hg/store/data/dir0/d.i +#endif