Python – Asyncio

Purpose and overview

asyncio is a library for writing concurrent code using the async/await syntax.
At its core, it relies on an event loop that schedules coroutines, so the code runs in a single thread.

It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc.

When to use asyncio ?
For IO-bound code.
But things are not that simple. See the limitations section below.

asyncio provides two API flavors

high-level APIs :
– run Python coroutines concurrently and have full control over their execution
– perform network IO and IPC
– control subprocesses
– distribute tasks via queues
– synchronize concurrent code
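
As an illustration of the "distribute tasks via queues" item, here is a minimal sketch based on asyncio.Queue. The names worker and process_all, the doubling of each item and the 0.01 sec sleep are assumptions made up for the example; they only stand in for real IO work.

```python
import asyncio


async def worker(name, queue, results):
    # Each worker pulls items from the shared queue until it is cancelled.
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)  # stand-in for a real IO operation
        results.append((name, item * 2))
        queue.task_done()


async def process_all(items, worker_count=2):
    queue = asyncio.Queue()
    results = []
    for item in items:
        queue.put_nowait(item)
    workers = [
        asyncio.create_task(worker(f"w{i}", queue, results))
        for i in range(worker_count)
    ]
    await queue.join()  # wait until every item has been marked as done
    for task in workers:
        task.cancel()  # the workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(value for _, value in results)


if __name__ == "__main__":
    print(asyncio.run(process_all([1, 2, 3, 4])))  # [2, 4, 6, 8]
```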

low-level APIs (APIs mainly for library and framework developers) :
– create and manage event loops, which provide asynchronous APIs for networking, running subprocesses, handling OS signals, etc
– implement efficient protocols using transports
– bridge callback-based libraries and code with async/await syntax

Asyncio limitations

The asyncio high-level API is not efficient with blocking libraries and blocking functions

The Requests library, like any other blocking library or function, is not efficient with asyncio because it blocks the one and only thread (the one running the event loop).
To benefit from asyncio natively (that is, without explicitly using the loop object and executors/threads under the hood), every IO function we call, such as file reading/writing APIs or HTTP request APIs, has to be implemented on top of asyncio.
That’s why asyncio-specific libraries exist as counterparts to blocking ones (ex: aiofiles, aiohttp…)
As an alternative, to work around blocking functions, we can use threads.
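
To make the limitation concrete, the following sketch compares three coroutines that call the blocking time.sleep() with three coroutines that await asyncio.sleep(). The function names and the 0.2 sec delay are assumptions chosen for the example. The blocking version should take roughly three times longer, because each call holds the event loop thread.

```python
import asyncio
import time


async def blocking_coroutine():
    time.sleep(0.2)  # blocks the event loop thread: nothing else can run


async def cooperative_coroutine():
    await asyncio.sleep(0.2)  # hands control back to the event loop


async def timed(coroutine_function, count=3):
    # Run `count` coroutines with gather() and measure the elapsed time.
    start = time.perf_counter()
    await asyncio.gather(*(coroutine_function() for _ in range(count)))
    return time.perf_counter() - start


async def compare():
    blocking = await timed(blocking_coroutine)
    cooperative = await timed(cooperative_coroutine)
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = asyncio.run(compare())
    print(f"blocking={blocking:.1f}s cooperative={cooperative:.1f}s")
```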

Combining asyncio with threads. Possible ?

asyncio and threads are two very distinct APIs and cannot be mixed directly.
To use threads inside asyncio, we have mainly two possibilities :
– use a ThreadPoolExecutor so that the blocking call is executed by a worker thread.
– use the asyncio low-level API BaseEventLoop.run_in_executor(), which runs each call in a thread of a pool (not in the single event loop thread). By default it uses a ThreadPoolExecutor under the hood, but it spares the developer from handling threads at a low level.
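
The two possibilities can be combined, as in this sketch where an explicit ThreadPoolExecutor is handed to loop.run_in_executor(). The names blocking_fetch and fetch_all and the "pool" thread name prefix are hypothetical; the returned thread name is only there to show that the work does not run in the event loop thread.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_fetch(name):
    time.sleep(0.2)  # stand-in for a blocking IO call
    return name, threading.current_thread().name


async def fetch_all(names):
    loop = asyncio.get_running_loop()
    # One worker per call so that all the blocking calls overlap.
    with ThreadPoolExecutor(max_workers=len(names), thread_name_prefix="pool") as pool:
        futures = [loop.run_in_executor(pool, blocking_fetch, name) for name in names]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    for name, thread_name in asyncio.run(fetch_all(["david", "jane", "john"])):
        print(f"{name} handled by thread {thread_name}")
```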

Coroutine and Task (high level)

Coroutine

How to declare a coroutine ?
Prefix the function with the async keyword.
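
A minimal sketch of a declaration, with a hypothetical greet() function: calling it does not run its body, it only returns a coroutine object that still has to be run.

```python
import asyncio
import inspect


async def greet(name):
    return f"hello {name}"


if __name__ == "__main__":
    print(inspect.iscoroutinefunction(greet))  # True: declared with async def
    coroutine_object = greet("david")  # creates a coroutine object, runs nothing yet
    print(inspect.iscoroutine(coroutine_object))  # True
    print(asyncio.run(coroutine_object))  # hello david
```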

How to run a coroutine ?
Once per program, we call :
asyncio.run() to run the top-level “main()” coroutine function

Then, inside coroutines, we use one way or the other according to the requirement :
– await other coroutines.
– call asyncio.create_task() to run coroutines concurrently as asyncio Tasks.

Awaitable

An object is « awaitable » if it can be used in an await expression.
There are three main types of awaitable objects:
– coroutines
– Tasks
– Futures
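
The three kinds can be checked with inspect.isawaitable(), as in this sketch. The answer() and classify() functions are made up for the illustration.

```python
import asyncio
import inspect


async def answer():
    return 42


async def classify():
    loop = asyncio.get_running_loop()
    coroutine = answer()
    task = asyncio.create_task(answer())
    future = loop.create_future()
    future.set_result(42)  # resolve the Future by hand for the demo
    checks = {
        "coroutine": inspect.isawaitable(coroutine),
        "task": inspect.isawaitable(task),
        "future": inspect.isawaitable(future),
        "plain int": inspect.isawaitable(42),
    }
    # Await everything so that no coroutine is left un-awaited.
    values = [await coroutine, await task, await future]
    return checks, values


if __name__ == "__main__":
    print(asyncio.run(classify()))
```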

coroutines
Coroutines can be awaited from other coroutines.
await suspends the calling coroutine until the awaited one finishes, so on its own it doesn’t run coroutines concurrently.

Tasks
Tasks are used to run coroutines concurrently.
Once created, a Task is automatically scheduled to run soon.
2 main ways to create tasks :
– Pass a coroutine object (the coroutine function called with its args) to asyncio.create_task().
The task created by asyncio.create_task() runs without having to be awaited.
We may still await it, either to process its result or to pause at a specific point because the task result is required.
– Pass several coroutine objects to await asyncio.gather(), which wraps each of them in a Task.
If all awaitables complete successfully, the result is an aggregate list of returned values.
The order of result values corresponds to the order of the awaitables.
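
The ordering guarantee can be observed with this small sketch, in which the first awaitable is deliberately the slowest. The delayed() helper, its delays and the completion_order list are assumptions added for the demonstration.

```python
import asyncio


async def delayed(value, delay, completion_order):
    await asyncio.sleep(delay)
    completion_order.append(value)  # record the real completion order
    return value


async def run_both():
    completion_order = []
    results = await asyncio.gather(
        delayed("slow", 0.2, completion_order),
        delayed("fast", 0.05, completion_order),
    )
    return results, completion_order


if __name__ == "__main__":
    results, completion_order = asyncio.run(run_both())
    print(results)           # ['slow', 'fast'] : order of the awaitables
    print(completion_order)  # ['fast', 'slow'] : order of completion
```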

Futures
A Future is a low-level awaitable object that represents an eventual result of an asynchronous operation.
When a Future object is awaited, the coroutine waits until the Future is resolved somewhere else.
Future objects in asyncio are needed to allow callback-based code to be used with async/await.
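
A possible way to bridge a callback-based function with async/await is sketched here. callback_based_api() is a hypothetical callback-style API invented for the example; the Future created with loop.create_future() is resolved by passing its set_result method as the callback.

```python
import asyncio


def callback_based_api(loop, on_done):
    # Hypothetical callback-style API: calls on_done(result) a bit later.
    loop.call_later(0.05, on_done, "payload")


async def call_with_await():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # The callback resolves the Future, which wakes up the awaiting coroutine.
    callback_based_api(loop, future.set_result)
    return await future


if __name__ == "__main__":
    print(asyncio.run(call_with_await()))  # payload
```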

Coroutines non-concurrent execution examples

Here are 2 examples :
1) execution of foo_coroutine()
2) sequential execution of 3 calls to foo_coroutine()

import asyncio
import time
from asyncio import sleep
from collections.abc import Awaitable
 
 
async def foo_coroutine(name):
    await sleep(2)
    return f'hello {name}'
 
 
async def main():
    # 1. execution of foo_coroutine()  -> takes 2 secs (2sec * 1)
    print(f"1 coroutine executed, started at {time.strftime('%X')}")
    # Incorrect : nothing runs, because calling foo_coroutine() only creates a coroutine object.
    foo_coroutine("david")
    # Correct : we await foo_coroutine():
    print(await foo_coroutine("david"))  # print "hello david".
    print(f"finished at {time.strftime('%X')}")
 
    # 2. execution of 3 calls to foo_coroutine() -> takes 6 secs (2sec * 3)
    # That is sequential because each await suspends main() until the awaited coroutine finishes
    print(f"3 coroutines executed sequentially by default, started at : {time.strftime('%X')}")
    coroutine: Awaitable = foo_coroutine("david")
    coroutine_two: Awaitable = foo_coroutine("jane")
    coroutine_three: Awaitable = foo_coroutine("john")
    print(await coroutine + ", " + await coroutine_two + ", " + await coroutine_three)
    print(f"finished at {time.strftime('%X')}")
 
 
asyncio.run(main())

Output :

1 coroutine executed, started at 13:32:24
async_io_non_concurrent_examples.py:16: RuntimeWarning: coroutine 'foo_coroutine' was never awaited
  foo_coroutine("david")
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
hello david
finished at 13:32:26
3 coroutines executed sequentially by default, started at : 13:32:26
hello david, hello jane, hello john
finished at 13:32:32

Task concurrent execution examples

As a reminder, tasks always run concurrently.
Here are 2 examples :
1) concurrent execution of 3 calls to foo_coroutine() with await asyncio.gather()
2) concurrent execution of 3 calls to foo_coroutine() with asyncio.create_task()

import asyncio
import time
from asyncio import sleep
from collections.abc import Awaitable
 
 
async def foo_coroutine(name):
    await sleep(2)
    return f'hello {name}'
 
 
async def main():
    # 1. execution of 3 concurrent calls to foo_coroutine() with await asyncio.gather()
    # -> takes 2 secs (3 * 2sec run concurrently = 2 sec)
    # That is concurrent because asyncio.gather() wraps the coroutines in tasks that run concurrently
    print(f"3 coroutines executed concurrently with asyncio.gather(), started at : {time.strftime('%X')}")
    coroutine: Awaitable = foo_coroutine("david")
    coroutine_two: Awaitable = foo_coroutine("jane")
    coroutine_three: Awaitable = foo_coroutine("john")
    result_aggregated = await asyncio.gather(coroutine, coroutine_two, coroutine_three)
    print(f'result_aggregated={result_aggregated}')
    print(f"finished at {time.strftime('%X')}")
 
    # 2. execution of 3 concurrent calls to foo_coroutine() with asyncio.create_task()
    # -> takes 2 secs (3 * 2sec run concurrently = 2 sec)
    print(f"3 coroutines executed concurrently with asyncio.create_task(), started at : {time.strftime('%X')}")
    coroutine: Awaitable = foo_coroutine("david")
    coroutine_two: Awaitable = foo_coroutine("jane")
    coroutine_three: Awaitable = foo_coroutine("john")
    result = asyncio.create_task(coroutine)
    result_two = asyncio.create_task(coroutine_two)
    result_three = asyncio.create_task(coroutine_three)

    # Optional : we can await the tasks.
    # For example, suppose we want to use every result except the one of the third task
    await_result = await result
    await_result_two = await result_two
    await result_three

    print(f'await_result={await_result}')
    print(f'await_result_two={await_result_two}')
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())


Output:

3 coroutines executed concurrently with asyncio.gather(), started at : 13:41:37
result_aggregated=['hello david', 'hello jane', 'hello john']
finished at 13:41:39
3 coroutines executed concurrently with asyncio.create_task(), started at : 13:41:39
await_result=hello david
await_result_two=hello jane
finished at 13:41:41

Event Loop (Low-level API)

Overview

The event loop is the core of every asyncio application.
Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses. We can manipulate the event loop through the loop object.
Under the hood, the high-level API manipulates the loop when we call functions like asyncio.run().

Obtain the event loop

Get the running event loop in the current OS thread (has to be called from a coroutine or a callback):
loop = asyncio.get_running_loop()

Set the current event loop for the current OS thread :
asyncio.set_event_loop(loop)

Create a new event loop object :
loop = asyncio.new_event_loop()
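
For illustration, here is a sketch of a loop created and driven by hand, roughly what asyncio.run() takes care of for us. The which_loop() and run_manually() names are hypothetical. The sketch does not call asyncio.set_event_loop(), since run_until_complete() is invoked directly on the loop object.

```python
import asyncio


async def which_loop():
    await asyncio.sleep(0.01)
    # Inside a coroutine, get_running_loop() returns the loop executing it.
    return asyncio.get_running_loop()


def run_manually():
    loop = asyncio.new_event_loop()
    try:
        running_loop = loop.run_until_complete(which_loop())
        return running_loop is loop
    finally:
        loop.close()  # always release the loop resources


if __name__ == "__main__":
    print(run_manually())  # True
```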

Executing code in thread or process pools

Schedule func to be called in the specified executor :
awaitable loop.run_in_executor(executor, func, *args)
executor argument : should be a concurrent.futures.Executor instance.
The default executor is used if executor is None.
args argument : positional arguments of the function to execute.
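
Since only positional arguments are forwarded, keyword arguments can be bound beforehand with functools.partial(). The sketch below assumes a hypothetical blocking_greet() function with a punctuation keyword parameter.

```python
import asyncio
import functools
import time


def blocking_greet(name, punctuation="."):
    time.sleep(0.05)  # stand-in for a blocking call
    return f"hello {name}{punctuation}"


async def greet_in_thread():
    loop = asyncio.get_running_loop()
    # Bind the keyword argument first: run_in_executor() only forwards *args.
    call = functools.partial(blocking_greet, "david", punctuation="!")
    return await loop.run_in_executor(None, call)  # None : default executor


if __name__ == "__main__":
    print(asyncio.run(greet_in_thread()))  # hello david!
```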

loop.run_in_executor() concurrent execution examples

The idea is to use the low-level asyncio API when we call blocking IO functions, so that we still get concurrency thanks to threads.
For example, that is the case when foo_blocking() calls the blocking time.sleep() function instead of the non-blocking asyncio.sleep() function.

import asyncio
import time
from collections.abc import Awaitable
 
 
# blocking function : no async keyword here because its body only makes a blocking call (time.sleep)
def foo_blocking(name):
    time.sleep(2)
    return f'hello {name}'
 
 
async def main():
    loop = asyncio.get_running_loop()
 
    # execution of 3 concurrent calls to foo_blocking() with loop.run_in_executor()
    # -> take 2 secs (2sec * 3 in parallel = 2 sec)
    print(f"3 call of function executed concurrently with loop.run_in_executor(), started at : "
          f"{time.strftime('%X')}")
    task_one: Awaitable = loop.run_in_executor(None, foo_blocking, "my friend")
    task_two: Awaitable = loop.run_in_executor(None, foo_blocking, "my dear friend")
    task_three: Awaitable = loop.run_in_executor(None, foo_blocking, "my very dear friend")
 
    # Optional : we can await the returned futures.
    # For example, suppose we want to use every result except the one of the third call
    task_one_result = await task_one
    task_two_result = await task_two
 
    print(f'task_one_result={task_one_result}')
    print(f'task_two_result={task_two_result}')
    print(f"finished at {time.strftime('%X')}")
 
 
asyncio.run(main())

Output :

3 call of function executed concurrently with loop.run_in_executor(), started at : 13:44:52
task_one_result=hello my friend
task_two_result=hello my dear friend
finished at 13:44:54