We no longer support Python 2.
Details
Details
Diff Detail
Diff Detail
- Repository
- rHG Mercurial
- Lint
Automatic diff as part of commit; lint not applicable.
- Unit
Automatic diff as part of commit; unit tests not applicable.
We no longer support Python 2.
Automatic diff as part of commit; lint not applicable. |
Automatic diff as part of commit; unit tests not applicable. |
Path | Packages | |||
---|---|---|---|---|
D | M | mercurial/thirdparty/concurrent/LICENSE (48 lines) | ||
D | M | mercurial/thirdparty/concurrent/__init__.py | ||
D | M | mercurial/thirdparty/concurrent/futures/__init__.py (27 lines) | ||
D | M | mercurial/thirdparty/concurrent/futures/_base.py (669 lines) | ||
D | M | mercurial/thirdparty/concurrent/futures/process.py (365 lines) | ||
D | M | mercurial/thirdparty/concurrent/futures/thread.py (162 lines) | ||
M | setup.py (8 lines) |
PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2 | |||||
-------------------------------------------- | |||||
1. This LICENSE AGREEMENT is between the Python Software Foundation | |||||
("PSF"), and the Individual or Organization ("Licensee") accessing and | |||||
otherwise using this software ("Python") in source or binary form and | |||||
its associated documentation. | |||||
2. Subject to the terms and conditions of this License Agreement, PSF | |||||
hereby grants Licensee a nonexclusive, royalty-free, world-wide | |||||
license to reproduce, analyze, test, perform and/or display publicly, | |||||
prepare derivative works, distribute, and otherwise use Python | |||||
alone or in any derivative version, provided, however, that PSF's | |||||
License Agreement and PSF's notice of copyright, i.e., "Copyright (c) | |||||
2001, 2002, 2003, 2004, 2005, 2006 Python Software Foundation; All Rights | |||||
Reserved" are retained in Python alone or in any derivative version | |||||
prepared by Licensee. | |||||
3. In the event Licensee prepares a derivative work that is based on | |||||
or incorporates Python or any part thereof, and wants to make | |||||
the derivative work available to others as provided herein, then | |||||
Licensee hereby agrees to include in any such work a brief summary of | |||||
the changes made to Python. | |||||
4. PSF is making Python available to Licensee on an "AS IS" | |||||
basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR | |||||
IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND | |||||
DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS | |||||
FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT | |||||
INFRINGE ANY THIRD PARTY RIGHTS. | |||||
5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON | |||||
FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS | |||||
A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, | |||||
OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF. | |||||
6. This License Agreement will automatically terminate upon a material | |||||
breach of its terms and conditions. | |||||
7. Nothing in this License Agreement shall be deemed to create any | |||||
relationship of agency, partnership, or joint venture between PSF and | |||||
Licensee. This License Agreement does not grant permission to use PSF | |||||
trademarks or trade name in a trademark sense to endorse or promote | |||||
products or services of Licensee, or any third party. | |||||
8. By copying, installing or otherwise using Python, Licensee | |||||
agrees to be bound by the terms and conditions of this License | |||||
Agreement. |
# Copyright 2009 Brian Quinlan. All Rights Reserved. | |||||
# Licensed to PSF under a Contributor Agreement. | |||||
"""Execute computations asynchronously using threads or processes.""" | |||||
from __future__ import absolute_import | |||||
__author__ = 'Brian Quinlan (brian@sweetapp.com)' | |||||
from ._base import ( | |||||
FIRST_COMPLETED, | |||||
FIRST_EXCEPTION, | |||||
ALL_COMPLETED, | |||||
CancelledError, | |||||
TimeoutError, | |||||
Future, | |||||
Executor, | |||||
wait, | |||||
as_completed, | |||||
) | |||||
from .thread import ThreadPoolExecutor | |||||
try: | |||||
from .process import ProcessPoolExecutor | |||||
except ImportError: | |||||
# some platforms don't have multiprocessing | |||||
pass |
# Copyright 2009 Brian Quinlan. All Rights Reserved. | |||||
# Licensed to PSF under a Contributor Agreement. | |||||
from __future__ import absolute_import | |||||
import collections | |||||
import logging | |||||
import threading | |||||
import itertools | |||||
import time | |||||
import types | |||||
__author__ = 'Brian Quinlan (brian@sweetapp.com)' | |||||
# Return conditions understood by _create_and_install_waiters(). The first
# three are the public values for wait(return_when=...); _AS_COMPLETED is the
# private condition passed by as_completed().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# CANCELLED: the future was cancelled by the user.
CANCELLED = 'CANCELLED'
# CANCELLED_AND_NOTIFIED: additionally, _Waiter.add_cancelled() was called by
# a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

