Currently, worker supports running functions that return a progress
iterator. Generalize it to handle functions that return a progress
iterator followed by a return value.
It's unused in this commit, but it will be used in the next one.
Currently, worker supports running functions that return a progress
iterator. Generalize it to handle functions that return a progress
iterator followed by a return value.
It's unused in this commit, but it will be used in the next one.
| Automatic diff as part of commit; lint not applicable. |
| Automatic diff as part of commit; unit tests not applicable. |
| Path | Packages | |||
|---|---|---|---|---|
| M | mercurial/worker.py (51 lines) |
| Status | Author | Revision | |
|---|---|---|---|
| Closed | valentin.gatienbaron | ||
| Closed | valentin.gatienbaron | ||
| Closed | valentin.gatienbaron |
| if not threadsafe and _DISALLOW_THREAD_UNSAFE: | if not threadsafe and _DISALLOW_THREAD_UNSAFE: | ||||
| return False | return False | ||||
| linear = costperop * nops | linear = costperop * nops | ||||
| workers = _numworkers(ui) | workers = _numworkers(ui) | ||||
| benefit = linear - (_STARTUP_COST * workers + linear / workers) | benefit = linear - (_STARTUP_COST * workers + linear / workers) | ||||
| return benefit >= 0.15 | return benefit >= 0.15 | ||||
| def worker(ui, costperarg, func, staticargs, args, threadsafe=True): | def worker(ui, costperarg, func, staticargs, args, hasretval=False, | ||||
| threadsafe=True): | |||||
| '''run a function, possibly in parallel in multiple worker | '''run a function, possibly in parallel in multiple worker | ||||
| processes. | processes. | ||||
| returns a progress iterator | returns a progress iterator | ||||
| costperarg - cost of a single task | costperarg - cost of a single task | ||||
| func - function to run | func - function to run. It is expected to return a progress iterator. | ||||
| staticargs - arguments to pass to every invocation of the function | staticargs - arguments to pass to every invocation of the function | ||||
| args - arguments to split into chunks, to pass to individual | args - arguments to split into chunks, to pass to individual | ||||
| workers | workers | ||||
| hasretval - when True, func and the current function return an progress | |||||
| iterator then a list (encoded as an iterator that yield many (False, ..) | |||||
| then a (True, list)). The resulting list is in the natural order. | |||||
| threadsafe - whether work items are thread safe and can be executed using | threadsafe - whether work items are thread safe and can be executed using | ||||
| a thread-based worker. Should be disabled for CPU heavy tasks that don't | a thread-based worker. Should be disabled for CPU heavy tasks that don't | ||||
| release the GIL. | release the GIL. | ||||
| ''' | ''' | ||||
| enabled = ui.configbool('worker', 'enabled') | enabled = ui.configbool('worker', 'enabled') | ||||
| if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): | if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): | ||||
| return _platformworker(ui, func, staticargs, args) | return _platformworker(ui, func, staticargs, args, hasretval) | ||||
| return func(*staticargs + (args,)) | return func(*staticargs + (args,)) | ||||
| def _posixworker(ui, func, staticargs, args): | def _posixworker(ui, func, staticargs, args, hasretval): | ||||
| workers = _numworkers(ui) | workers = _numworkers(ui) | ||||
| oldhandler = signal.getsignal(signal.SIGINT) | oldhandler = signal.getsignal(signal.SIGINT) | ||||
| signal.signal(signal.SIGINT, signal.SIG_IGN) | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||||
| pids, problem = set(), [0] | pids, problem = set(), [0] | ||||
| def killworkers(): | def killworkers(): | ||||
| # unregister SIGCHLD handler as all children will be killed. This | # unregister SIGCHLD handler as all children will be killed. This | ||||
| # function shouldn't be interrupted by another SIGCHLD; otherwise pids | # function shouldn't be interrupted by another SIGCHLD; otherwise pids | ||||
| # could be updated while iterating, which would cause inconsistency. | # could be updated while iterating, which would cause inconsistency. | ||||
| def sigchldhandler(signum, frame): | def sigchldhandler(signum, frame): | ||||
| waitforworkers(blocking=False) | waitforworkers(blocking=False) | ||||
| if problem[0]: | if problem[0]: | ||||
| killworkers() | killworkers() | ||||
| oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) | oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) | ||||
| ui.flush() | ui.flush() | ||||
| parentpid = os.getpid() | parentpid = os.getpid() | ||||
| pipes = [] | pipes = [] | ||||
| for pargs in partition(args, workers): | retvals = [] | ||||
| for i, pargs in enumerate(partition(args, workers)): | |||||
| # Every worker gets its own pipe to send results on, so we don't have to | # Every worker gets its own pipe to send results on, so we don't have to | ||||
| # implement atomic writes larger than PIPE_BUF. Each forked process has | # implement atomic writes larger than PIPE_BUF. Each forked process has | ||||
| # its own pipe's descriptors in the local variables, and the parent | # its own pipe's descriptors in the local variables, and the parent | ||||
| # process has the full list of pipe descriptors (and it doesn't really | # process has the full list of pipe descriptors (and it doesn't really | ||||
| # care what order they're in). | # care what order they're in). | ||||
| rfd, wfd = os.pipe() | rfd, wfd = os.pipe() | ||||
| pipes.append((rfd, wfd)) | pipes.append((rfd, wfd)) | ||||
| retvals.append(None) | |||||
| # make sure we use os._exit in all worker code paths. otherwise the | # make sure we use os._exit in all worker code paths. otherwise the | ||||
| # worker may do some clean-ups which could cause surprises like | # worker may do some clean-ups which could cause surprises like | ||||
| # deadlock. see sshpeer.cleanup for example. | # deadlock. see sshpeer.cleanup for example. | ||||
| # override error handling *before* fork. this is necessary because | # override error handling *before* fork. this is necessary because | ||||
| # exception (signal) may arrive after fork, before "pid =" assignment | # exception (signal) may arrive after fork, before "pid =" assignment | ||||
| # completes, and other exception handler (dispatch.py) can lead to | # completes, and other exception handler (dispatch.py) can lead to | ||||
| # unexpected code path without os._exit. | # unexpected code path without os._exit. | ||||
| ret = -1 | ret = -1 | ||||
| try: | try: | ||||
| pid = os.fork() | pid = os.fork() | ||||
| if pid == 0: | if pid == 0: | ||||
| signal.signal(signal.SIGINT, oldhandler) | signal.signal(signal.SIGINT, oldhandler) | ||||
| signal.signal(signal.SIGCHLD, oldchldhandler) | signal.signal(signal.SIGCHLD, oldchldhandler) | ||||
| def workerfunc(): | def workerfunc(): | ||||
| for r, w in pipes[:-1]: | for r, w in pipes[:-1]: | ||||
| os.close(r) | os.close(r) | ||||
| os.close(w) | os.close(w) | ||||
| os.close(rfd) | os.close(rfd) | ||||
| for result in func(*(staticargs + (pargs,))): | for result in func(*(staticargs + (pargs,))): | ||||
| os.write(wfd, util.pickle.dumps(result)) | os.write(wfd, util.pickle.dumps((i, result))) | ||||
| return 0 | return 0 | ||||
| ret = scmutil.callcatch(ui, workerfunc) | ret = scmutil.callcatch(ui, workerfunc) | ||||
| except: # parent re-raises, child never returns | except: # parent re-raises, child never returns | ||||
| if os.getpid() == parentpid: | if os.getpid() == parentpid: | ||||
| raise | raise | ||||
| exctype = sys.exc_info()[0] | exctype = sys.exc_info()[0] | ||||
| force = not issubclass(exctype, KeyboardInterrupt) | force = not issubclass(exctype, KeyboardInterrupt) | ||||
| signal.signal(signal.SIGCHLD, oldchldhandler) | signal.signal(signal.SIGCHLD, oldchldhandler) | ||||
| selector.close() | selector.close() | ||||
| return problem[0] | return problem[0] | ||||
| try: | try: | ||||
| openpipes = len(pipes) | openpipes = len(pipes) | ||||
| while openpipes > 0: | while openpipes > 0: | ||||
| for key, events in selector.select(): | for key, events in selector.select(): | ||||
| try: | try: | ||||
| yield util.pickle.load(key.fileobj) | i, res = util.pickle.load(key.fileobj) | ||||
| if hasretval and res[0]: | |||||
| retvals[i] = res[1] | |||||
| else: | |||||
| yield res | |||||
| except EOFError: | except EOFError: | ||||
| selector.unregister(key.fileobj) | selector.unregister(key.fileobj) | ||||
| key.fileobj.close() | key.fileobj.close() | ||||
| openpipes -= 1 | openpipes -= 1 | ||||
| except IOError as e: | except IOError as e: | ||||
| if e.errno == errno.EINTR: | if e.errno == errno.EINTR: | ||||
| continue | continue | ||||
| raise | raise | ||||
| except: # re-raises | except: # re-raises | ||||
| killworkers() | killworkers() | ||||
| cleanup() | cleanup() | ||||
| raise | raise | ||||
| status = cleanup() | status = cleanup() | ||||
| if status: | if status: | ||||
| if status < 0: | if status < 0: | ||||
| os.kill(os.getpid(), -status) | os.kill(os.getpid(), -status) | ||||
| sys.exit(status) | sys.exit(status) | ||||
| if hasretval: | |||||
| yield True, sum(retvals, []) | |||||
| def _posixexitstatus(code): | def _posixexitstatus(code): | ||||
| '''convert a posix exit status into the same form returned by | '''convert a posix exit status into the same form returned by | ||||
| os.spawnv | os.spawnv | ||||
| returns None if the process was stopped instead of exiting''' | returns None if the process was stopped instead of exiting''' | ||||
| if os.WIFEXITED(code): | if os.WIFEXITED(code): | ||||
| return os.WEXITSTATUS(code) | return os.WEXITSTATUS(code) | ||||
| elif os.WIFSIGNALED(code): | elif os.WIFSIGNALED(code): | ||||
| return -os.WTERMSIG(code) | return -os.WTERMSIG(code) | ||||
| def _windowsworker(ui, func, staticargs, args): | def _windowsworker(ui, func, staticargs, args, hasretval): | ||||
| class Worker(threading.Thread): | class Worker(threading.Thread): | ||||
| def __init__(self, taskqueue, resultqueue, func, staticargs, *args, | def __init__(self, taskqueue, resultqueue, func, staticargs, *args, | ||||
| **kwargs): | **kwargs): | ||||
| threading.Thread.__init__(self, *args, **kwargs) | threading.Thread.__init__(self, *args, **kwargs) | ||||
| self._taskqueue = taskqueue | self._taskqueue = taskqueue | ||||
| self._resultqueue = resultqueue | self._resultqueue = resultqueue | ||||
| self._func = func | self._func = func | ||||
| self._staticargs = staticargs | self._staticargs = staticargs | ||||
| self._interrupted = False | self._interrupted = False | ||||
| self.daemon = True | self.daemon = True | ||||
| self.exception = None | self.exception = None | ||||
| def interrupt(self): | def interrupt(self): | ||||
| self._interrupted = True | self._interrupted = True | ||||
| def run(self): | def run(self): | ||||
| try: | try: | ||||
| while not self._taskqueue.empty(): | while not self._taskqueue.empty(): | ||||
| try: | try: | ||||
| args = self._taskqueue.get_nowait() | i, args = self._taskqueue.get_nowait() | ||||
| for res in self._func(*self._staticargs + (args,)): | for res in self._func(*self._staticargs + (args,)): | ||||
| self._resultqueue.put(res) | self._resultqueue.put((i, res)) | ||||
| # threading doesn't provide a native way to | # threading doesn't provide a native way to | ||||
| # interrupt execution. handle it manually at every | # interrupt execution. handle it manually at every | ||||
| # iteration. | # iteration. | ||||
| if self._interrupted: | if self._interrupted: | ||||
| return | return | ||||
| except pycompat.queue.Empty: | except pycompat.queue.Empty: | ||||
| break | break | ||||
| except Exception as e: | except Exception as e: | ||||
| # task and does not get to handle the interruption. | # task and does not get to handle the interruption. | ||||
| ui.warn(_("failed to kill worker threads while " | ui.warn(_("failed to kill worker threads while " | ||||
| "handling an exception\n")) | "handling an exception\n")) | ||||
| return | return | ||||
| workers = _numworkers(ui) | workers = _numworkers(ui) | ||||
| resultqueue = pycompat.queue.Queue() | resultqueue = pycompat.queue.Queue() | ||||
| taskqueue = pycompat.queue.Queue() | taskqueue = pycompat.queue.Queue() | ||||
| retvals = [] | |||||
| # partition work to more pieces than workers to minimize the chance | # partition work to more pieces than workers to minimize the chance | ||||
| # of uneven distribution of large tasks between the workers | # of uneven distribution of large tasks between the workers | ||||
| for pargs in partition(args, workers * 20): | for pargs in enumerate(partition(args, workers * 20)): | ||||
| retvals.append(None) | |||||
| taskqueue.put(pargs) | taskqueue.put(pargs) | ||||
| for _i in range(workers): | for _i in range(workers): | ||||
| t = Worker(taskqueue, resultqueue, func, staticargs) | t = Worker(taskqueue, resultqueue, func, staticargs) | ||||
| threads.append(t) | threads.append(t) | ||||
| t.start() | t.start() | ||||
| try: | try: | ||||
| while len(threads) > 0: | while len(threads) > 0: | ||||
| while not resultqueue.empty(): | while not resultqueue.empty(): | ||||
| yield resultqueue.get() | (i, res) = resultqueue.get() | ||||
| if hasretval and res[0]: | |||||
| retvals[i] = res[1] | |||||
| else: | |||||
| yield res | |||||
| threads[0].join(0.05) | threads[0].join(0.05) | ||||
| finishedthreads = [_t for _t in threads if not _t.is_alive()] | finishedthreads = [_t for _t in threads if not _t.is_alive()] | ||||
| for t in finishedthreads: | for t in finishedthreads: | ||||
| if t.exception is not None: | if t.exception is not None: | ||||
| raise t.exception | raise t.exception | ||||
| threads.remove(t) | threads.remove(t) | ||||
| except (Exception, KeyboardInterrupt): # re-raises | except (Exception, KeyboardInterrupt): # re-raises | ||||
| trykillworkers() | trykillworkers() | ||||
| raise | raise | ||||
| while not resultqueue.empty(): | while not resultqueue.empty(): | ||||
| yield resultqueue.get() | (i, res) = resultqueue.get() | ||||
| if hasretval and res[0]: | |||||
| retvals[i] = res[1] | |||||
| else: | |||||
| yield res | |||||
| if hasretval: | |||||
| yield True, sum(retvals, []) | |||||
| if pycompat.iswindows: | if pycompat.iswindows: | ||||
| _platformworker = _windowsworker | _platformworker = _windowsworker | ||||
| else: | else: | ||||
| _platformworker = _posixworker | _platformworker = _posixworker | ||||
| _exitstatus = _posixexitstatus | _exitstatus = _posixexitstatus | ||||
| def partition(lst, nslices): | def partition(lst, nslices): | ||||