Sleep-based test synchronisation does not work.
Variation in machine performance and load can make the two processes miss their
windows. Instead, we migrate to explicit signaling through the file system, as
other test files already do.
Alphare |
hg-reviewers |
Sleep-based test synchronisation does not work.
Variation in machine performance and load can make the two processes miss their
windows. Instead, we migrate to explicit signaling through the file system, as
other test files already do.
No Linters Available |
No Unit Test Coverage |
Path | Packages | |||
---|---|---|---|---|
M | mercurial/streamclone.py (12 lines) | |||
M | tests/test-clone-uncompressed.t (57 lines) | |||
A | M | tests/testlib/ext-stream-clone-steps.py (31 lines) |
Commit | Parents | Author | Summary | Date |
---|---|---|---|---|
2c8a84e67b60 | 29f5d32b043f | Pierre-Yves David | Apr 19 2021, 1:12 PM |
total_bytes = 0 | total_bytes = 0 | ||||
# Get consistent snapshot of repo, lock during scan. | # Get consistent snapshot of repo, lock during scan. | ||||
with repo.lock(): | with repo.lock(): | ||||
repo.ui.debug(b'scanning\n') | repo.ui.debug(b'scanning\n') | ||||
for file_type, name, ename, size in _walkstreamfiles(repo): | for file_type, name, ename, size in _walkstreamfiles(repo): | ||||
if size: | if size: | ||||
entries.append((name, size)) | entries.append((name, size)) | ||||
total_bytes += size | total_bytes += size | ||||
_test_sync_point_walk_1(repo) | |||||
_test_sync_point_walk_2(repo) | |||||
repo.ui.debug( | repo.ui.debug( | ||||
b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes) | b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes) | ||||
) | ) | ||||
svfs = repo.svfs | svfs = repo.svfs | ||||
debugflag = repo.ui.debugflag | debugflag = repo.ui.debugflag | ||||
for chunk in chunks: | for chunk in chunks: | ||||
seen += len(chunk) | seen += len(chunk) | ||||
progress.update(seen) | progress.update(seen) | ||||
yield chunk | yield chunk | ||||
finally: | finally: | ||||
fp.close() | fp.close() | ||||
def _test_sync_point_walk_1(repo): | |||||
"""a function for synchronisation during tests""" | |||||
def _test_sync_point_walk_2(repo): | |||||
"""a function for synchronisation during tests""" | |||||
def generatev2(repo, includes, excludes, includeobsmarkers): | def generatev2(repo, includes, excludes, includeobsmarkers): | ||||
"""Emit content for version 2 of a streaming clone. | """Emit content for version 2 of a streaming clone. | ||||
the data stream consists the following entries: | the data stream consists the following entries: | ||||
1) A char representing the file destination (eg: store or cache) | 1) A char representing the file destination (eg: store or cache) | ||||
2) A varint containing the length of the filename | 2) A varint containing the length of the filename | ||||
3) A varint containing the length of file data | 3) A varint containing the length of file data | ||||
4) N bytes containing the filename (the internal, store-agnostic form) | 4) N bytes containing the filename (the internal, store-agnostic form) | ||||
for name in cacheutil.cachetocopy(repo): | for name in cacheutil.cachetocopy(repo): | ||||
if repo.cachevfs.exists(name): | if repo.cachevfs.exists(name): | ||||
totalfilesize += repo.cachevfs.lstat(name).st_size | totalfilesize += repo.cachevfs.lstat(name).st_size | ||||
entries.append((_srccache, name, _filefull, None)) | entries.append((_srccache, name, _filefull, None)) | ||||
chunks = _emit2(repo, entries, totalfilesize) | chunks = _emit2(repo, entries, totalfilesize) | ||||
first = next(chunks) | first = next(chunks) | ||||
assert first is None | assert first is None | ||||
_test_sync_point_walk_1(repo) | |||||
_test_sync_point_walk_2(repo) | |||||
return len(entries), totalfilesize, chunks | return len(entries), totalfilesize, chunks | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def nested(*ctxs): | def nested(*ctxs): | ||||
this = ctxs[0] | this = ctxs[0] | ||||
rest = ctxs[1:] | rest = ctxs[1:] |
Stream clone while repo is changing: | Stream clone while repo is changing: | ||||
$ mkdir changing | $ mkdir changing | ||||
$ cd changing | $ cd changing | ||||
extension for delaying the server process so we reliably can modify the repo | extension for delaying the server process so we reliably can modify the repo | ||||
while cloning | while cloning | ||||
$ cat > delayer.py <<EOF | $ cat > stream_steps.py <<EOF | ||||
> import time | > import os | ||||
> from mercurial import extensions, vfs | > import sys | ||||
> def __call__(orig, self, path, *args, **kwargs): | > from mercurial import ( | ||||
> if path == 'data/f1.i': | > encoding, | ||||
> time.sleep(2) | > extensions, | ||||
> return orig(self, path, *args, **kwargs) | > streamclone, | ||||
> extensions.wrapfunction(vfs.vfs, '__call__', __call__) | > testing, | ||||
> ) | |||||
> WALKED_FILE_1 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_1'] | |||||
> WALKED_FILE_2 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_2'] | |||||
> | |||||
> def _test_sync_point_walk_1(orig, repo): | |||||
> testing.write_file(WALKED_FILE_1) | |||||
> | |||||
> def _test_sync_point_walk_2(orig, repo): | |||||
> assert repo._currentlock(repo._lockref) is None | |||||
> testing.wait_file(WALKED_FILE_2) | |||||
> | |||||
> extensions.wrapfunction( | |||||
> streamclone, | |||||
> '_test_sync_point_walk_1', | |||||
> _test_sync_point_walk_1 | |||||
> ) | |||||
> extensions.wrapfunction( | |||||
> streamclone, | |||||
> '_test_sync_point_walk_2', | |||||
> _test_sync_point_walk_2 | |||||
> ) | |||||
> EOF | > EOF | ||||
prepare repo with small and big file to cover both code paths in emitrevlogdata | prepare repo with small and big file to cover both code paths in emitrevlogdata | ||||
$ hg init repo | $ hg init repo | ||||
$ touch repo/f1 | $ touch repo/f1 | ||||
$ $TESTDIR/seq.py 50000 > repo/f2 | $ $TESTDIR/seq.py 50000 > repo/f2 | ||||
$ hg -R repo ci -Aqm "0" | $ hg -R repo ci -Aqm "0" | ||||
$ hg serve -R repo -p $HGPORT1 -d --pid-file=hg.pid --config extensions.delayer=delayer.py | $ HG_TEST_STREAM_WALKED_FILE_1="$TESTTMP/sync_file_walked_1" | ||||
$ export HG_TEST_STREAM_WALKED_FILE_1 | |||||
$ HG_TEST_STREAM_WALKED_FILE_2="$TESTTMP/sync_file_walked_2" | |||||
$ export HG_TEST_STREAM_WALKED_FILE_2 | |||||
$ HG_TEST_STREAM_WALKED_FILE_3="$TESTTMP/sync_file_walked_3" | |||||
$ export HG_TEST_STREAM_WALKED_FILE_3 | |||||
# $ cat << EOF >> $HGRCPATH | |||||
# > [hooks] | |||||
# > pre-clone=rm -f "$TESTTMP/sync_file_walked_*" | |||||
# > EOF | |||||
$ hg serve -R repo -p $HGPORT1 -d --error errors.log --pid-file=hg.pid --config extensions.stream_steps="$RUNTESTDIR/testlib/ext-stream-clone-steps.py" | |||||
$ cat hg.pid >> $DAEMON_PIDS | $ cat hg.pid >> $DAEMON_PIDS | ||||
clone while modifying the repo between stating file with write lock and | clone while modifying the repo between stating file with write lock and | ||||
actually serving file content | actually serving file content | ||||
$ hg clone -q --stream -U http://localhost:$HGPORT1 clone & | $ (hg clone -q --stream -U http://localhost:$HGPORT1 clone; touch "$HG_TEST_STREAM_WALKED_FILE_3") & | ||||
$ sleep 1 | $ $RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_1 | ||||
$ echo >> repo/f1 | $ echo >> repo/f1 | ||||
$ echo >> repo/f2 | $ echo >> repo/f2 | ||||
$ hg -R repo ci -m "1" --config ui.timeout.warn=-1 | $ hg -R repo ci -m "1" --config ui.timeout.warn=-1 | ||||
$ wait | $ touch $HG_TEST_STREAM_WALKED_FILE_2 | ||||
$ $RUNTESTDIR/testlib/wait-on-file 10 $HG_TEST_STREAM_WALKED_FILE_3 | |||||
$ hg -R clone id | $ hg -R clone id | ||||
000000000000 | 000000000000 | ||||
$ cat errors.log | |||||
$ cd .. | $ cd .. | ||||
Stream repository with bookmarks | Stream repository with bookmarks | ||||
-------------------------------- | -------------------------------- | ||||
(revert introduction of secret changeset) | (revert introduction of secret changeset) | ||||
$ hg -R server phase --draft 'secret()' | $ hg -R server phase --draft 'secret()' |
from __future__ import absolute_import | |||||
from mercurial import ( | |||||
encoding, | |||||
extensions, | |||||
streamclone, | |||||
testing, | |||||
) | |||||
WALKED_FILE_1 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_1'] | |||||
WALKED_FILE_2 = encoding.environ[b'HG_TEST_STREAM_WALKED_FILE_2'] | |||||
def _test_sync_point_walk_1(orig, repo): | |||||
testing.write_file(WALKED_FILE_1) | |||||
def _test_sync_point_walk_2(orig, repo): | |||||
assert repo._currentlock(repo._lockref) is None | |||||
testing.wait_file(WALKED_FILE_2) | |||||
def uisetup(ui): | |||||
extensions.wrapfunction( | |||||
streamclone, '_test_sync_point_walk_1', _test_sync_point_walk_1 | |||||
) | |||||
extensions.wrapfunction( | |||||
streamclone, '_test_sync_point_walk_2', _test_sync_point_walk_2 | |||||
) |