diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -11,6 +11,7 @@
 import os
 import signal
 import sys
+import threading
 
 from .i18n import _
 from . import (
@@ -53,7 +54,7 @@
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
-if pycompat.isposix:
+if pycompat.isposix or pycompat.iswindows:
     _startupcost = 0.01
 else:
     _startupcost = 1e30
@@ -203,7 +204,69 @@
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-if not pycompat.iswindows:
+def _windowsworker(ui, func, staticargs, args):
+    '''run func(*staticargs + (chunk,)) over chunks of args in threads
+
+    This is a generator: it yields every item produced by func, in
+    whatever order the worker threads happen to produce them. An
+    exception raised by func in a worker thread is re-raised here, in
+    the consuming thread.'''
+    class Worker(threading.Thread):
+        def __init__(self, taskqueue, resultqueue, func, staticargs,
+                     *args, **kwargs):
+            # forward only what the caller supplied: Python 3's
+            # Thread.__init__ has no "verbose" argument
+            threading.Thread.__init__(self, *args, **kwargs)
+            self._taskqueue = taskqueue
+            self._resultqueue = resultqueue
+            self._func = func
+            self._staticargs = staticargs
+            self.exception = None
+
+        def run(self):
+            try:
+                while True:
+                    # keep the try narrow so that a util.empty raised from
+                    # inside func is not mistaken for "no more tasks"
+                    try:
+                        args = self._taskqueue.get_nowait()
+                    except util.empty:
+                        break
+                    for res in self._func(*self._staticargs + (args,)):
+                        self._resultqueue.put(res)
+            except Exception as e:
+                # remember the failure so the consuming thread can
+                # resurface it instead of yielding partial results
+                self.exception = e
+                raise
+
+    workers = _numworkers(ui)
+    threads = []
+    resultqueue = util.queue()
+    taskqueue = util.queue()
+    # partition work to more pieces than workers to minimize the chance
+    # of uneven distribution of large tasks between the workers
+    for pargs in partition(args, workers * 20):
+        taskqueue.put(pargs)
+    for _i in range(workers):
+        t = Worker(taskqueue, resultqueue, func, staticargs)
+        threads.append(t)
+        t.start()
+    while threads:
+        while not resultqueue.empty():
+            yield resultqueue.get()
+        # wait briefly on one thread so results keep being drained
+        threads[0].join(0.05)
+        for t in [_t for _t in threads if not _t.is_alive()]:
+            if t.exception is not None:
+                raise t.exception
+            threads.remove(t)
+    while not resultqueue.empty():
+        yield resultqueue.get()
+
+if pycompat.iswindows:
+    _platformworker = _windowsworker
+else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 