# Every state a future can be in.
_FUTURE_STATES = [
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

# Human-readable state names used by Future.__repr__(); both cancelled states
# are reported as "cancelled".
_STATE_TO_DESCRIPTION_MAP = dict([
    (PENDING, "pending"),
    (RUNNING, "running"),
    (CANCELLED, "cancelled"),
    (CANCELLED_AND_NOTIFIED, "cancelled"),
    (FINISHED, "finished"),
])

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
class Error(Exception):
    """Common ancestor of every exception raised by the futures package."""
class CancelledError(Error):
    """Raised when the Future was cancelled."""
class TimeoutError(Error):
    """Raised when an operation exceeded the given deadline."""
class _Waiter(object):
    """Provides the event that wait() and as_completed() block on.

    The base class only records completed futures in ``finished_futures``;
    it never sets ``event`` itself. Subclasses decide when to signal.
    """

    def __init__(self):
        self.finished_futures = []
        self.event = threading.Event()

    def add_result(self, future):
        """Record *future*, which finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future*, which finished by raising."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future*, which was cancelled."""
        self.finished_futures.append(future)
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every notification is recorded and the event is set while holding
    ``lock``, so a consumer that takes the same lock can swap out
    ``finished_futures`` and clear the event without losing an update.
    """

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record a finished future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record a future that raised and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and wake the consumer."""
        with self.lock:
            super(_AsCompletedWaiter, self).add_cancelled(future)
            self.event.set()
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any completion (result, exception or cancellation) sets the event.
    """

    def add_result(self, future):
        """Record *future* and signal the event."""
        super(_FirstCompletedWaiter, self).add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record *future* and signal the event."""
        super(_FirstCompletedWaiter, self).add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record *future* and signal the event."""
        super(_FirstCompletedWaiter, self).add_cancelled(future)
        self.event.set()
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    The event is set once ``num_pending_calls`` has been counted down to
    zero, or, when ``stop_on_exception`` is true, as soon as any future
    reports an exception.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super(_AllCompletedWaiter, self).__init__()
        self.lock = threading.Lock()
        self.stop_on_exception = stop_on_exception
        self.num_pending_calls = num_pending_calls

    def _decrement_pending_calls(self):
        """Count one future as done; signal when none remain."""
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        """Record *future* and count it as completed."""
        super(_AllCompletedWaiter, self).add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record *future*; signal at once if stopping on exceptions."""
        super(_AllCompletedWaiter, self).add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        """Record *future* and count it as completed."""
        super(_AllCompletedWaiter, self).add_cancelled(future)
        self._decrement_pending_calls()
class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions.

    The futures are sorted by ``id()`` so that every user of this class
    acquires the ``_condition`` objects in the same order.
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures to watch. Callers in this module hold each future's
            condition (via _AcquireFutures) while calling this.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to each future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the known conditions.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Count the futures that have not yet notified their waiters.
        unfinished = 0
        for f in fs:
            if f._state not in (CANCELLED_AND_NOTIFIED, FINISHED):
                unfinished += 1

        if return_when == FIRST_EXCEPTION:
            stop_early = True
        elif return_when == ALL_COMPLETED:
            stop_early = False
        else:
            raise ValueError("Invalid return condition: %r" % return_when)
        waiter = _AllCompletedWaiter(unfinished, stop_on_exception=stop_early)

    for f in fs:
        f._waiters.append(waiter)

    return waiter
def _yield_finished_futures(fs, waiter, ref_collect):
    """Drain the list *fs* from the end, yielding one future at a time.

    Before a future is yielded it is removed from every set in
    *ref_collect* and *waiter* is removed from its ``_waiters`` list (under
    the future's condition). Futures therefore come out in the reverse of
    their order in *fs*.

    No reference to the yielded future is kept by this generator while it
    is suspended, so the consumer fully controls the future's lifetime.
    """
    while fs:
        last = fs[-1]
        for tracked in ref_collect:
            tracked.remove(last)
        with last._condition:
            last._waiters.remove(waiter)
        # Drop the local name and pop inside the yield expression so that
        # neither this frame nor *fs* still refers to the yielded future.
        del last
        yield fs.pop()
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    # The deadline is computed once, so the timeout covers the whole
    # iteration rather than each individual wait.
    if timeout is not None:
        end_time = timeout + time.time()

    # A set removes duplicates, so each future is yielded once.
    fs = set(fs)
    total_futures = len(fs)
    # Snapshot the states and install the waiter while holding every
    # future's condition, so no completion can slip in between the two.
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        # First hand out everything that was already done.
        for f in _yield_finished_futures(finished, waiter,
                                         ref_collect=(fs,)):
            # Wrap in a list and pop inside the yield so that the local
            # name ``f`` does not refer to the future while suspended.
            f = [f]
            yield f.pop()

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch of newly finished futures and reset the event
            # under the waiter's lock, the same lock the waiter holds while
            # recording a completion.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            for f in _yield_finished_futures(finished, waiter,
                                             ref_collect=(fs, pending)):
                f = [f]
                yield f.pop()

    finally:
        # Remove waiter from unfinished futures
        # (yielded futures were already removed from ``fs`` above).
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
# Return type of wait(): a 2-tuple with the fields ``done`` and ``not_done``.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ['done', 'not_done'])
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Duplicates are ignored; any iterable is accepted.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If return_when is not one of the options above and a
            wait is actually required.
    """
    # Deduplicate and materialise the input once. With duplicates left in,
    # len(done) could never equal len(fs) even when every future was already
    # finished; the waiter was then created with zero pending calls, its
    # event was never set, and the call blocked forever when timeout was
    # None. A one-shot iterator would also be exhausted after the first pass.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        # Installed while every condition is still held, so no completion
        # can be missed between the snapshot above and the wait below.
        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
class Future(object): | |||||
"""Represents the result of an asynchronous computation.""" | |||||
def __init__(self): | |||||
"""Initializes the future. Should not be called by clients.""" | |||||
self._condition = threading.Condition() | |||||
self._state = PENDING | |||||
self._result = None | |||||
self._exception = None | |||||
self._traceback = None | |||||
self._waiters = [] | |||||
self._done_callbacks = [] | |||||
def _invoke_callbacks(self): | |||||
for callback in self._done_callbacks: | |||||
try: | |||||
callback(self) | |||||
except Exception: | |||||
LOGGER.exception('exception calling callback for %r', self) | |||||
except BaseException: | |||||
# Explicitly let all other new-style exceptions through so | |||||
# that we can catch all old-style exceptions with a simple | |||||
# "except:" clause below. | |||||
# | |||||
# All old-style exception objects are instances of | |||||
# types.InstanceType, but "except types.InstanceType:" does | |||||
# not catch old-style exceptions for some reason. Thus, the | |||||
# only way to catch all old-style exceptions without catching | |||||
# any new-style exceptions is to filter out the new-style | |||||
# exceptions, which all derive from BaseException. | |||||
raise | |||||
except: | |||||
# Because of the BaseException clause above, this handler only | |||||
# executes for old-style exception objects. | |||||
LOGGER.exception('exception calling callback for %r', self) | |||||
def __repr__(self): | |||||
with self._condition: | |||||
if self._state == FINISHED: | |||||
if self._exception: | |||||
return '<%s at %#x state=%s raised %s>' % ( | |||||
self.__class__.__name__, | |||||
id(self), | |||||
_STATE_TO_DESCRIPTION_MAP[self._state], | |||||
self._exception.__class__.__name__) | |||||
else: | |||||
return '<%s at %#x state=%s returned %s>' % ( | |||||
self.__class__.__name__, | |||||
id(self), | |||||
_STATE_TO_DESCRIPTION_MAP[self._state], | |||||
self._result.__class__.__name__) | |||||
return '<%s at %#x state=%s>' % ( | |||||
self.__class__.__name__, | |||||
id(self), | |||||
_STATE_TO_DESCRIPTION_MAP[self._state]) | |||||
def cancel(self): | |||||
"""Cancel the future if possible. | |||||
Returns True if the future was cancelled, False otherwise. A future | |||||
cannot be cancelled if it is running or has already completed. | |||||
""" | |||||
with self._condition: | |||||
if self._state in [RUNNING, FINISHED]: | |||||
return False | |||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |||||
return True | |||||
self._state = CANCELLED | |||||
self._condition.notify_all() | |||||
self._invoke_callbacks() | |||||
return True | |||||
def cancelled(self): | |||||
"""Return True if the future was cancelled.""" | |||||
with self._condition: | |||||
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] | |||||
def running(self): | |||||
"""Return True if the future is currently executing.""" | |||||
with self._condition: | |||||
return self._state == RUNNING | |||||
def done(self): | |||||
"""Return True of the future was cancelled or finished executing.""" | |||||
with self._condition: | |||||
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] | |||||
def __get_result(self): | |||||
if self._exception: | |||||
if isinstance(self._exception, types.InstanceType): | |||||
# The exception is an instance of an old-style class, which | |||||
# means type(self._exception) returns types.ClassType instead | |||||
# of the exception's actual class type. | |||||
exception_type = self._exception.__class__ | |||||
else: | |||||
exception_type = type(self._exception) | |||||
raise exception_type, self._exception, self._traceback | |||||
else: | |||||
return self._result | |||||
def add_done_callback(self, fn): | |||||
"""Attaches a callable that will be called when the future finishes. | |||||
Args: | |||||
fn: A callable that will be called with this future as its only | |||||
argument when the future completes or is cancelled. The callable | |||||
will always be called by a thread in the same process in which | |||||
it was added. If the future has already completed or been | |||||
cancelled then the callable will be called immediately. These | |||||
callables are called in the order that they were added. | |||||
""" | |||||
with self._condition: | |||||
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: | |||||
self._done_callbacks.append(fn) | |||||
return | |||||
fn(self) | |||||
def result(self, timeout=None): | |||||
"""Return the result of the call that the future represents. | |||||
Args: | |||||
timeout: The number of seconds to wait for the result if the future | |||||
isn't done. If None, then there is no limit on the wait time. | |||||
Returns: | |||||
The result of the call that the future represents. | |||||
Raises: | |||||
CancelledError: If the future was cancelled. | |||||
TimeoutError: If the future didn't finish executing before the given | |||||
timeout. | |||||
Exception: If the call raised then that exception will be raised. | |||||
""" | |||||
with self._condition: | |||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |||||
raise CancelledError() | |||||
elif self._state == FINISHED: | |||||
return self.__get_result() | |||||
self._condition.wait(timeout) | |||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |||||
raise CancelledError() | |||||
elif self._state == FINISHED: | |||||
return self.__get_result() | |||||
else: | |||||
raise TimeoutError() | |||||
def exception_info(self, timeout=None): | |||||
"""Return a tuple of (exception, traceback) raised by the call that the | |||||
future represents. | |||||
Args: | |||||
timeout: The number of seconds to wait for the exception if the | |||||
future isn't done. If None, then there is no limit on the wait | |||||
time. | |||||
Returns: | |||||
The exception raised by the call that the future represents or None | |||||
if the call completed without raising. | |||||
Raises: | |||||
CancelledError: If the future was cancelled. | |||||
TimeoutError: If the future didn't finish executing before the given | |||||
timeout. | |||||
""" | |||||
with self._condition: | |||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |||||
raise CancelledError() | |||||
elif self._state == FINISHED: | |||||
return self._exception, self._traceback | |||||
self._condition.wait(timeout) | |||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | |||||
raise CancelledError() | |||||
elif self._state == FINISHED: | |||||
return self._exception, self._traceback | |||||
else: | |||||
raise TimeoutError() | |||||
def exception(self, timeout=None): | |||||
"""Return the exception raised by the call that the future represents. | |||||
Args: | |||||
timeout: The number of seconds to wait for the exception if the | |||||
future isn't done. If None, then there is no limit on the wait | |||||
time. | |||||
Returns: | |||||
The exception raised by the call that the future represents or None | |||||
if the call completed without raising. | |||||
Raises: | |||||
CancelledError: If the future was cancelled. | |||||
TimeoutError: If the future didn't finish executing before the given | |||||
timeout. | |||||
""" | |||||
return self.exception_info(timeout)[0] | |||||
# The following methods should only be used by Executors and in tests. | |||||
def set_running_or_notify_cancel(self): | |||||
"""Mark the future as running or process any cancel notifications. | |||||
Should only be used by Executor implementations and unit tests. | |||||
If the future has been cancelled (cancel() was called and returned | |||||
True) then any threads waiting on the future completing (though calls | |||||
to as_completed() or wait()) are notified and False is returned. | |||||
If the future was not cancelled then it is put in the running state | |||||
(future calls to running() will return True) and True is returned. | |||||
This method should be called by Executor implementations before | |||||
executing the work associated with this future. If this method returns | |||||
False then the work should not be executed. | |||||
Returns: | |||||
False if the Future was cancelled, True otherwise. | |||||
Raises: | |||||
RuntimeError: if this method was already called or if set_result() | |||||
or set_exception() was called. | |||||
""" | |||||
with self._condition: | |||||
if self._state == CANCELLED: | |||||
self._state = CANCELLED_AND_NOTIFIED | |||||
for waiter in self._waiters: | |||||
waiter.add_cancelled(self) | |||||
# self._condition.notify_all() is not necessary because | |||||
# self.cancel() triggers a notification. | |||||
return False | |||||
elif self._state == PENDING: | |||||
self._state = RUNNING | |||||
return True | |||||
else: | |||||
LOGGER.critical('Future %s in unexpected state: %s', | |||||
id(self), | |||||
self._state) | |||||
raise RuntimeError('Future in unexpected state') | |||||
def set_result(self, result): | |||||
"""Sets the return value of work associated with the future. | |||||
Should only be used by Executor implementations and unit tests. | |||||
""" | |||||
with self._condition: | |||||
self._result = result | |||||
self._state = FINISHED | |||||
for waiter in self._waiters: | |||||
waiter.add_result(self) | |||||
self._condition.notify_all() | |||||
self._invoke_callbacks() | |||||
def set_exception_info(self, exception, traceback): | |||||
"""Sets the result of the future as being the given exception | |||||
and traceback. | |||||
Should only be used by Executor implementations and unit tests. | |||||
""" | |||||
with self._condition: | |||||
self._exception = exception | |||||
self._traceback = traceback | |||||
self._state = FINISHED | |||||
for waiter in self._waiters: | |||||
waiter.add_exception(self) | |||||
self._condition.notify_all() | |||||
self._invoke_callbacks() | |||||
def set_exception(self, exception): | |||||
"""Sets the result of the future as being the given exception. | |||||
Should only be used by Executor implementations and unit tests. | |||||
""" | |||||
self.set_exception_info(exception, None) | |||||
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and
        returns a Future instance representing the execution of the callable.
        This base implementation always raises NotImplementedError.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, *iterables).

        All calls are submitted before this method returns; results are
        yielded in submission order.

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: Keyword argument. The maximum number of seconds to
                wait. If None, then there is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        if timeout is not None:
            # One deadline for the whole iteration, not per result.
            deadline = timeout + time.time()

        fs = []
        for call_args in itertools.izip(*iterables):
            fs.append(self.submit(fn, *call_args))

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # Reversed so that pop() hands the futures back in
                # submission order.
                fs.reverse()
                while fs:
                    # Pop inside the expression: no local name may keep the
                    # popped future alive.
                    if timeout is not None:
                        yield fs.pop().result(deadline - time.time())
                    else:
                        yield fs.pop().result()
            finally:
                # On early exit or error, cancel whatever was not consumed.
                for leftover in fs:
                    leftover.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one. This base implementation does
        nothing.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always shut down and wait; returning False lets any exception
        # from the with-body propagate.
        self.shutdown(wait=True)
        return False
# Copyright 2009 Brian Quinlan. All Rights Reserved. | |||||
# Licensed to PSF under a Contributor Agreement. | |||||
"""Implements ProcessPoolExecutor. | |||||
The following diagram and text describe the data-flow through the system: | |||||
|======================= In-process =====================|== Out-of-process ==| | |||||
+----------+ +----------+ +--------+ +-----------+ +---------+ | |||||
| | => | Work Ids | => | | => | Call Q | => | | | |||||
| | +----------+ | | +-----------+ | | | |||||
| | | ... | | | | ... | | | | |||||
| | | 6 | | | | 5, call() | | | | |||||
| | | 7 | | | | ... | | | | |||||
| Process | | ... | | Local | +-----------+ | Process | | |||||
| Pool | +----------+ | Worker | | #1..n | | |||||
| Executor | | Thread | | | | |||||
| | +----------- + | | +-----------+ | | | |||||
| | <=> | Work Items | <=> | | <= | Result Q | <= | | | |||||
| | +------------+ | | +-----------+ | | | |||||
| | | 6: call() | | | | ... | | | | |||||
| | | future | | | | 4, result | | | | |||||
| | | ... | | | | 3, except | | | | |||||
+----------+ +------------+ +--------+ +-----------+ +---------+ | |||||
Executor.submit() called: | |||||
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict | |||||
- adds the id of the _WorkItem to the "Work Ids" queue | |||||
Local worker thread: | |||||
- reads work ids from the "Work Ids" queue and looks up the corresponding | |||||
WorkItem from the "Work Items" dict: if the work item has been cancelled then | |||||
it is simply removed from the dict, otherwise it is repackaged as a | |||||
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" | |||||
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because | |||||
calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). | |||||
- reads _ResultItems from "Result Q", updates the future stored in the | |||||
"Work Items" dict and deletes the dict entry | |||||
Process #1..n: | |||||
- reads _CallItems from "Call Q", executes the calls, and puts the resulting | |||||
_ResultItems in "Result Q" | |||||
""" | |||||
from __future__ import absolute_import | |||||
import atexit | |||||
from . import _base | |||||
import Queue as queue | |||||
import multiprocessing | |||||
import threading | |||||
import weakref | |||||
import sys | |||||
__author__ = 'Brian Quinlan (brian@sweetapp.com)' | |||||
# Workers are created as daemon threads and processes. This is done to allow the | |||||
# interpreter to exit when there are still idle processes in a | |||||
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, | |||||
# allowing workers to die with the interpreter has two undesirable properties: | |||||
# - The workers would still be running during interpreter shutdown, | |||||
# meaning that they would fail in unpredictable ways. | |||||
# - The workers could be killed while evaluating a work item, which could | |||||
# be bad if the callable being evaluated has external side-effects e.g. | |||||
# writing to a file. | |||||
# | |||||
# To work around this problem, an exit handler is installed which tells the | |||||
# workers to exit when their work queues are empty and then waits until the | |||||
# threads/processes finish. | |||||
# Weak mapping whose items are (t, q) pairs: _python_exit() calls q.put(None)
# and then t.join() on each pair.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True once _python_exit() has run.
_shutdown = False


def _python_exit():
    """Flag shutdown, wake every registered queue, then join every thread.

    A None sentinel is first put on each queue in _threads_queues; only
    after all queues have been signalled is each thread joined.
    """
    global _shutdown
    _shutdown = True
    if _threads_queues:
        # Snapshot the pairs: the mapping is weak and may change under us.
        pairs = list(_threads_queues.items())
    else:
        pairs = ()
    for _unused_thread, work_queue in pairs:
        work_queue.put(None)
    for thread, _unused_queue in pairs:
        thread.join(sys.maxint)
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
class _WorkItem(object):
    """Pairs a future with the callable and arguments submitted for it."""

    def __init__(self, future, fn, args, kwargs):
        self.fn, self.args, self.kwargs = fn, args, kwargs
        self.future = future
class _ResultItem(object):
    """Outcome of one call, tagged with its work id.

    _process_worker() sets either *exception* or *result*; the other keeps
    its default of None.
    """

    def __init__(self, work_id, exception=None, result=None):
        self.result = result
        self.exception = exception
        self.work_id = work_id
class _CallItem(object):
    """A call to evaluate (callable plus arguments), tagged with its work id."""

    def __init__(self, work_id, fn, args, kwargs):
        self.fn, self.args, self.kwargs = fn, args, kwargs
        self.work_id = work_id
def _process_worker(call_queue, result_queue):
    """Run calls taken from call_queue and report each outcome on result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read
            and evaluated by the worker. A None item tells the worker to
            exit.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
    """
    while True:
        item = call_queue.get(block=True)
        if item is None:
            # Wake up queue management thread
            result_queue.put(None)
            return
        try:
            outcome = item.fn(*item.args, **item.kwargs)
        except:
            # Whatever the call raised is shipped back instead of a result.
            error = sys.exc_info()[1]
            reply = _ResultItem(item.work_id, exception=error)
        else:
            reply = _ResultItem(item.work_id, result=outcome)
        result_queue.put(reply)
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move queued work into call_queue until it is full or no ids remain.

    The work-id lookup itself is non-blocking.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            next_id = work_ids.get(block=False)
        except queue.Empty:
            return

        item = pending_work_items[next_id]
        if not item.future.set_running_or_notify_cancel():
            # The future was cancelled before it could start; forget it.
            del pending_work_items[next_id]
            continue

        call_queue.put(
            _CallItem(next_id, item.fn, item.args, item.kwargs),
            block=True)
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A collection of the multiprocessing.Process instances used
            as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers. A None item carries no result; it only wakes
            this thread up.
    """
    # One-element list so the nested function can update the count in place.
    nb_shutdown_processes = [0]
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again"""
        call_queue.put(None)
        nb_shutdown_processes[0] += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        result_item = result_queue.get(block=True)
        if result_item is not None:
            # A real result: resolve the matching future and forget the item.
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                # Send one None sentinel per worker process.
                while nb_shutdown_processes[0] < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then
                # some multiprocessing.Queue methods may deadlock on Mac OS
                # X.
                for p in processes:
                    p.join()
                call_queue.close()
                return
        # Drop the strong reference so this thread does not keep the
        # executor alive between iterations.
        del executor
# Cached outcome of _check_system_limits(): whether the check has run, and
# the error message if the platform was found to be too limited.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the platform offers too few semaphores.

    The outcome of the first call is cached in the module globals above.
    Later calls either re-raise the cached error or return immediately,
    without querying the system again.

    Raises:
        NotImplementedError: if os.sysconf reports an SC_SEM_NSEMS_MAX value
            that is neither -1 nor at least 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found adequate: do not query sysconf again.
        return
    _system_limits_checked = True
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: if max_workers is zero or negative.
            NotImplementedError: if _check_system_limits() rejects the
                platform.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        # Created lazily by _start_queue_management_thread().
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Next work id to hand out; incremented on every submit().
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the queue management thread if it is not already running."""
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register the thread so the exit handler can wake and join it.
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are _max_workers of them."""
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes.add(p)

    # The docstring for submit() is copied from _base.Executor.submit below.
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            # Record the work item under a fresh id and queue that id.
            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    # The docstring for shutdown() is copied from _base.Executor.shutdown below.
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
# Have _python_exit run when the interpreter exits.
atexit.register(_python_exit)
# Copyright 2009 Brian Quinlan. All Rights Reserved. | |||||
# Licensed to PSF under a Contributor Agreement. | |||||
"""Implements ThreadPoolExecutor.""" | |||||
from __future__ import absolute_import | |||||
import atexit | |||||
from . import _base | |||||
import itertools | |||||
import Queue as queue | |||||
import threading | |||||
import weakref | |||||
import sys | |||||
try:
    from multiprocessing import cpu_count
except ImportError:
    # some platforms don't have multiprocessing
    def cpu_count():
        """Fallback used when multiprocessing cannot be imported."""
        return None
__author__ = 'Brian Quinlan (brian@sweetapp.com)' | |||||
# Workers are created as daemon threads. This is done to allow the interpreter | |||||
# to exit when there are still idle threads in a ThreadPoolExecutor's thread | |||||
# pool (i.e. shutdown() was not called). However, allowing workers to die with | |||||
# the interpreter has two undesirable properties: | |||||
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways. | |||||
# - The workers could be killed while evaluating a work item, which could | |||||
# be bad if the callable being evaluated has external side-effects e.g. | |||||
# writing to a file. | |||||
# | |||||
# To work around this problem, an exit handler is installed which tells the | |||||
# workers to exit when their work queues are empty and then waits until the | |||||
# threads finish. | |||||
# Registry of worker threads and the work queue each one reads from. Keys are
# held weakly, so an entry disappears once its thread object is garbage
# collected.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once the exit handler below has run.
_shutdown = False


def _python_exit():
    """Exit handler: flag shutdown, wake every registered thread, then join.

    A None sentinel is put on every registered queue before any thread is
    joined, so all of the threads begin winding down before the first wait.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the registry so both passes see the same (thread, queue) pairs.
    if _threads_queues:
        registered = list(_threads_queues.items())
    else:
        registered = ()
    for _thread, wake_queue in registered:
        wake_queue.put(None)
    for thread, _wake_queue in registered:
        thread.join(sys.maxint)


