Concurrent futures with Python 3.8

Purpose and overview

The concurrent.futures module provides a high-level interface for asynchronously executing callables.
The asynchronous execution can be performed in two flavors:
– threads, using ThreadPoolExecutor
– separate processes, using ProcessPoolExecutor.
Both implement the same interface, which is defined by the abstract Executor class.
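
As a quick illustration of this shared interface, here is a minimal sketch (the square function and the sample values are made up for the illustration) where the exact same code runs with either executor; only the executor class changes:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    return x * x


if __name__ == '__main__':
    # The __main__ guard is required by ProcessPoolExecutor on Windows.
    for executor_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        with executor_class(max_workers=2) as executor:
            # submit() and map() behave the same way with both executors.
            print(executor_class.__name__, list(executor.map(square, [1, 2, 3])))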

Future Objects

class concurrent.futures.Future
It encapsulates the asynchronous execution of a callable.
Future instances are created by Executor.submit().
Future instance methods:
cancel():
Attempt to cancel the call.
If the call is currently being executed or has finished running and cannot be cancelled, the method returns False; otherwise the call is cancelled and the method returns True.

cancelled():
Return True if the call was successfully cancelled.

running():
Return True if the call is currently being executed and cannot be cancelled.

done():
Return True if the call was successfully cancelled or finished running.
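
A minimal sketch of these state-inspection methods (the sleep durations and the single worker are chosen so that the second task is still pending and can be cancelled):

import time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    running_future = executor.submit(time.sleep, 2)
    pending_future = executor.submit(time.sleep, 2)
    time.sleep(0.1)  # give the single worker time to start the first task
    print(running_future.running())     # True: currently being executed
    print(running_future.cancel())      # False: a running call cannot be cancelled
    # The second task is still queued behind the first one, so it can be cancelled.
    print(pending_future.cancel())      # True
    print(pending_future.cancelled())   # True
    print(pending_future.done())        # True: a cancelled future counts as done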

result(timeout=None):
Return the value returned by the call.
This method waits until the call is completed.
If timeout is not specified or is None, there is no limit to the wait time.
If timeout is specified (it can be an int or a float) and the call hasn't completed within timeout seconds, then a concurrent.futures.TimeoutError will be raised.

If the future is cancelled before completing then CancelledError will be raised.
If the call raised an exception, this method will raise the same exception.
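
A minimal sketch of the timeout behavior (the sleep duration and the 1-second timeout are arbitrary values chosen for the example):

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(time.sleep, 5)
    try:
        future.result(timeout=1)  # the call needs 5 seconds, so this times out
    except TimeoutError:
        print('the call did not complete within 1 second')
    # Without a timeout, result() simply waits until the call is completed.
    print(f'finally completed: {future.result() is None}')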

exception(timeout=None):
This method behaves much like result() above.
The difference is that it returns the exception raised by the call (or None if no exception was raised) instead of the result.
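
A small sketch (the failing division is an arbitrary example): exception() gives back the raised exception without re-raising it, and returns None for a successful call:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    failing_future = executor.submit(lambda: 1 / 0)
    ok_future = executor.submit(pow, 2, 3)
    # Unlike result(), exception() returns the exception instead of raising it.
    print(repr(failing_future.exception()))  # ZeroDivisionError('division by zero')
    print(ok_future.exception())             # None: no exception was raised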

add_done_callback(fn):
Attaches the callable fn to the future; fn is called, with the future as its only argument, when the future is cancelled or finishes running.
After some testing, it does not seem very friendly to use here because the callbacks are not invoked in the submission order.
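
For reference, a minimal sketch of how the callback is wired (the callback simply logs which future finished; the printed order follows completion order, not submission order):

import time
from concurrent.futures import Future, ThreadPoolExecutor


def on_done(future: Future) -> None:
    # The callback receives the finished (or cancelled) future as its only argument.
    print(f'callback called with result={future.result()}')


with ThreadPoolExecutor(max_workers=2) as executor:
    slow = executor.submit(lambda: time.sleep(0.5) or 'slow')
    fast = executor.submit(lambda: 'fast')
    slow.add_done_callback(on_done)
    fast.add_done_callback(on_done)
    # 'fast' typically completes first, so its callback usually runs first,
    # even though 'slow' was submitted and registered first.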

Executor Objects

class concurrent.futures.Executor:
An abstract class that provides methods to execute calls asynchronously.
It should not be used directly, but through its concrete subclasses.

Executor methods

submit(fn, /, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.
Example:

from concurrent.futures import ThreadPoolExecutor, Future
 
with ThreadPoolExecutor(max_workers=1) as executor:
    future: Future[float] = executor.submit(pow, 2, 3)
    # The result() method of the future object waits until the task has returned its result
    print(f'future.result()={future.result()}')
    # future.result()=8

map(func, *iterables, timeout=None, chunksize=1)
Similar to map(func, *iterables) except:
– the iterables are collected immediately rather than lazily;
– func is executed asynchronously and several calls to func may be made concurrently.
Advantages:
– a way to pass the parameters of the function to call as iterables;
– a way to keep the order of the submitted tasks.
But that method also has some specific limitations.
Drawbacks:
– any exception not caught in the function of the future is raised during the iteration of its results, which makes it awkward to process all the results when that happens.
We will see this in the examples section.

shutdown(wait=True, *, cancel_futures=False)
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.
Calls to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError.
Note that the cancel_futures parameter was added in Python 3.9; in Python 3.8 the signature is simply shutdown(wait=True).

Should we use a with statement or not?

You can avoid having to call the shutdown() method explicitly if you use the with statement, which will shut down the Executor (waiting as if Executor.shutdown() were called with wait set to True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

In the general case, using a with statement is a good idea since it shuts down the executor for us at the right time (that is, when all futures are completed or when the wait condition is met).
But when we want to return early, that is before all futures are completed, we need to declare our executor object explicitly.
And generally we will invoke shutdown() with the wait parameter set to False.

ThreadPoolExecutor

It is an Executor subclass that uses a pool of threads to execute calls.

Deadlock risks

Deadlocks can occur when the callable associated with a Future waits on the results of another Future.
For example:

import time
from concurrent.futures import ThreadPoolExecutor
 
 
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5
 
def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6
 
 
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

Deadlocks can also occur when there are not enough workers to perform nested tasks.
For example:

from concurrent.futures import ThreadPoolExecutor
 
 
def wait_on_future():
    print('start task')
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())
 
 
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

Output:

start task
(the program then hangs: print(f.result()) is never reached because the only worker thread is busy executing wait_on_future itself)

ThreadPoolExecutor constructor

concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

All parameters of the constructor are optional.
– max_workers: maximum number of threads used to execute calls asynchronously.
Its default value depends on the Python version. In version 3.8 it is min(32, os.cpu_count() + 4).
This default value preserves at least 5 workers for I/O bound tasks and uses at most 32 CPU cores for CPU bound tasks.
– thread_name_prefix: prefix used to name the worker threads (a concept close to the thread name in the Thread class), for easier debugging.
– initializer: callable that is called at the start of each worker thread (similar to what we have in the Thread class).
– initargs: a tuple of arguments passed to the initializer.
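
A minimal sketch combining these parameters (the prefix string and the initializer logic are arbitrary examples):

import threading
from concurrent.futures import ThreadPoolExecutor


def init_worker(greeting):
    # Called once at the start of each worker thread.
    print(f'{greeting} from {threading.current_thread().name}')


with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix='my_pool',
                        initializer=init_worker,
                        initargs=('hello',)) as executor:
    print(list(executor.map(pow, [2, 3], [10, 10])))  # [1024, 59049]
    # Worker threads are named my_pool_0, my_pool_1, ...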

Module level Functions

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
Wait for the Future instances to complete.
Parameters:
– fs: the Future instances to wait for. Duplicate futures are ignored.
– timeout (optional, int or float): the maximum number of seconds to wait before returning.
By default there is no time limit.
– return_when (optional): indicates when this function should return.

It must be one of the following constants (the default value is ALL_COMPLETED):
– FIRST_COMPLETED: return when any future finishes or is cancelled.
– FIRST_EXCEPTION: return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.
– ALL_COMPLETED: return when all futures finish or are cancelled.

Returns a named 2-tuple of sets:
– The first set, named done, contains the futures that completed (finished or cancelled futures).
– The second set, named not_done, contains the futures that did not complete (pending or running futures).

concurrent.futures.as_completed(fs, timeout=None)
Returns an iterator over the given futures that yields each one as it completes, while waiting for the futures that have not completed yet.
Parameters:
– fs: the Future instances to process. Duplicate futures are ignored.
– timeout (optional, int or float): the returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds.
By default there is no time limit.
Returns: an iterator yielding futures as soon as they complete (completed means finished or cancelled futures).

concurrent.futures.as_completed() typical example

The idea is to execute multiple HTTP requests concurrently and to process the result of each callable as soon as it completes.

import concurrent.futures
import logging
import urllib.request
 
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
 
 
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
 
 
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()
 
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    logger.info('callables were just submitted')
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))

Output:

2022-06-03 14:18:39,563 - INFO : callables were just submitted
2022-06-03 14:18:39,636 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-03 14:18:39,890 - INFO : 'http://some-made-up-domain.com/' page is 1261 bytes
2022-06-03 14:18:40,019 - INFO : 'http://www.foxnews.com/' page is 305141 bytes
2022-06-03 14:18:40,067 - INFO : 'http://www.bbc.co.uk/' page is 415865 bytes
2022-06-03 14:18:40,481 - INFO : 'http://www.cnn.com/' page is 1136832 bytes

concurrent.futures.as_completed() example where we want to exit early

The idea is to execute multiple HTTP requests concurrently, to process only the result of the first successfully completed callable and then exit the program.
We can see that we have to shut down the executor explicitly.
Doing that in the finally block ensures it is performed in every case.

import concurrent.futures
import logging
import urllib.request
from concurrent.futures import Executor
from concurrent.futures import Future
from typing import Dict
 
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
 
 
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
 
 
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()
 
# Here we deliberately do not use a with statement: we want to be able to exit early and shut down the executor explicitly
executor: Executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
# Start the load operations and mark each future with its URL
future_to_url: Dict[Future, str] = {executor.submit(load_url, url, 60): url for url in URLS}
 
logger.info('callables were just submitted')
 
try:
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
            break
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))
finally:
    logger.info('we explicitly shutdown the executor')
    executor.shutdown(wait=False)
 
logger.info('we exited early')

Output:

2022-06-14 16:18:28,723 - INFO : callables were just submitted
2022-06-14 16:18:28,757 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-14 16:18:29,374 - INFO : 'http://some-made-up-domain.com/' page is 1111 bytes
2022-06-14 16:18:29,374 - INFO : we explicitly shutdown the executor
2022-06-14 16:18:29,374 - INFO : we exited early

concurrent.futures.wait() without timeout example

The idea is to submit tasks and to wait for all of them to complete before processing their results.

import concurrent.futures
import logging
import urllib.request
from concurrent.futures import Future
from concurrent.futures._base import DoneAndNotDoneFutures  # private named tuple (done, not_done), used here only for type hints
from typing import Dict, Any
 
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
 
 
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
 
 
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()
 
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url: Dict[Future, str] = {executor.submit(load_url, url, 60): url for url in URLS}
 
    logger.info('callables were just submitted')
    done_and_not_done_futures: DoneAndNotDoneFutures[Any] = concurrent.futures.wait(future_to_url)
    # logger.info(f'done_and_not_done_futures={done_and_not_done_futures}')
    # logger.handlers[0].flush()
    logger.info(f'not_done_futures:')
    for not_done_future in done_and_not_done_futures.not_done:
        logger.info(f'not_done_future={not_done_future}')
 
    logger.info(f'done_futures:')
    for done_future in done_and_not_done_futures.done:
        logger.info(f'done_future={done_future}')
        url = future_to_url[done_future]
        try:
            data = done_future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))

Output:

2022-06-06 16:14:59,071 - INFO : callables were just submitted
2022-06-06 16:14:59,979 - INFO : not_done_futures:
2022-06-06 16:14:59,980 - INFO : done_futures:
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cdb5ca0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://some-made-up-domain.com/' page is 1261 bytes
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cda48b0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://www.cnn.com/' page is 1137030 bytes
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cd959a0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://www.foxnews.com/' page is 298929 bytes
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cda4fa0 state=finished raised URLError>
2022-06-06 16:14:59,980 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-06 16:14:59,980 - INFO : done_future=<Future at 0x1e90cdb57f0 state=finished returned bytes>
2022-06-06 16:14:59,980 - INFO : 'http://www.bbc.co.uk/' page is 414250 bytes

Remarks on the output:
– Since we don't use a timeout during the wait operation, there are no not-done futures after wait() returns.
– The order of the returned completed futures is not the same as the order in which the futures were submitted.

concurrent.futures.wait() with timeout example

The idea is to submit tasks, wait a short time for them to complete and process the results available so far.
Then we wait again on the remaining futures, this time without a timeout.

import concurrent.futures
import logging
import urllib.request
from concurrent.futures import Future
from concurrent.futures._base import DoneAndNotDoneFutures
from typing import Dict, Any
from typing import Set
 
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
 
 
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
 
 
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()
 
 
def get_process_done_and_not_done_futures() -> Set[Future]:
    logger.info('get_process_done_and_not_done_futures()')
    logger.info(f'not_done_futures:')
    for not_done_future in done_and_not_done_futures.not_done:
        url = future_to_url[not_done_future]
        logger.info(f'not_done_future={not_done_future} for url : {url}')
    logger.info(f'done_futures:')
    for done_future in done_and_not_done_futures.done:
        url = future_to_url[done_future]
        logger.info(f'done_future={done_future} for url : {url}')
        try:
            data = done_future.result()
            logger.info('%r page is %d bytes' % (url, len(data)))
        except Exception as exc:
            logger.error('%r generated an exception: %s' % (url, exc))
 
    return done_and_not_done_futures.not_done
 
 
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url: Dict[Future, str] = {executor.submit(load_url, url, 60): url for url in URLS}
 
    logger.info('callables were just submitted')
    done_and_not_done_futures: DoneAndNotDoneFutures[Any] = concurrent.futures.wait(future_to_url,
                                                                                    timeout=0.5)
    not_done_futures: Set[Future] = get_process_done_and_not_done_futures()
 
    logger.info('---------------------')
    # is_all_processed: bool = not not_done_futures
    logger.info(f'is_all_processed={not not_done_futures} after first wait')
    if not_done_futures:
        done_and_not_done_futures: DoneAndNotDoneFutures[Any] = concurrent.futures.wait(
                not_done_futures)
        not_done_futures: Set[Future] = get_process_done_and_not_done_futures()
        logger.info('---------------------')
        logger.info(f'is_all_processed={not not_done_futures} after second wait')
 

Output:

2022-06-07 12:46:07,152 - INFO : callables were just submitted
2022-06-07 12:46:07,661 - INFO : get_process_done_and_not_done_futures()
2022-06-07 12:46:07,661 - INFO : not_done_futures:
2022-06-07 12:46:07,661 - INFO : not_done_future=<Future at 0x1f87297c0d0 state=running> for url : http://www.cnn.com/
2022-06-07 12:46:07,661 - INFO : not_done_future=<Future at 0x1f87297cfd0 state=running> for url : http://www.bbc.co.uk/
2022-06-07 12:46:07,661 - INFO : done_futures:
2022-06-07 12:46:07,661 - INFO : done_future=<Future at 0x1f87297cac0 state=finished raised URLError> for url : http://address.that.dont.exist
2022-06-07 12:46:07,661 - ERROR : 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-07 12:46:07,661 - INFO : done_future=<Future at 0x1f8729731c0 state=finished returned bytes> for url : http://www.foxnews.com/
2022-06-07 12:46:07,661 - INFO : 'http://www.foxnews.com/' page is 300490 bytes
2022-06-07 12:46:07,661 - INFO : done_future=<Future at 0x1f87298a4c0 state=finished returned bytes> for url : http://some-made-up-domain.com/
2022-06-07 12:46:07,662 - INFO : 'http://some-made-up-domain.com/' page is 1261 bytes
2022-06-07 12:46:07,662 - INFO : ---------------------
2022-06-07 12:46:07,662 - INFO : is_all_processed=False after first wait
2022-06-07 12:46:08,038 - INFO : get_process_done_and_not_done_futures()
2022-06-07 12:46:08,038 - INFO : not_done_futures:
2022-06-07 12:46:08,038 - INFO : done_futures:
2022-06-07 12:46:08,038 - INFO : done_future=<Future at 0x1f87297c0d0 state=finished returned bytes> for url : http://www.cnn.com/
2022-06-07 12:46:08,038 - INFO : 'http://www.cnn.com/' page is 1138892 bytes
2022-06-07 12:46:08,038 - INFO : done_future=<Future at 0x1f87297cfd0 state=finished returned bytes> for url : http://www.bbc.co.uk/
2022-06-07 12:46:08,038 - INFO : 'http://www.bbc.co.uk/' page is 415430 bytes
2022-06-07 12:46:08,038 - INFO : ---------------------
2022-06-07 12:46:08,038 - INFO : is_all_processed=True after second wait
 

Remarks on the output:
– Here again, the order of the returned completed futures is not the same as the order in which the futures were submitted.
– We can see that the timeout of the wait() function allows us to get the results of the futures that have already completed even though some are not finished, and then to keep waiting for the remaining uncompleted futures.

concurrent.futures.Executor.map()

We will execute the requests as futures with the map() method.
For the sake of the example, we will provoke an exception in the second call and see how things happen.

import concurrent.futures
import logging
import urllib.request
 
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
 
TIMEOUTS = [1, 0.1, 1, 2, 1]
 
 
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    print(f'url, timeout={url, timeout}')
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
 
 
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()
 
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    current_url: str = 'no url processed yet'  # avoids a NameError in the except clause if the very first result raises
    try:
        # Start the load operations and mark each future with its URL
        for index, result in enumerate(executor.map(load_url, URLS, TIMEOUTS)):
            current_url: str = URLS[index]
            logger.info(f'url={current_url}, page is {len(result)} bytes')
 
    except Exception as e:
        logger.error('Task after url %r generated an exception: %s' % (current_url, e))

Output:

url, timeout=('http://www.foxnews.com/', 1)
url, timeout=('http://www.cnn.com/', 0.1)
url, timeout=('http://address.that.dont.exist', 1)
url, timeout=('http://www.bbc.co.uk/', 2)
url, timeout=('http://some-made-up-domain.com/', 1)
2022-06-07 17:18:38,016 - INFO : url=http://www.foxnews.com/, page is 305498 bytes
2022-06-07 17:18:38,016 - ERROR : Task after url 'http://www.foxnews.com/' generated an exception: <urlopen error _ssl.c:1105: The handshake operation timed out>

Remarks on the code:
– The cnn request will provoke an exception because of the short connection timeout.
What is interesting here is that the exception is not caught per future after the map() call, as we did with wait() or as_completed(), but during the iteration itself.
Indeed, any exception not caught in the function of the future is raised while iterating over its results.
That's why we need to wrap the iteration loop itself in a try/except.
– After the first exception is detected in the iterator, no other future is processed.
So if we want to process all futures, we have to catch, inside the future's function, any exception that could occur.

Still the map() function, but now catching any exception inside the future's function

import concurrent.futures
import logging
import urllib.request
 
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://address.that.dont.exist',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']
 
TIMEOUTS = [1, 0.1, 1, 2, 1]
 
 
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    print(f'url, timeout={url, timeout}')
 
    try:
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
    except Exception as e:
        logger.error('Task after url %r generated an exception: %s' % (url, e))
        return None
 
 
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s - %(levelname)s : %(message)s')
logger = logging.getLogger()
 
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    for index, result in enumerate(executor.map(load_url, URLS, TIMEOUTS)):
        current_url: str = URLS[index]
        if result:
            logger.info(f'url={current_url}, page is {len(result)} bytes')
        else:
            logger.info(f'url={current_url}, exception detected')

Output:

url, timeout=('http://www.foxnews.com/', 1)
url, timeout=('http://www.cnn.com/', 0.1)
url, timeout=('http://address.that.dont.exist', 1)
url, timeout=('http://www.bbc.co.uk/', 2)
url, timeout=('http://some-made-up-domain.com/', 1)
2022-06-07 18:07:48,336 - ERROR : Task after url 'http://address.that.dont.exist' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
2022-06-07 18:07:48,693 - ERROR : Task after url 'http://www.cnn.com/' generated an exception: <urlopen error _ssl.c:1105: The handshake operation timed out>
2022-06-07 18:07:48,779 - INFO : url=http://www.foxnews.com/, page is 311385 bytes
2022-06-07 18:07:48,779 - INFO : url=http://www.cnn.com/, exception detected
2022-06-07 18:07:48,780 - INFO : url=http://address.that.dont.exist, exception detected
2022-06-07 18:07:48,780 - INFO : url=http://www.bbc.co.uk/, page is 421812 bytes
2022-06-07 18:07:48,780 - INFO : url=http://some-made-up-domain.com/, page is 1261 bytes