Currently worker supports running functions that return a progress
iterator. Generalize it to handle functions that return a progress
iterator followed by a return value.

It's unused in this commit, but will be used in the next one.
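Concretely, the new encoding wraps each progress item as `(False, progress)` and ends the stream with a single `(True, retval)` item. A minimal sketch of a conforming function and its consumer, derived from the patch below (the names `chunkfunc` and `consume` and the payloads are hypothetical, not part of this change):

```python
# Hypothetical illustration of the hasretval encoding; not part of the patch.
def chunkfunc(prefix, chunk):
    out = []
    for item in chunk:
        out.append(prefix + item)  # stand-in for real per-item work
        yield False, 1             # progress update: one item processed
    yield True, out                # final: the return value for this chunk

def consume(ui, items):
    retval = None
    for done, value in worker(ui, 0.001, chunkfunc, ('x-',), items,
                              hasretval=True):
        if done:
            retval = value         # the combined, naturally-ordered list
        else:
            pass                   # e.g. bump a progress bar by `value`
    return retval
```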
Automatic diff as part of commit; lint not applicable.
Automatic diff as part of commit; unit tests not applicable.
Path | Packages
---|---
M mercurial/worker.py (51 lines) |
Status | Author | Revision
---|---|---
Closed | valentin.gatienbaron |
Closed | valentin.gatienbaron |
Closed | valentin.gatienbaron |
mercurial/worker.py:

```diff
     if not threadsafe and _DISALLOW_THREAD_UNSAFE:
         return False
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_STARTUP_COST * workers + linear / workers)
     return benefit >= 0.15

-def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
+def worker(ui, costperarg, func, staticargs, args, hasretval=False,
+           threadsafe=True):
     '''run a function, possibly in parallel in multiple worker
     processes.

     returns a progress iterator

     costperarg - cost of a single task

-    func - function to run
+    func - function to run. It is expected to return a progress iterator.

     staticargs - arguments to pass to every invocation of the function

     args - arguments to split into chunks, to pass to individual
     workers

+    hasretval - when True, func and the current function return a progress
+    iterator then a list (encoded as an iterator that yields many (False, ..)
+    then a (True, list)). The resulting list is in the natural order.
+
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
     '''
     enabled = ui.configbool('worker', 'enabled')
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
-        return _platformworker(ui, func, staticargs, args)
+        return _platformworker(ui, func, staticargs, args, hasretval)
     return func(*staticargs + (args,))
```
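Note that the serial fallback returns func's iterator unchanged, so when hasretval is True the caller sees the same stream shape on both the serial and parallel paths; worker() only rewrites the final item in the parallel case, combining the per-chunk lists. A made-up sketch of that shape:

```python
# Made-up illustration of the stream a caller sees when hasretval=True.
stream = [
    (False, 1),          # progress item (parallel: relayed from a worker)
    (False, 1),          # more progress; relative ordering is not guaranteed
    (True, ['a', 'b']),  # exactly one final item, always last
]
for done, value in stream:
    if done:
        result = value
assert result == ['a', 'b']
```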
```diff
-def _posixworker(ui, func, staticargs, args):
+def _posixworker(ui, func, staticargs, args, hasretval):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
@@ ... @@
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
     pipes = []
-    for pargs in partition(args, workers):
+    retvals = []
+    for i, pargs in enumerate(partition(args, workers)):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
         # process has the full list of pipe descriptors (and it doesn't really
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
+        retvals.append(None)
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
         # override error handling *before* fork. this is necessary because
         # exception (signal) may arrive after fork, before "pid =" assignment
         # completes, and other exception handler (dispatch.py) can lead to
         # unexpected code path without os._exit.
         ret = -1
         try:
             pid = os.fork()
             if pid == 0:
                 signal.signal(signal.SIGINT, oldhandler)
                 signal.signal(signal.SIGCHLD, oldchldhandler)
                 def workerfunc():
                     for r, w in pipes[:-1]:
                         os.close(r)
                         os.close(w)
                     os.close(rfd)
                     for result in func(*(staticargs + (pargs,))):
-                        os.write(wfd, util.pickle.dumps(result))
+                        os.write(wfd, util.pickle.dumps((i, result)))
                     return 0
                 ret = scmutil.callcatch(ui, workerfunc)
         except: # parent re-raises, child never returns
             if os.getpid() == parentpid:
                 raise
             exctype = sys.exc_info()[0]
             force = not issubclass(exctype, KeyboardInterrupt)
@@ ... @@
         signal.signal(signal.SIGCHLD, oldchldhandler)
         selector.close()
         return problem[0]
     try:
         openpipes = len(pipes)
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    yield util.pickle.load(key.fileobj)
+                    i, res = util.pickle.load(key.fileobj)
+                    if hasretval and res[0]:
+                        retvals[i] = res[1]
+                    else:
+                        yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
                     openpipes -= 1
                 except IOError as e:
                     if e.errno == errno.EINTR:
                         continue
                     raise
     except: # re-raises
         killworkers()
         cleanup()
         raise
     status = cleanup()
     if status:
         if status < 0:
             os.kill(os.getpid(), -status)
         sys.exit(status)
+    if hasretval:
+        yield True, sum(retvals, [])
```
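A quick illustration of why indexing retvals by chunk number yields the "natural order" promised in the docstring, even though chunks can finish in any order (standalone sketch, not patch code):

```python
# Chunks may complete out of order; storing each chunk's final list at its
# index and concatenating restores the input order.
retvals = [None] * 3
retvals[2] = ['e', 'f']   # chunk 2 finished first
retvals[0] = ['a', 'b']
retvals[1] = ['c', 'd']
assert sum(retvals, []) == ['a', 'b', 'c', 'd', 'e', 'f']
```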
```diff
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv

     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
```
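For reference, a small usage sketch of `_posixexitstatus` (POSIX-only; on POSIX, `os.system` returns a raw wait status of the kind this helper decodes):

```python
import os

# A clean exit: WIFEXITED is true and WEXITSTATUS recovers the exit code.
status = os.system('exit 3')          # raw wait status, POSIX only
assert _posixexitstatus(status) == 3  # illustration; assumes POSIX semantics
```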
```diff
-def _windowsworker(ui, func, staticargs, args):
+def _windowsworker(ui, func, staticargs, args, hasretval):
     class Worker(threading.Thread):
         def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
                      **kwargs):
             threading.Thread.__init__(self, *args, **kwargs)
             self._taskqueue = taskqueue
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
             self.daemon = True
             self.exception = None

         def interrupt(self):
             self._interrupted = True

         def run(self):
             try:
                 while not self._taskqueue.empty():
                     try:
-                        args = self._taskqueue.get_nowait()
+                        i, args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
-                            self._resultqueue.put(res)
+                            self._resultqueue.put((i, res))
                             # threading doesn't provide a native way to
                             # interrupt execution. handle it manually at every
                             # iteration.
                             if self._interrupted:
                                 return
                     except pycompat.queue.Empty:
                         break
             except Exception as e:
@@ ... @@
                 # task and does not get to handle the interruption.
                 ui.warn(_("failed to kill worker threads while "
                           "handling an exception\n"))
                 return
@@ ... @@
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
+    retvals = []
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
-    for pargs in partition(args, workers * 20):
+    for pargs in enumerate(partition(args, workers * 20)):
+        retvals.append(None)
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
-                yield resultqueue.get()
+                (i, res) = resultqueue.get()
+                if hasretval and res[0]:
+                    retvals[i] = res[1]
+                else:
+                    yield res
             threads[0].join(0.05)
             finishedthreads = [_t for _t in threads if not _t.is_alive()]
             for t in finishedthreads:
                 if t.exception is not None:
                     raise t.exception
                 threads.remove(t)
     except (Exception, KeyboardInterrupt): # re-raises
         trykillworkers()
         raise
     while not resultqueue.empty():
-        yield resultqueue.get()
+        (i, res) = resultqueue.get()
+        if hasretval and res[0]:
+            retvals[i] = res[1]
+        else:
+            yield res
+    if hasretval:
+        yield True, sum(retvals, [])
```
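The queue-draining logic now appears twice in `_windowsworker` (inside the join loop and after it). A hypothetical helper, not part of the patch, shows the shared pattern:

```python
# Hypothetical refactoring sketch; the patch inlines this logic instead.
def _drainqueue(resultqueue, retvals, hasretval):
    while not resultqueue.empty():
        i, res = resultqueue.get()
        if hasretval and res[0]:
            retvals[i] = res[1]   # capture this chunk's final value
        else:
            yield res             # relay a progress item
```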
```diff
 if pycompat.iswindows:
     _platformworker = _windowsworker
 else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus

 def partition(lst, nslices):
```