atexit.register(_python_exit)
class _WorkItem(object):
    """A submitted call (fn, args, kwargs) bundled with its future."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs

    def run(self):
        """Evaluate the call and store its result or exception on the future.

        Nothing is evaluated when set_running_or_notify_cancel() returns a
        false value.
        """
        if self.future.set_running_or_notify_cancel():
            try:
                value = self.fn(*self.args, **self.kwargs)
            except:
                # Hand the exception and its traceback to the future.
                exc_value, exc_tb = sys.exc_info()[1:]
                self.future.set_exception_info(exc_value, exc_tb)
            else:
                self.future.set_result(value)
def _worker(executor_reference, work_queue):
    """Thread body: run work items from work_queue until told to exit.

    Args:
        executor_reference: A weakref.ref to the owning executor; it yields
            None once that executor has been collected.
        work_queue: Queue of work items. A None item is a wake-up signal
            that makes the worker check whether it should exit.
    """
    try:
        while True:
            task = work_queue.get(block=True)
            if task is None:
                owner = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or owner is None or owner._shutdown:
                    # Notice other workers
                    work_queue.put(None)
                    return
                del owner
            else:
                task.run()
                # Delete references to object. See issue16284
                del task
    except:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls on a pool of daemon threads."""

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. When None, five threads per
                reported CPU are used (five in total if the CPU count is
                unavailable).
            thread_name_prefix: An optional name prefix to give our threads.

        Raises:
            ValueError: if max_workers is zero or negative.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        # Only draw from the shared counter when no prefix was given.
        if not thread_name_prefix:
            thread_name_prefix = "ThreadPoolExecutor-%d" % self._counter()

        self._thread_name_prefix = thread_name_prefix
        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

    # The docstring for submit() is copied from _base.Executor.submit below.
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            self._work_queue.put(_WorkItem(future, fn, args, kwargs))
            self._adjust_thread_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless the pool is already full."""
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        current = len(self._threads)
        if current >= self._max_workers:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        executor_ref = weakref.ref(self, weakref_cb)
        worker_thread = threading.Thread(
            name='%s_%d' % (self._thread_name_prefix or self, current),
            target=_worker,
            args=(executor_ref, self._work_queue))
        worker_thread.daemon = True
        worker_thread.start()
        self._threads.add(worker_thread)
        # Register the thread so the exit handler can wake and join it.
        _threads_queues[worker_thread] = self._work_queue

    # The docstring for shutdown() is copied from _base.Executor.shutdown below.
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # Wake a worker so it notices the shutdown flag.
            self._work_queue.put(None)
        if not wait:
            return
        for worker_thread in self._threads:
            worker_thread.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
] | ] | ||||
# Add every directory under mercurial/templates (other than the bytecode
# cache) as a mercurial.templates.* package.
for name in os.listdir(os.path.join('mercurial', 'templates')):
    if name != '__pycache__' and os.path.isdir(
        os.path.join('mercurial', 'templates', name)
    ):
        packages.append('mercurial.templates.%s' % name)

# The bundled concurrent.futures packages are only added on Python 2.
if sys.version_info[0] == 2:
    packages.extend(
        [
            'mercurial.thirdparty.concurrent',
            'mercurial.thirdparty.concurrent.futures',
        ]
    )

if 'HG_PY2EXE_EXTRA_INSTALL_PACKAGES' in os.environ:
    # py2exe can't cope with namespace packages very well, so we have to
    # install any hgext3rd.* extensions that we want in the final py2exe
    # image here. This is gross, but you gotta do what you gotta do.
    packages.extend(os.environ['HG_PY2EXE_EXTRA_INSTALL_PACKAGES'].split(' '))
common_depends = [ | common_depends = [ | ||||
'mercurial/bitmanipulation.h', | 'mercurial/bitmanipulation.h', |