Details
Details
- Reviewers
- None
- Group Reviewers
hg-reviewers - Commits
- rHGe59eaf51cc0d: streamclone: use progress helper
Diff Detail
Diff Detail
- Repository
- rHG Mercurial
- Lint
Lint Skipped - Unit
Unit Tests Skipped
hg-reviewers |
Lint Skipped |
Unit Tests Skipped |
Path | Packages | |||
---|---|---|---|---|
M | mercurial/streamclone.py (49 lines) |
Commit | Parents | Author | Summary | Date |
---|---|---|---|---|
Martin von Zweigbergk | Jun 16 2018, 3:37 AM |
Status | Author | Revision | |
---|---|---|---|
Closed | martinvonz | ||
Closed | martinvonz | ||
Closed | martinvonz | ||
Closed | martinvonz | ||
Closed | martinvonz | ||
Closed | martinvonz |
yield struct.pack('>QQ', filecount, bytecount) | yield struct.pack('>QQ', filecount, bytecount) | ||||
yield struct.pack('>H', len(requires) + 1) | yield struct.pack('>H', len(requires) + 1) | ||||
yield requires + '\0' | yield requires + '\0' | ||||
# This is where we'll add compression in the future. | # This is where we'll add compression in the future. | ||||
assert compression == 'UN' | assert compression == 'UN' | ||||
seen = 0 | progress = repo.ui.makeprogress(_('bundle'), total=bytecount, | ||||
repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes')) | unit=_('bytes')) | ||||
progress.update(0) | |||||
for chunk in it: | for chunk in it: | ||||
seen += len(chunk) | progress.increment(step=len(chunk)) | ||||
repo.ui.progress(_('bundle'), seen, total=bytecount, | |||||
unit=_('bytes')) | |||||
yield chunk | yield chunk | ||||
repo.ui.progress(_('bundle'), None) | progress.update(None) | ||||
return requirements, gen() | return requirements, gen() | ||||
def consumev1(repo, fp, filecount, bytecount): | def consumev1(repo, fp, filecount, bytecount): | ||||
"""Apply the contents from version 1 of a streaming clone file handle. | """Apply the contents from version 1 of a streaming clone file handle. | ||||
This takes the output from "stream_out" and applies it to the specified | This takes the output from "stream_out" and applies it to the specified | ||||
repository. | repository. | ||||
Like "stream_out," the status line added by the wire protocol is not | Like "stream_out," the status line added by the wire protocol is not | ||||
handled by this function. | handled by this function. | ||||
""" | """ | ||||
with repo.lock(): | with repo.lock(): | ||||
repo.ui.status(_('%d files to transfer, %s of data\n') % | repo.ui.status(_('%d files to transfer, %s of data\n') % | ||||
(filecount, util.bytecount(bytecount))) | (filecount, util.bytecount(bytecount))) | ||||
handled_bytes = 0 | progress = repo.ui.makeprogress(_('clone'), total=bytecount, | ||||
repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes')) | unit=_('bytes')) | ||||
progress.update(0) | |||||
start = util.timer() | start = util.timer() | ||||
# TODO: get rid of (potential) inconsistency | # TODO: get rid of (potential) inconsistency | ||||
# | # | ||||
# If transaction is started and any @filecache property is | # If transaction is started and any @filecache property is | ||||
# changed at this point, it causes inconsistency between | # changed at this point, it causes inconsistency between | ||||
# in-memory cached property and streamclone-ed file on the | # in-memory cached property and streamclone-ed file on the | ||||
# disk. Nested transaction prevents transaction scope "clone" | # disk. Nested transaction prevents transaction scope "clone" | ||||
_('unexpected response from remote server:'), l) | _('unexpected response from remote server:'), l) | ||||
if repo.ui.debugflag: | if repo.ui.debugflag: | ||||
repo.ui.debug('adding %s (%s)\n' % | repo.ui.debug('adding %s (%s)\n' % | ||||
(name, util.bytecount(size))) | (name, util.bytecount(size))) | ||||
# for backwards compat, name was partially encoded | # for backwards compat, name was partially encoded | ||||
path = store.decodedir(name) | path = store.decodedir(name) | ||||
with repo.svfs(path, 'w', backgroundclose=True) as ofp: | with repo.svfs(path, 'w', backgroundclose=True) as ofp: | ||||
for chunk in util.filechunkiter(fp, limit=size): | for chunk in util.filechunkiter(fp, limit=size): | ||||
handled_bytes += len(chunk) | progress.increment(step=len(chunk)) | ||||
repo.ui.progress(_('clone'), handled_bytes, | |||||
total=bytecount, unit=_('bytes')) | |||||
ofp.write(chunk) | ofp.write(chunk) | ||||
# force @filecache properties to be reloaded from | # force @filecache properties to be reloaded from | ||||
# streamclone-ed file at next access | # streamclone-ed file at next access | ||||
repo.invalidate(clearfilecache=True) | repo.invalidate(clearfilecache=True) | ||||
elapsed = util.timer() - start | elapsed = util.timer() - start | ||||
if elapsed <= 0: | if elapsed <= 0: | ||||
elapsed = 0.001 | elapsed = 0.001 | ||||
repo.ui.progress(_('clone'), None) | progress.update(None) | ||||
repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | ||||
(util.bytecount(bytecount), elapsed, | (util.bytecount(bytecount), elapsed, | ||||
util.bytecount(bytecount / elapsed))) | util.bytecount(bytecount / elapsed))) | ||||
def readbundle1header(fp): | def readbundle1header(fp): | ||||
compression = fp.read(2) | compression = fp.read(2) | ||||
if compression != 'UN': | if compression != 'UN': | ||||
raise error.Abort(_('only uncompressed stream clone bundles are ' | raise error.Abort(_('only uncompressed stream clone bundles are ' | ||||
# (eg: .hg/hgrc) | # (eg: .hg/hgrc) | ||||
assert repo.vfs not in vfsmap.values() | assert repo.vfs not in vfsmap.values() | ||||
return vfsmap | return vfsmap | ||||
def _emit2(repo, entries, totalfilesize): | def _emit2(repo, entries, totalfilesize): | ||||
"""actually emit the stream bundle""" | """actually emit the stream bundle""" | ||||
vfsmap = _makemap(repo) | vfsmap = _makemap(repo) | ||||
progress = repo.ui.progress | progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize, | ||||
progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes')) | unit=_('bytes')) | ||||
progress.update(0) | |||||
with maketempcopies() as copy: | with maketempcopies() as copy: | ||||
try: | try: | ||||
# copy is delayed until we are in the try | # copy is delayed until we are in the try | ||||
entries = [_filterfull(e, copy, vfsmap) for e in entries] | entries = [_filterfull(e, copy, vfsmap) for e in entries] | ||||
yield None # this release the lock on the repository | yield None # this release the lock on the repository | ||||
seen = 0 | seen = 0 | ||||
for src, name, ftype, data in entries: | for src, name, ftype, data in entries: | ||||
yield util.uvarintencode(size) | yield util.uvarintencode(size) | ||||
yield name | yield name | ||||
if size <= 65536: | if size <= 65536: | ||||
chunks = (fp.read(size),) | chunks = (fp.read(size),) | ||||
else: | else: | ||||
chunks = util.filechunkiter(fp, limit=size) | chunks = util.filechunkiter(fp, limit=size) | ||||
for chunk in chunks: | for chunk in chunks: | ||||
seen += len(chunk) | seen += len(chunk) | ||||
progress(_('bundle'), seen, total=totalfilesize, | progress.update(seen) | ||||
unit=_('bytes')) | |||||
yield chunk | yield chunk | ||||
finally: | finally: | ||||
fp.close() | fp.close() | ||||
finally: | finally: | ||||
progress(_('bundle'), None) | progress.update(None) | ||||
def generatev2(repo): | def generatev2(repo): | ||||
"""Emit content for version 2 of a streaming clone. | """Emit content for version 2 of a streaming clone. | ||||
the data stream consists the following entries: | the data stream consists the following entries: | ||||
1) A char representing the file destination (eg: store or cache) | 1) A char representing the file destination (eg: store or cache) | ||||
2) A varint containing the length of the filename | 2) A varint containing the length of the filename | ||||
3) A varint containing the length of file data | 3) A varint containing the length of file data | ||||
Data is read from an object that only needs to provide a ``read(size)`` | Data is read from an object that only needs to provide a ``read(size)`` | ||||
method. | method. | ||||
""" | """ | ||||
with repo.lock(): | with repo.lock(): | ||||
repo.ui.status(_('%d files to transfer, %s of data\n') % | repo.ui.status(_('%d files to transfer, %s of data\n') % | ||||
(filecount, util.bytecount(filesize))) | (filecount, util.bytecount(filesize))) | ||||
start = util.timer() | start = util.timer() | ||||
handledbytes = 0 | progress = repo.ui.makeprogress(_('clone'), total=filesize, | ||||
progress = repo.ui.progress | unit=_('bytes')) | ||||
progress.update(0) | |||||
progress(_('clone'), handledbytes, total=filesize, unit=_('bytes')) | |||||
vfsmap = _makemap(repo) | vfsmap = _makemap(repo) | ||||
with repo.transaction('clone'): | with repo.transaction('clone'): | ||||
ctxs = (vfs.backgroundclosing(repo.ui) | ctxs = (vfs.backgroundclosing(repo.ui) | ||||
for vfs in vfsmap.values()) | for vfs in vfsmap.values()) | ||||
with nested(*ctxs): | with nested(*ctxs): | ||||
for i in range(filecount): | for i in range(filecount): | ||||
src = util.readexactly(fp, 1) | src = util.readexactly(fp, 1) | ||||
vfs = vfsmap[src] | vfs = vfsmap[src] | ||||
namelen = util.uvarintdecodestream(fp) | namelen = util.uvarintdecodestream(fp) | ||||
datalen = util.uvarintdecodestream(fp) | datalen = util.uvarintdecodestream(fp) | ||||
name = util.readexactly(fp, namelen) | name = util.readexactly(fp, namelen) | ||||
if repo.ui.debugflag: | if repo.ui.debugflag: | ||||
repo.ui.debug('adding [%s] %s (%s)\n' % | repo.ui.debug('adding [%s] %s (%s)\n' % | ||||
(src, name, util.bytecount(datalen))) | (src, name, util.bytecount(datalen))) | ||||
with vfs(name, 'w') as ofp: | with vfs(name, 'w') as ofp: | ||||
for chunk in util.filechunkiter(fp, limit=datalen): | for chunk in util.filechunkiter(fp, limit=datalen): | ||||
handledbytes += len(chunk) | progress.increment(step=len(chunk)) | ||||
progress(_('clone'), handledbytes, total=filesize, | |||||
unit=_('bytes')) | |||||
ofp.write(chunk) | ofp.write(chunk) | ||||
# force @filecache properties to be reloaded from | # force @filecache properties to be reloaded from | ||||
# streamclone-ed file at next access | # streamclone-ed file at next access | ||||
repo.invalidate(clearfilecache=True) | repo.invalidate(clearfilecache=True) | ||||
elapsed = util.timer() - start | elapsed = util.timer() - start | ||||
if elapsed <= 0: | if elapsed <= 0: | ||||
elapsed = 0.001 | elapsed = 0.001 | ||||
progress(_('clone'), None) | progress.update(None) | ||||
repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | ||||
(util.bytecount(handledbytes), elapsed, | (util.bytecount(progress.pos), elapsed, | ||||
util.bytecount(handledbytes / elapsed))) | util.bytecount(progress.pos / elapsed))) | ||||
def applybundlev2(repo, fp, filecount, filesize, requirements): | def applybundlev2(repo, fp, filecount, filesize, requirements): | ||||
missingreqs = [r for r in requirements if r not in repo.supported] | missingreqs = [r for r in requirements if r not in repo.supported] | ||||
if missingreqs: | if missingreqs: | ||||
raise error.Abort(_('unable to apply stream clone: ' | raise error.Abort(_('unable to apply stream clone: ' | ||||
'unsupported format: %s') % | 'unsupported format: %s') % | ||||
', '.join(sorted(missingreqs))) | ', '.join(sorted(missingreqs))) | ||||