diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -9,6 +9,7 @@ import errno import os +import select import signal import sys import threading @@ -89,7 +90,6 @@ return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): - rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -138,7 +138,15 @@ oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() parentpid = os.getpid() + pipes = [] for pargs in partition(args, workers): + # Every worker gets its own pipe to send results on, so we don't have to + # implement atomic writes larger than PIPE_BUF. Each forked process has + # its own pipe's descriptors in the local variables, and the parent + # process has the full list of pipe descriptors (and it doesn't really + # care what order they're in). + rfd, wfd = os.pipe() + pipes.append((rfd, wfd)) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. @@ -175,8 +183,10 @@ finally: os._exit(ret & 255) pids.add(pid) - os.close(wfd) - fp = os.fdopen(rfd, r'rb', 0) + fps = [] + for rfd, wfd in pipes: + os.close(wfd) + fps.append(os.fdopen(rfd, r'rb', 0)) def cleanup(): signal.signal(signal.SIGINT, oldhandler) waitforworkers() @@ -187,15 +197,23 @@ os.kill(os.getpid(), -status) sys.exit(status) try: - while True: + while fps: try: - yield util.pickle.load(fp) - except EOFError: - break - except IOError as e: - if e.errno == errno.EINTR: + rlist, wlist, xlist = select.select(fps, [], fps) + except select.error as e: + if e[0] == errno.EINTR: continue raise + for fp in rlist + xlist: + try: + yield util.pickle.load(fp) + except EOFError: + fp.close() + except IOError as e: + if e.errno == errno.EINTR: + continue + raise + fps = [fp for fp in fps if not fp.closed] except: # re-raises killworkers() cleanup()