Details
- Reviewers: None
- Group Reviewers: hg-reviewers
- Commits: rHGe59eaf51cc0d: streamclone: use progress helper
Diff Detail
- Repository: rHG Mercurial
- Lint: Lint Skipped
- Unit: Unit Tests Skipped
| | Path | Packages |
|---|---|---|
| M | mercurial/streamclone.py (49 lines) | |
| Commit | Parents | Author | Summary | Date |
|---|---|---|---|---|
| rHGe59eaf51cc0d | | Martin von Zweigbergk | streamclone: use progress helper | Jun 16 2018, 3:37 AM |
| Status | Author | Revision |
|---|---|---|
| Closed | martinvonz | |
| Closed | martinvonz | |
| Closed | martinvonz | |
| Closed | martinvonz | |
| Closed | martinvonz | |
| Closed | martinvonz | |
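rHGe59eaf51cc0d is a mechanical migration: four functions in mercurial/streamclone.py (`generatev1()`, `consumev1()`, `_emit2()`, `consumev2()`) stop calling `repo.ui.progress(...)` directly and instead build a progress helper once with `repo.ui.makeprogress(topic, total=..., unit=...)`, then advance it with `update()`/`increment()`. A minimal sketch of the before/after shape (`chunks` here is a stand-in iterable, not a name from the diff):

```python
# Before: every report repeats topic, position, total, and unit.
seen = 0
repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
for chunk in chunks:
    seen += len(chunk)
    repo.ui.progress(_('bundle'), seen, total=bytecount, unit=_('bytes'))
repo.ui.progress(_('bundle'), None)  # pos=None clears the bar

# After: topic, total, and unit live on the helper object.
progress = repo.ui.makeprogress(_('bundle'), total=bytecount,
                                unit=_('bytes'))
progress.update(0)
for chunk in chunks:
    progress.increment(step=len(chunk))
progress.update(None)  # same clear-the-bar convention as before
```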
mercurial/streamclone.py, generatev1():

```diff
         yield struct.pack('>QQ', filecount, bytecount)
         yield struct.pack('>H', len(requires) + 1)
         yield requires + '\0'

         # This is where we'll add compression in the future.
         assert compression == 'UN'

-        seen = 0
-        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
+        progress = repo.ui.makeprogress(_('bundle'), total=bytecount,
+                                        unit=_('bytes'))
+        progress.update(0)

         for chunk in it:
-            seen += len(chunk)
-            repo.ui.progress(_('bundle'), seen, total=bytecount,
-                             unit=_('bytes'))
+            progress.increment(step=len(chunk))
             yield chunk

-        repo.ui.progress(_('bundle'), None)
+        progress.update(None)

     return requirements, gen()
```
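The `increment(step=...)` call makes the helper carry the running byte count that the deleted `seen` variable held. The `consumev2()` hunk below reads the position back as `progress.pos`, so, assuming that attribute is the helper's current position, the relative and absolute styles are interchangeable:

```python
# Two equivalent ways to advance the bar by one chunk (equivalence assumed
# from progress.pos, which the consumev2 hunk below reads back directly):
progress.increment(step=len(chunk))          # relative step
progress.update(progress.pos + len(chunk))   # absolute position
```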
mercurial/streamclone.py, consumev1():

```diff
 def consumev1(repo, fp, filecount, bytecount):
     """Apply the contents from version 1 of a streaming clone file handle.

     This takes the output from "stream_out" and applies it to the specified
     repository.

     Like "stream_out," the status line added by the wire protocol is not
     handled by this function.
     """
     with repo.lock():
         repo.ui.status(_('%d files to transfer, %s of data\n') %
                        (filecount, util.bytecount(bytecount)))
-        handled_bytes = 0
-        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
+        progress = repo.ui.makeprogress(_('clone'), total=bytecount,
+                                        unit=_('bytes'))
+        progress.update(0)
         start = util.timer()

         # TODO: get rid of (potential) inconsistency
         #
         # If transaction is started and any @filecache property is
         # changed at this point, it causes inconsistency between
         # in-memory cached property and streamclone-ed file on the
         # disk. Nested transaction prevents transaction scope "clone"
@@ ... @@
                         _('unexpected response from remote server:'), l)
                 if repo.ui.debugflag:
                     repo.ui.debug('adding %s (%s)\n' %
                                   (name, util.bytecount(size)))
                 # for backwards compat, name was partially encoded
                 path = store.decodedir(name)
                 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                     for chunk in util.filechunkiter(fp, limit=size):
-                        handled_bytes += len(chunk)
-                        repo.ui.progress(_('clone'), handled_bytes,
-                                         total=bytecount, unit=_('bytes'))
+                        progress.increment(step=len(chunk))
                         ofp.write(chunk)

         # force @filecache properties to be reloaded from
         # streamclone-ed file at next access
         repo.invalidate(clearfilecache=True)

         elapsed = util.timer() - start
         if elapsed <= 0:
             elapsed = 0.001
-        repo.ui.progress(_('clone'), None)
+        progress.update(None)
         repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                        (util.bytecount(bytecount), elapsed,
                         util.bytecount(bytecount / elapsed)))

 def readbundle1header(fp):
     compression = fp.read(2)
     if compression != 'UN':
         raise error.Abort(_('only uncompressed stream clone bundles are '
```
mercurial/streamclone.py, _emit2():

```diff
     # (eg: .hg/hgrc)
     assert repo.vfs not in vfsmap.values()
     return vfsmap

 def _emit2(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
     vfsmap = _makemap(repo)
-    progress = repo.ui.progress
-    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
+    progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize,
+                                    unit=_('bytes'))
+    progress.update(0)
     with maketempcopies() as copy:
         try:
             # copy is delayed until we are in the try
             entries = [_filterfull(e, copy, vfsmap) for e in entries]
             yield None # this release the lock on the repository
             seen = 0

             for src, name, ftype, data in entries:
@@ ... @@
                 yield util.uvarintencode(size)
                 yield name
                 if size <= 65536:
                     chunks = (fp.read(size),)
                 else:
                     chunks = util.filechunkiter(fp, limit=size)
                 for chunk in chunks:
                     seen += len(chunk)
-                    progress(_('bundle'), seen, total=totalfilesize,
-                             unit=_('bytes'))
+                    progress.update(seen)
                     yield chunk
             finally:
                 fp.close()
     finally:
-        progress(_('bundle'), None)
+        progress.update(None)
```
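One inconsistency a reviewer might flag: `_emit2()` keeps its local `seen` counter and reports it with the absolute `progress.update(seen)`, while the other hunks drop their counters in favor of `progress.increment(step=...)`. Both end at the same position; a hypothetical follow-up (not part of this diff) could unify on the relative style:

```python
# Hypothetical follow-up for _emit2(), not part of rHGe59eaf51cc0d:
for chunk in chunks:
    progress.increment(step=len(chunk))  # replaces seen += len(chunk)
    yield chunk
```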
mercurial/streamclone.py, generatev2() (unchanged context) and consumev2():

```diff
 def generatev2(repo):
     """Emit content for version 2 of a streaming clone.

     the data stream consists the following entries:
     1) A char representing the file destination (eg: store or cache)
     2) A varint containing the length of the filename
     3) A varint containing the length of file data
@@ ... @@
     Data is read from an object that only needs to provide a ``read(size)``
     method.
     """
     with repo.lock():
         repo.ui.status(_('%d files to transfer, %s of data\n') %
                        (filecount, util.bytecount(filesize)))
         start = util.timer()
-        handledbytes = 0
-        progress = repo.ui.progress
-        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
+        progress = repo.ui.makeprogress(_('clone'), total=filesize,
+                                        unit=_('bytes'))
+        progress.update(0)
         vfsmap = _makemap(repo)

         with repo.transaction('clone'):
             ctxs = (vfs.backgroundclosing(repo.ui)
                     for vfs in vfsmap.values())
             with nested(*ctxs):
                 for i in range(filecount):
                     src = util.readexactly(fp, 1)
                     vfs = vfsmap[src]
                     namelen = util.uvarintdecodestream(fp)
                     datalen = util.uvarintdecodestream(fp)
                     name = util.readexactly(fp, namelen)

                     if repo.ui.debugflag:
                         repo.ui.debug('adding [%s] %s (%s)\n' %
                                       (src, name, util.bytecount(datalen)))

                     with vfs(name, 'w') as ofp:
                         for chunk in util.filechunkiter(fp, limit=datalen):
-                            handledbytes += len(chunk)
-                            progress(_('clone'), handledbytes, total=filesize,
-                                     unit=_('bytes'))
+                            progress.increment(step=len(chunk))
                             ofp.write(chunk)

         # force @filecache properties to be reloaded from
         # streamclone-ed file at next access
         repo.invalidate(clearfilecache=True)

         elapsed = util.timer() - start
         if elapsed <= 0:
             elapsed = 0.001
-        progress(_('clone'), None)
+        progress.update(None)
         repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
-                       (util.bytecount(handledbytes), elapsed,
-                        util.bytecount(handledbytes / elapsed)))
+                       (util.bytecount(progress.pos), elapsed,
+                        util.bytecount(progress.pos / elapsed)))

 def applybundlev2(repo, fp, filecount, filesize, requirements):
     missingreqs = [r for r in requirements if r not in repo.supported]
     if missingreqs:
         raise error.Abort(_('unable to apply stream clone: '
                             'unsupported format: %s') %
                           ', '.join(sorted(missingreqs)))
```
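A subtle point in that last hunk: `consumev1()` still prints its closing summary from the advertised `bytecount`, while `consumev2()` now derives it from `progress.pos` because the local `handledbytes` counter is gone. A short sketch of the resulting summary math, assuming `progress.pos` holds the bytes accumulated via `increment()`:

```python
# Assumes progress.pos is the byte total accumulated by increment() above.
rate = progress.pos / elapsed  # bytes per second
repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
               (util.bytecount(progress.pos), elapsed, util.bytecount(rate)))
```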