diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -65,6 +65,58 @@
     return min(max(countcpus(), 4), 32)
 
 
+if pycompat.ispy3:
+
+    class _blockingreader(object):
+        """Make reads from an unbuffered binary stream block until complete.
+
+        A raw pipe may hand back fewer bytes than requested when the writer
+        has not finished yet; pickle can then fail on a partially read result.
+        This wrapper retries until the request is fulfilled or EOF is hit.
+        """
+
+        def __init__(self, wrapped):
+            self._wrapped = wrapped
+
+        def __getattr__(self, attr):
+            # everything not overridden below is forwarded to the raw stream
+            return getattr(self._wrapped, attr)
+
+        # Must be defined here: __getattr__ would otherwise expose the raw
+        # readinto(), and pickle may use it without retrying short reads.
+        def readinto(self, buf):
+            pos = 0
+            size = len(buf)
+
+            with memoryview(buf) as view:
+                while pos < size:
+                    with view[pos:] as subview:
+                        ret = self._wrapped.readinto(subview)
+                    if not ret:
+                        # EOF (or nothing available): stop instead of spinning
+                        break
+                    pos += ret
+
+            return pos
+
+        # issue multiple reads until size is fulfilled
+        def read(self, size=-1):
+            if size is None or size < 0:
+                return self._wrapped.readall()
+
+            buf = bytearray(size)
+            n_read = self.readinto(buf)
+            del buf[n_read:]
+            # read() is expected to return bytes, not a mutable bytearray
+            return bytes(buf)
+
+
+else:
+
+    def _blockingreader(wrapped):
+        return wrapped
+
+
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -226,7 +278,7 @@
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
-        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
+        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
@@ -240,7 +292,7 @@
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    res = util.pickle.load(key.fileobj)
+                    res = util.pickle.load(_blockingreader(key.fileobj))
                     if hasretval and res[0]:
                         retval.update(res[1])
                     else:
diff --git a/tests/test-worker.t b/tests/test-worker.t
--- a/tests/test-worker.t
+++ b/tests/test-worker.t
@@ -131,4 +131,35 @@
   abort: known exception
   [255]
 
+Do not crash on partially read result
+
+  $ cat > $TESTTMP/detecttruncated.py << EOF
+  > from __future__ import absolute_import
+  > import os
+  > import sys
+  > import time
+  > sys.unraisablehook = lambda x: None
+  > oldwrite = os.write
+  > def splitwrite(fd, string):
+  >     ret = oldwrite(fd, string[:9])
+  >     if ret == 9:
+  >         time.sleep(0.1)
+  >         ret += oldwrite(fd, string[9:])
+  >     return ret
+  > os.write = splitwrite
+  > EOF
+
+  $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
+  > "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
+  start
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  done
+
 #endif