Purpose and overview
The concurrent.futures module provides a high-level interface for asynchronously executing callables.
The asynchronous execution can be performed in two flavors:
– threads, using ThreadPoolExecutor
– separate processes, using ProcessPoolExecutor
Both flavors implement the same interface, which is defined by the abstract Executor class.
Future Objects
class concurrent.futures.Future
It encapsulates the asynchronous execution of a callable.
Future instances are created by Executor.submit().
Future instance methods:
cancel():
Attempt to cancel the call.
If the call is currently being executed or has finished running and cannot be cancelled, the method returns False; otherwise the call is cancelled and the method returns True.
cancelled():
Return True if the call was successfully cancelled.
running():
Return True if the call is currently being executed and cannot be cancelled.
done():
Return True if the call was successfully cancelled or finished running.
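For illustration, here is a minimal sketch (my own example, not from the original text) of these state-inspection methods. With a single worker thread, the second submitted task stays pending long enough to be cancelled; the exact timing is not guaranteed, hence the short sleep.

import time
from concurrent.futures import ThreadPoolExecutor


def slow_task():
    time.sleep(2)
    return 'done'


with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(slow_task)   # picked up by the single worker
    second = executor.submit(slow_task)  # stays pending behind the first task
    time.sleep(0.1)                      # give the worker time to start the first task
    print(second.cancel())               # True: a pending call can be cancelled
    print(second.cancelled())            # True
    print(first.running())               # True: already executing, cannot be cancelled
    print(first.cancel())                # False
    print(first.done())                  # False while still running
    print(first.result())                # 'done'
    print(first.done())                  # True once finished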
result(timeout=None):
Return the value returned by the call.
This method waits until the call is completed.
If timeout is not specified or is None, there is no limit to the wait time.
If timeout is specified (it can be an int or a float) and the call hasn't completed within timeout seconds, a concurrent.futures.TimeoutError will be raised.
If the future is cancelled before completing, CancelledError will be raised.
If the call raised an exception, this method will raise the same exception.
exception(timeout=None):
This method behaves very much like result() above.
The difference is that it does not return the result but the exception raised by the call, or None if no exception was raised.
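A minimal sketch (my own example, not from the original text) showing result() with a timeout and exception(); the helper functions are just for illustration.

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow():
    time.sleep(1)
    return 42


def failing():
    raise ValueError('boom')


with ThreadPoolExecutor(max_workers=2) as executor:
    slow_future = executor.submit(slow)
    failing_future = executor.submit(failing)

    try:
        slow_future.result(timeout=0.1)   # too short: raises concurrent.futures.TimeoutError
    except TimeoutError:
        print('result() timed out, the task is still running')

    print(slow_future.result())           # waits until completion: 42
    print(failing_future.exception())     # the ValueError raised by the call
    # failing_future.result() would re-raise that same ValueError here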
add_done_callback(fn):
Attaches the callable fn to the future: fn is called, with the future as its only argument, when the future is cancelled or finishes running.
After some testing, it seems not very friendly to use because the callbacks are invoked in completion order, not in the order the tasks were submitted.
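To make that remark concrete, here is a small sketch (my own example, not from the article's tests): the callbacks fire in completion order, on the worker thread that completed the future.

import time
from concurrent.futures import Future, ThreadPoolExecutor


def sleep_and_return(seconds):
    time.sleep(seconds)
    return seconds


def report(future: Future):
    print(f'callback for result {future.result()}')


with ThreadPoolExecutor(max_workers=2) as executor:
    for delay in (0.2, 0.1):  # the second submitted task finishes first
        executor.submit(sleep_and_return, delay).add_done_callback(report)

# Expected output (completion order, not submission order):
# callback for result 0.1
# callback for result 0.2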
Executor Objects
class concurrent.futures.Executor:
An abstract class that provides methods to execute calls asynchronously.
It should not be used directly, but through its concrete subclasses.
Executor methods
submit(fn, /, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object
representing the execution of the callable.
Example:
from concurrent.futures import ThreadPoolExecutor, Future

with ThreadPoolExecutor(max_workers=1) as executor:
    future: Future[int] = executor.submit(pow, 2, 3)
    # The result() method of the future object waits until the task has returned its result
    print(f'future.result()={future.result()}')  # future.result()=8
map(func, *iterables, timeout=None, chunksize=1)
Similar to map(func, *iterables) except that:
– the iterables are collected immediately rather than lazily;
– func is executed asynchronously and several calls to func may be made concurrently.
Advantages:
– a way to pass the parameters of the function to call as iterables;
– a way to keep the order of the submitted tasks (see the sketch below).
But that method also has some specific limitations.
Drawbacks:
– any exception not caught in the function of the future will be raised during the iteration of its results, which can make it awkward to process all the results when it happens.
We will see this in the example part.
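Here is a minimal sketch (my own example) of the ordering advantage: results are yielded in the order of the input iterable, even when the calls complete in a different order.

import time
from concurrent.futures import ThreadPoolExecutor


def sleep_and_return(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=3) as executor:
    # The 0.3s call finishes last but is yielded first, because map() preserves input order
    for result in executor.map(sleep_and_return, [0.3, 0.2, 0.1]):
        print(result)
# Output: 0.3 then 0.2 then 0.1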
shutdown(wait=True, *, cancel_futures=False)
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.
Calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError.
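A minimal sketch (my own example, assuming Python 3.9+ where the cancel_futures parameter exists): cancel_futures=True cancels the calls that have not started yet, while the one already running is left to finish.

import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(time.sleep, 1) for _ in range(3)]

executor.shutdown(wait=False, cancel_futures=True)
print([f.cancelled() for f in futures])  # typically [False, True, True]: only the pending calls are cancelled
# executor.submit(time.sleep, 1) would now raise RuntimeError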
Should we use a with statement or not?
You can avoid having to call the shutdown() method explicitly if you use the with statement, which will shut down the Executor (waiting as if Executor.shutdown() were called with wait set to True):
import shutil
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
In the general case, using a with statement is a good idea since it shuts down the executor for us at the right time (that is, when all futures are completed or when the wait condition is met).
But in the case where we want to return early, that is even if all futures are not completed, we need to declare our executor object explicitly.
And generally we will then invoke shutdown() with the wait parameter set to False.
ThreadPoolExecutor
It is an Executor subclass that uses a pool of threads to execute calls asynchronously.
Deadlock risks
Deadlocks can occur when the callable associated with a Future waits on the results of another
Future.
For example:
import time
from concurrent.futures import ThreadPoolExecutor


def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5


def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
Deadlocks can also occur when there are not enough workers to perform nested tasks.
For example:
from concurrent.futures import ThreadPoolExecutor


def wait_on_future():
    print('start task')
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
Output:
start task
Should not happen here
ThreadPoolExecutor constructor
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
All parameters of the constructor are optional.
– max_workers: the maximum number of concurrent threads used to execute calls asynchronously.
Default value: it depends on the Python version; in version 3.8 it is min(32, os.cpu_count() + 4).
This default value preserves at least 5 workers for I/O bound tasks and uses at most 32 CPU cores for CPU bound tasks.
– thread_name_prefix: a prefix for the worker thread names (a concept close to the thread name in the Thread class), for easier debugging.
– initializer: a callable that is called at the start of each worker thread (similar to what we have in the Thread class).
– initargs: a tuple of arguments passed to the initializer.
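A minimal sketch (my own example) combining thread_name_prefix, initializer and initargs; the names and the greeting argument are arbitrary.

import threading
from concurrent.futures import ThreadPoolExecutor


def init_worker(greeting):
    # Called once at the start of each worker thread
    print(f'{greeting} from {threading.current_thread().name}')


def task():
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix='downloader',
                        initializer=init_worker,
                        initargs=('hello',)) as executor:
    print(executor.submit(task).result())  # e.g. 'downloader_0'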
Module level Functions
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
Wait for the Future instances to complete.
Parameters:
– fs: the Future instances to wait for. Duplicate futures are ignored.
– optional timeout (int or float): controls the maximum number of seconds to wait before returning. By default there is no time limit.
– optional return_when: indicates when this function should return. It must be one of the following constants (the default value is ALL_COMPLETED):
| Constant | Description |
|---|---|
| FIRST_COMPLETED | Return when any future finishes or is cancelled. |
| FIRST_EXCEPTION | Return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED. |
| ALL_COMPLETED | Return when all futures finish or are cancelled. |
Returns a named 2-tuple of sets:
– The first set, named done, contains the futures that completed (finished or cancelled futures).
– The second set, named not_done, contains the futures that did not complete (pending or running futures).
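Before the full examples below, here is a minimal sketch (my own example) of return_when=FIRST_COMPLETED: wait() returns as soon as the fastest call finishes, leaving the other future in the not_done set.

import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def sleep_and_return(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(sleep_and_return, s) for s in (0.1, 1)]
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print(len(done), len(not_done))   # typically: 1 1
    print(next(iter(done)).result())  # 0.1, the fastest call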
concurrent.futures.as_completed(fs, timeout=None)
Returns an iterator over the given Future instances that yields each future as soon as it completes, while the other futures are still pending.
Parameters:
– fs: the Future instances to process. Duplicate futures are ignored.
– optional timeout (int or float): the returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds. By default there is no time limit.
Returns: an iterator yielding futures as soon as they complete (completed means finished or cancelled futures).
concurrent.futures.as_completed() typical example
The idea is to execute multiple HTTP requests concurrently and to process the result of each callable as soon as it is completed.
import concurrent.futures
import logging
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    logger.info('callables were just submitted')
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))
Output:
2022-06-03 14:18:39,563 - INFO : callables were just submitted
2022-06-03 14:18:39,636 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-03 14:18:39,890 - INFO : 'http://some-made-up-domain.com/' page is 1261 bytes
2022-06-03 14:18:40,019 - INFO : 'http://www.foxnews.com/' page is 305141 bytes
2022-06-03 14:18:40,067 - INFO : 'http://www.bbc.co.uk/' page is 415865 bytes
2022-06-03 14:18:40,481 - INFO : 'http://www.cnn.com/' page is 1136832 bytes
concurrent.futures.as_completed() example where we want to exit early
The idea is to execute multiple HTTP requests concurrently, to process only the result of the first successfully completed callable, and then to exit the program.
We can see that we have to shut down the executor explicitly.
Doing that in the finally block ensures that it is performed whatever the case.
import concurrent.futures
import logging
import urllib.request
from concurrent.futures import Executor, Future
from typing import Dict

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()

# No with statement here: we want to exit early, so we shut down the executor explicitly
executor: Executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
# Start the load operations and mark each future with its URL
future_to_url: Dict[Future, str] = {executor.submit(load_url, url, 60): url for url in URLS}
logger.info('callables were just submitted')
try:
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
            break
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))
finally:
    logger.info('we explicitly shutdown the executor')
    executor.shutdown(wait=False)
logger.info('we exited early')
Output:
2022-06-14 16:18:28,723 - INFO : callables were just submitted
2022-06-14 16:18:28,757 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-14 16:18:29,374 - INFO : 'http://some-made-up-domain.com/' page is 1111 bytes
2022-06-14 16:18:29,374 - INFO : we explicitly shutdown the executor
2022-06-14 16:18:29,374 - INFO : we exited early
concurrent.futures.wait() without timeout example
The idea is to submit tasks, wait for all of them to complete, and then process their results.
import concurrent.futures
import logging
import urllib.request
from concurrent.futures import Future
from concurrent.futures._base import DoneAndNotDoneFutures
from typing import Dict, Any

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url: Dict[Future, str] = {executor.submit(load_url, url, 60): url for url in URLS}
    logger.info('callables were just submitted')
    done_and_not_done_futures: DoneAndNotDoneFutures[Any] = concurrent.futures.wait(future_to_url)
    logger.info('not_done_futures:')
    for not_done_future in done_and_not_done_futures.not_done:
        logger.info(f'not_done_future={not_done_future}')
    logger.info('done_futures:')
    for done_future in done_and_not_done_futures.done:
        logger.info(f'done_future={done_future}')
        url = future_to_url[done_future]
        try:
            data = done_future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))
Output:
2022-06-06 16:14:59,071 - INFO : callables were just submitted
2022-06-06 16:14:59,979 - INFO : not_done_futures:
2022-06-06 16:14:59,980 - INFO : done_futures:
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cdb5ca0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://some-made-up-domain.com/' page is 1261 bytes
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cda48b0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://www.cnn.com/' page is 1137030 bytes
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cd959a0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://www.foxnews.com/' page is 298929 bytes
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cda4fa0 state=finished raised URLError>
2022-06-06 16:14:59,980 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cdb57f0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://www.bbc.co.uk/' page is 414250 bytes
Remarks on the output:
– Since we don't use a timeout in the wait operation, after wait() returns we don't have any not-done futures.
– The order of the returned completed futures is not the same as the initial order in which the futures were submitted.
concurrent.futures.wait() with timeout example
The idea is to submit tasks, wait a short time for all of them to complete, and process the results available so far.
Then we wait again on the futures that are not done yet, this time without a timeout.
import concurrent.futures
import logging
import urllib.request
from concurrent.futures import Future
from concurrent.futures._base import DoneAndNotDoneFutures
from typing import Dict, Any
from typing import Set

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()


def get_process_done_and_not_done_futures() -> Set[Future]:
    logger.info('get_process_done_and_not_done_futures()')
    logger.info('not_done_futures:')
    for not_done_future in done_and_not_done_futures.not_done:
        url = future_to_url[not_done_future]
        logger.info(f'not_done_future={not_done_future} for url : {url}')
    logger.info('done_futures:')
    for done_future in done_and_not_done_futures.done:
        url = future_to_url[done_future]
        logger.info(f'done_future={done_future} for url : {url}')
        try:
            data = done_future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))
    return done_and_not_done_futures.not_done


# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url: Dict[Future, str] = {executor.submit(load_url, url, 60): url for url in URLS}
    logger.info('callables were just submitted')
    done_and_not_done_futures: DoneAndNotDoneFutures[Any] = concurrent.futures.wait(future_to_url, timeout=0.5)
    not_done_futures: Set[Future] = get_process_done_and_not_done_futures()
    logger.info('---------------------')
    logger.info(f'is_all_processed={not not_done_futures} after first wait')
    if not_done_futures:
        done_and_not_done_futures = concurrent.futures.wait(not_done_futures)
        not_done_futures = get_process_done_and_not_done_futures()
        logger.info('---------------------')
        logger.info(f'is_all_processed={not not_done_futures} after second wait')
Output:
2022-06-07 12:46:07,152 - INFO : callables were just submitted
2022-06-07 12:46:07,661 - INFO : get_process_done_and_not_done_futures()
2022-06-07 12:46:07,661 - INFO : not_done_futures:
2022-06-07 12:46:07,661 - INFO : not_done_future=<Future at 0x1f87297c0d0 state=running> for url : http://www.cnn.com/
2022-06-07 12:46:07,661 - INFO : not_done_future=<Future at 0x1f87297cfd0 state=running> for url : http://www.bbc.co.uk/
2022-06-07 12:46:07,661 - INFO : done_futures:
2022-06-07 12:46:07,661 - INFO : done_future=<Future at 0x1f87297cac0 state=finished raised URLError> for url : http://address.that.dont.exist
2022-06-07 12:46:07,661 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-07 12:46:07,661 - INFO : done_future=<Future at 0x1f8729731c0 state=finished returned bytes> for url : http://www.foxnews.com/
2022-06-07 12:46:07,661 - INFO : 'http://www.foxnews.com/' page is 300490 bytes
2022-06-07 12:46:07,661 - INFO : done_future=<Future at 0x1f87298a4c0 state=finished returned bytes> for url : http://some-made-up-domain.com/
2022-06-07 12:46:07,662 - INFO : 'http://some-made-up-domain.com/' page is 1261 bytes
2022-06-07 12:46:07,662 - INFO : ---------------------
2022-06-07 12:46:07,662 - INFO : is_all_processed=False after first wait
2022-06-07 12:46:08,038 - INFO : get_process_done_and_not_done_futures()
2022-06-07 12:46:08,038 - INFO : not_done_futures:
2022-06-07 12:46:08,038 - INFO : done_futures:
2022-06-07 12:46:08,038 - INFO : done_future=<Future at 0x1f87297c0d0 state=finished returned bytes> for url : http://www.cnn.com/
2022-06-07 12:46:08,038 - INFO : 'http://www.cnn.com/' page is 1138892 bytes
2022-06-07 12:46:08,038 - INFO : done_future=<Future at 0x1f87297cfd0 state=finished returned bytes> for url : http://www.bbc.co.uk/
2022-06-07 12:46:08,038 - INFO : 'http://www.bbc.co.uk/' page is 415430 bytes
2022-06-07 12:46:08,038 - INFO : ---------------------
2022-06-07 12:46:08,038 - INFO : is_all_processed=True after second wait
Remarks on the output:
– Here again, the order of the returned completed futures is not the same as the initial order in which the futures were submitted.
– We can see that the timeout of the wait() function allows us to get the results of some completed futures even if not all of them are finished, and then to continue waiting for the remaining uncompleted futures.
concurrent.futures.Executor.map()
We will execute the requests as futures with the map() method.
For the sake of the example, we will provoke an exception in the second call and see how things happen.
import concurrent.futures
import logging
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
TIMEOUTS = [1, 0.1, 1, 2, 1]


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    print(f'url, timeout={url, timeout}')
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    try:
        # Start the load operations and mark each future with its URL
        for index, result in enumerate(executor.map(load_url, URLS, TIMEOUTS)):
            current_url: str = URLS[index]
            logger.info(f'url={current_url}, page is {len(result)} bytes')
    except Exception as e:
        logger.error('Task after url %r generated an exception: %s' % (current_url, e))
Output:
url, timeout=('http://www.foxnews.com/', 1)
url, timeout=('http://www.cnn.com/', 0.1)
url, timeout=('http://address.that.dont.exist', 1)
url, timeout=('http://www.bbc.co.uk/', 2)
url, timeout=('http://some-made-up-domain.com/', 1)
2022-06-07 17:18:38,016 - INFO : url=http://www.foxnews.com/, page is 305498 bytes
2022-06-07 17:18:38,016 - ERROR : Task after url 'http://www.foxnews.com/' generated an exception: <urlopen error _ssl.c:1105: The handshake operation timed out>
Remarks on the code:
– The cnn request will provoke an exception because of its short connection-open timeout.
What is interesting here is that the exception is not caught after the map() call, per future, as we did with the wait() or as_completed() methods; it is raised while iterating over the results.
Indeed, any exception not caught in the function of the future will be raised during the iteration of its results.
That's why we need to put the try/except around the loop over the iterator itself.
– After the first exception is detected in the iterator, no other futures are processed.
So if we want to process all futures, we have to catch, inside the function of the future, any exception that could occur.
Still the map() function, but now catching any exception inside the function of the future
import concurrent.futures
import logging
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
TIMEOUTS = [1, 0.1, 1, 2, 1]


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    print(f'url, timeout={url, timeout}')
    try:
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    except Exception as e:
        logger.error('Task after url %r generated an exception: %s' % (url, e))
        return None


logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    for index, result in enumerate(executor.map(load_url, URLS, TIMEOUTS)):
        current_url: str = URLS[index]
        if result:
            logger.info(f'url={current_url}, page is {len(result)} bytes')
        else:
            logger.info(f'url={current_url}, exception detected')
Output:
url, timeout=('http://www.foxnews.com/', 1)
url, timeout=('http://www.cnn.com/', 0.1)
url, timeout=('http://address.that.dont.exist', 1)
url, timeout=('http://www.bbc.co.uk/', 2)
url, timeout=('http://some-made-up-domain.com/', 1)
2022-06-07 18:07:48,336 - ERROR : Task after url 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-07 18:07:48,693 - ERROR : Task after url 'http://www.cnn.com/' generated an exception: <urlopen error _ssl.c:1105: The handshake operation timed out>
2022-06-07 18:07:48,779 - INFO : url=http://www.foxnews.com/, page is 311385 bytes
2022-06-07 18:07:48,779 - INFO : url=http://www.cnn.com/, exception detected
2022-06-07 18:07:48,780 - INFO : url=http://address.that.dont.exist, exception detected
2022-06-07 18:07:48,780 - INFO : url=http://www.bbc.co.uk/, page is 421812 bytes
2022-06-07 18:07:48,780 - INFO : url=http://some-made-up-domain.com/, page is 1261 bytes