diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -12,6 +12,7 @@ import signal import sys import threading +import time from .i18n import _ from . import ( @@ -216,6 +217,7 @@ self._func = func self._staticargs = staticargs self._interrupted = False + self.daemon = True self.exception = None def interrupt(self): @@ -242,16 +244,22 @@ raise threads = [] - def killworkers(): + def trykillworkers(): + # Allow up to 1 second to clean worker threads nicely + cleanupend = time.time() + 1 for t in threads: t.interrupt() for t in threads: - # try to let the threads handle interruption, but don't wait - # indefintely. the thread could be in infinite loop, handling - # a very long task or in a deadlock situation - t.join(5) + remainingtime = cleanupend - time.time() + t.join(remainingtime) if t.is_alive(): - raise error.Abort(_('failed to join worker thread')) + # pass over the workers joining failure. it is more + # important to surface the inital exception than the + # fact that one of workers may be processing a large + # task and does not get to handle the interruption. + ui.warn(_("failed to kill worker threads while " + "handling an exception\n")) + return workers = _numworkers(ui) resultqueue = util.queue() @@ -264,25 +272,19 @@ t = Worker(taskqueue, resultqueue, func, staticargs) threads.append(t) t.start() - - while len(threads) > 0: - while not resultqueue.empty(): - yield resultqueue.get() - threads[0].join(0.05) - finishedthreads = [_t for _t in threads if not _t.is_alive()] - for t in finishedthreads: - if t.exception is not None: - try: - killworkers() - except Exception: - # pass over the workers joining failure. it is more - # important to surface the inital exception than the - # fact that one of workers may be processing a large - # task and does not get to handle the interruption. - ui.warn(_("failed to kill worker threads while handling " - "an exception")) - raise t.exception - threads.remove(t) + try: + while len(threads) > 0: + while not resultqueue.empty(): + yield resultqueue.get() + threads[0].join(0.05) + finishedthreads = [_t for _t in threads if not _t.is_alive()] + for t in finishedthreads: + if t.exception is not None: + raise t.exception + threads.remove(t) + except Exception: # re-raises + trykillworkers() + raise while not resultqueue.empty(): yield resultqueue.get()