diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -214,18 +214,45 @@ self._resultqueue = resultqueue self._func = func self._staticargs = staticargs + self._interrupted = False + self.exception = None + + def interrupt(self): + self._interrupted = True def run(self): - while not self._taskqueue.empty(): - try: - args = self._taskqueue.get_nowait() - for res in self._func(*self._staticargs + (args,)): - self._resultqueue.put(res) - except util.empty: - break + try: + while not self._taskqueue.empty(): + try: + args = self._taskqueue.get_nowait() + for res in self._func(*self._staticargs + (args,)): + self._resultqueue.put(res) + # threading doesn't provide a native way to + # interrupt execution. handle it manually at every + # iteration. + if self._interrupted: + return + except util.empty: + break + except Exception as e: + # store the exception such that the main thread can resurface + # it as if the func was running without workers. + self.exception = e + raise + + threads = [] + def killworkers(): + for t in threads: + t.interrupt() + for t in threads: + # try to let the threads handle interruption, but don't wait + # indefintely. the thread could be in infinite loop, handling + # a very long task or in a deadlock situation + t.join(5) + if t.is_alive(): + raise error.Abort(_('failed to join worker thread')) workers = _numworkers(ui) - threads = [] resultqueue = util.queue() taskqueue = util.queue() # partition work to more pieces than workers to minimize the chance @@ -236,12 +263,24 @@ t = Worker(taskqueue, resultqueue, func, staticargs) threads.append(t) t.start() - while any(t.is_alive() for t in threads): + + while len(threads) > 0: while not resultqueue.empty(): yield resultqueue.get() - t = threads[0] - t.join(0.05) - if not t.is_alive(): + threads[0].join(0.05) + finishedthreads = [_t for _t in threads if not _t.is_alive()] + for t in finishedthreads: + if t.exception is not None: + try: + killworkers() + except Exception: + # pass over the workers joining failure. it is more + # important to surface the inital exception than the + # fact that one of workers may be processing a large + # task and does not get to handle the interruption. + ui.warn(_("failed to kill worker threads while handling " + "an exception")) + raise t.exception threads.remove(t) while not resultqueue.empty(): yield resultqueue.get()