Purpose and overview
asyncio is a library to write concurrent code using the async/await syntax.
At its core, it relies on an event loop model processing coroutines, so it uses only one thread.
It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc.
When to use asyncio?
For IO-bound code.
It is not quite that simple, though: see the limitations section below.
asyncio provides two API flavors
high-level APIs :
– run Python coroutines concurrently and have full control over their execution
– perform network IO and IPC
– control subprocesses
– distribute tasks via queues
– synchronize concurrent code
low-level APIs (APIs mainly for library and framework developers) :
– create and manage event loops, which provide asynchronous APIs for networking, running subprocesses, handling OS signals, etc
– implement efficient protocols using transports
– bridge callback-based libraries and code with async/await syntax
Asyncio limitations
The asyncio high-level API is not efficient with blocking libraries and blocking functions
The Requests library, like any other blocking library or function, is not efficient with asyncio because it blocks the one and only thread (the event loop thread).
Indeed, to benefit from asyncio natively (that is, without explicitly using the loop object and an executor with threads under the hood), every IO function we call, such as file reading/writing APIs or HTTP request APIs, has to be implemented on top of asyncio.
That’s why some libraries that originally didn’t support asyncio have an asyncio-specific counterpart (e.g. aiofiles, aiohttp…).
As an alternative, to work around blocking functions, we use threads.
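The effect of a blocking call on the event loop can be observed with a small timing sketch. The names blocking_worker, cooperative_worker and timed are hypothetical, chosen for illustration, and the 0.2 second delay is arbitrary.

```python
import asyncio
import time


async def blocking_worker() -> None:
    # time.sleep() blocks the event loop thread: no other coroutine can run meanwhile.
    time.sleep(0.2)


async def cooperative_worker() -> None:
    # asyncio.sleep() suspends only this coroutine and hands control back to the loop.
    await asyncio.sleep(0.2)


async def timed(worker) -> float:
    """Run three workers with gather() and return the elapsed time in seconds."""
    start = time.perf_counter()
    await asyncio.gather(worker(), worker(), worker())
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    # The blocking sleeps run one after another (about 3 * 0.2 s).
    blocking_elapsed = await timed(blocking_worker)
    # The cooperative sleeps overlap (about 0.2 s in total).
    cooperative_elapsed = await timed(cooperative_worker)
    return blocking_elapsed, cooperative_elapsed


if __name__ == "__main__":
    blocking_elapsed, cooperative_elapsed = asyncio.run(main())
    print(f"blocking: {blocking_elapsed:.2f}s, cooperative: {cooperative_elapsed:.2f}s")
```

Even though both versions go through asyncio.gather(), only the one built on asyncio.sleep() actually overlaps its waits.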
Combining asyncio with threads: is it possible?
asyncio and threads are very distinct APIs and cannot be mixed directly.
To use threads inside asyncio, we have mainly two possibilities:
– use a ThreadPoolExecutor to let the blocking request be executed by a thread.
– use the asyncio low-level API with loop.run_in_executor(), which runs each task inside a thread of the pool (not in the single event loop thread). Under the cover, it uses a ThreadPoolExecutor but spares the developer from handling the threads directly.
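The sketch below shows one possible way to combine the two: an explicit ThreadPoolExecutor handed to loop.run_in_executor(). The function blocking_fetch is a hypothetical stand-in for a blocking call (such as an HTTP request made with a blocking library), and the pool size of 3 is an arbitrary assumption.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_fetch(name: str) -> str:
    # Hypothetical stand-in for a blocking IO call.
    time.sleep(0.2)
    return f"response for {name}"


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    # The pool size is an arbitrary choice for this sketch.
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [
            loop.run_in_executor(pool, blocking_fetch, name)
            for name in ("a", "b", "c")
        ]
        # Each call runs in a pool thread, so the event loop stays free meanwhile.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing None instead of pool would let asyncio use its default executor, which avoids managing the pool lifetime by hand.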
Coroutine and Task (high level)
Coroutine
How to declare a coroutine?
Prefix the function definition with the async keyword.
How to run a coroutine?
Once, at the entry point of the program, we call:
– the asyncio.run() function to run the top-level “main()” coroutine function
Then, depending on the need, we use one of the following:
– await on other coroutines
– the asyncio.create_task() function to run coroutines concurrently as asyncio Tasks
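As a minimal sketch of these three entry points, assuming a hypothetical greet() coroutine function:

```python
import asyncio
import inspect


async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"hello {name}"


async def main() -> list[str]:
    # Calling greet() only builds a coroutine object; nothing runs yet.
    coro = greet("david")
    assert inspect.iscoroutine(coro)

    # Option 1: await it, which runs it to completion before moving on.
    first = await coro

    # Option 2: wrap it in a Task, which schedules it on the event loop.
    task = asyncio.create_task(greet("jane"))
    second = await task
    return [first, second]


if __name__ == "__main__":
    # asyncio.run() is called once, at the top level of the program.
    print(asyncio.run(main()))
```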
Awaitable
An object is “awaitable” if it can be used in an await expression.
There are three main types of awaitable objects:
– coroutines
– Tasks
– Futures
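One way to check these three kinds of awaitables is inspect.isawaitable() from the standard library. The answer() coroutine function below is a hypothetical example.

```python
import asyncio
import inspect


async def answer() -> int:
    return 42


async def main() -> dict[str, bool]:
    loop = asyncio.get_running_loop()
    coro = answer()                        # coroutine object
    task = asyncio.create_task(answer())   # Task wrapping a coroutine
    future = loop.create_future()          # low-level Future
    future.set_result(42)

    checks = {
        "coroutine": inspect.isawaitable(coro),
        "task": inspect.isawaitable(task),
        "future": inspect.isawaitable(future),
        # The coroutine function itself is not awaitable, only what it returns.
        "coroutine function": inspect.isawaitable(answer),
    }
    # Await everything so that no coroutine is left un-awaited.
    await coro
    await task
    await future
    return checks


if __name__ == "__main__":
    print(asyncio.run(main()))
```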
Coroutines
Coroutines can be awaited from other coroutines.
await suspends the calling coroutine until the awaited one finishes, so on its own it does not run coroutines concurrently.
Tasks
Tasks are used to schedule coroutines concurrently.
Tasks are automatically scheduled to run soon.
There are 2 main ways to create tasks:
– Pass a coroutine object (the coroutine function called with its arguments) to asyncio.create_task().
The task created by asyncio.create_task() does not need to be awaited in order to run.
We may still await it, either to process its result or to pause at a specific point where that result is required.
– Pass several coroutine objects to await asyncio.gather().
If all awaitables are completed successfully, the result is an aggregate list of returned values.
The order of result values corresponds to the order of awaitables.
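The ordering guarantee can be illustrated with awaitables that finish in a different order than they were passed. The delayed() helper and its delays are assumptions made for this sketch.

```python
import asyncio


async def delayed(value: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(value)  # record the completion order
    return value


async def main() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    results = await asyncio.gather(
        delayed("slow", 0.3, finished),
        delayed("fast", 0.1, finished),
        delayed("medium", 0.2, finished),
    )
    return results, finished


if __name__ == "__main__":
    results, finished = asyncio.run(main())
    print(f"results={results}")    # follows the order of the arguments
    print(f"finished={finished}")  # follows the order of completion
```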
Futures
A Future is a low-level awaitable object that represents an eventual result of an asynchronous operation.
When a Future object is awaited, the coroutine waits until the Future is resolved somewhere else in the code.
Future objects in asyncio are needed to allow callback-based code to be used with async/await.
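A possible sketch of that bridge follows. legacy_fetch() is a hypothetical callback-based function invented for illustration; it simulates its work with loop.call_later() and delivers a fixed "payload" value.

```python
import asyncio


def legacy_fetch(callback) -> None:
    """Hypothetical callback-based API: calls callback(result) a little later."""
    loop = asyncio.get_running_loop()
    loop.call_later(0.1, callback, "payload")


async def fetch() -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # The callback resolves the Future; awaiting the Future resumes this coroutine.
    legacy_fetch(future.set_result)
    return await future


if __name__ == "__main__":
    print(asyncio.run(fetch()))
```

The coroutine never sees the callback: it simply awaits the Future, which is resolved "in some other place" by the callback-based code.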
Coroutines non-concurrent execution examples
Here are 2 examples:
1) execution of foo_coroutine()
2) sequential execution of 3 calls to foo_coroutine()
import asyncio
import time
from asyncio import sleep
from collections.abc import Awaitable


async def foo_coroutine(name):
    await sleep(2)
    return f'hello {name}'


async def main():
    # 1. execution of foo_coroutine() -> takes 2 secs (2 sec * 1)
    print(f"1 coroutine executed, started at {time.strftime('%X')}")
    # Incorrect: nothing happens because we only call foo_coroutine() without awaiting it.
    foo_coroutine("david")
    # Correct: we await foo_coroutine():
    print(await foo_coroutine("david"))  # prints "hello david"
    print(f"finished at {time.strftime('%X')}")

    # 2. execution of 3 calls to foo_coroutine() -> takes 6 secs (2 sec * 3)
    # This is sequential because each await suspends main() until the awaited coroutine finishes
    print(f"3 coroutines executed sequentially by default, started at : {time.strftime('%X')}")
    coroutine: Awaitable = foo_coroutine("david")
    coroutine_two: Awaitable = foo_coroutine("jane")
    coroutine_three: Awaitable = foo_coroutine("john")
    print(await coroutine + ", " + await coroutine_two + ", " + await coroutine_three)
    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())
Output :
1 coroutine executed, started at 13:32:24
async_io_non_concurrent_examples.py:16: RuntimeWarning: coroutine 'foo_coroutine' was never awaited
  foo_coroutine("david")
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
hello david
finished at 13:32:26
3 coroutines executed sequentially by default, started at : 13:32:26
hello david, hello jane, hello john
finished at 13:32:32
Task concurrent execution examples
As a reminder, tasks are scheduled to run concurrently.
Here are 2 examples:
1) concurrent execution of 3 calls to foo_coroutine() with await asyncio.gather()
2) concurrent execution of 3 calls to foo_coroutine() with asyncio.create_task()
import asyncio
import time
from asyncio import sleep
from collections.abc import Awaitable


async def foo_coroutine(name):
    await sleep(2)
    return f'hello {name}'


async def main():
    # 1. execution of 3 concurrent calls to foo_coroutine() with await asyncio.gather()
    # -> takes 2 secs (3 calls of 2 sec running concurrently = 2 sec)
    # This is concurrent because asyncio.gather() schedules the coroutines as tasks
    print(f"3 coroutines executed concurrently with asyncio.gather(), started at : {time.strftime('%X')}")
    coroutine: Awaitable = foo_coroutine("david")
    coroutine_two: Awaitable = foo_coroutine("jane")
    coroutine_three: Awaitable = foo_coroutine("john")
    result_aggregated = await asyncio.gather(coroutine, coroutine_two, coroutine_three)
    print(f'result_aggregated={result_aggregated}')
    print(f"finished at {time.strftime('%X')}")

    # 2. execution of 3 concurrent calls to foo_coroutine() with asyncio.create_task()
    # -> takes 2 secs (3 calls of 2 sec running concurrently = 2 sec)
    print(f"3 coroutines executed concurrently with asyncio.create_task(), started at : {time.strftime('%X')}")
    coroutine: Awaitable = foo_coroutine("david")
    coroutine_two: Awaitable = foo_coroutine("jane")
    coroutine_three: Awaitable = foo_coroutine("john")
    result = asyncio.create_task(coroutine)
    result_two = asyncio.create_task(coroutine_two)
    result_three = asyncio.create_task(coroutine_three)
    # Awaiting the tasks is optional.
    # Here we await all three but only process the results of the first two.
    await_result = await result
    await_result_two = await result_two
    await result_three
    print(f'await_result={await_result}')
    print(f'await_result_two={await_result_two}')
    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())
Output:
3 coroutines executed concurrently with asyncio.gather(), started at : 13:41:37
result_aggregated=['hello david', 'hello jane', 'hello john']
finished at 13:41:39
3 coroutines executed concurrently with asyncio.create_task(), started at : 13:41:39
await_result=hello david
await_result_two=hello jane
finished at 13:41:41
Event Loop (Low-level API)
Overview
The event loop is the core of every asyncio application.
Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.
We may manipulate the event loop via the loop object.
Under the cover, the high-level API manipulates the loop when we call functions like asyncio.run().
Obtain the event loop
Get the running event loop in the current OS thread (must be called from a coroutine or a callback):
loop = asyncio.get_running_loop()
Set the current event loop for the current OS thread :
asyncio.set_event_loop(loop)
Create a new event loop object :
loop = asyncio.new_event_loop()
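The three calls above can be combined as in the following sketch, which drives a coroutine with a manually created loop instead of asyncio.run(). The names compute() and run_with_manual_loop() are hypothetical.

```python
import asyncio


async def compute() -> int:
    # get_running_loop() works here because we are inside a coroutine run by the loop.
    running = asyncio.get_running_loop()
    assert running.is_running()
    await asyncio.sleep(0.1)
    return 42


def run_with_manual_loop() -> int:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Run the coroutine until it completes and return its result.
        return loop.run_until_complete(compute())
    finally:
        # Clean up: unset the current loop and release its resources.
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == "__main__":
    print(run_with_manual_loop())
```

In application code asyncio.run() is usually enough; managing the loop by hand mostly concerns library and framework code.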
Executing code in thread or process pools
Arrange for func to be called in the specified executor:
awaitable loop.run_in_executor(executor, func, *args)
executor argument: should be a concurrent.futures.Executor instance.
The default executor is used if executor is None.
args argument: positional arguments of the function to execute.
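Since only positional arguments go through *args, keyword arguments have to be bound beforehand, for instance with functools.partial(). The sketch below assumes a hypothetical blocking_greet() function with a keyword-only greeting parameter and uses the default executor.

```python
import asyncio
import functools
import time


def blocking_greet(name: str, *, greeting: str = "hello") -> str:
    # Hypothetical blocking function used for illustration.
    time.sleep(0.1)
    return f"{greeting} {name}"


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    # Positional arguments go through *args; None selects the default executor.
    first = loop.run_in_executor(None, blocking_greet, "david")
    # Keyword arguments are bound beforehand with functools.partial().
    second = loop.run_in_executor(
        None, functools.partial(blocking_greet, "jane", greeting="goodbye")
    )
    return await asyncio.gather(first, second)


if __name__ == "__main__":
    print(asyncio.run(main()))
```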
loop.run_in_executor() concurrent execution example
The idea is to use the low-level asyncio API when we call blocking IO functions, so that threads give us the concurrency that the event loop alone cannot.
This is the case, for example, when the function we call (foo_blocking() below) relies on the blocking time.sleep() function instead of the non-blocking asyncio.sleep() function.
import asyncio
import time
from collections.abc import Awaitable


# Blocking function: no async keyword here because it makes a blocking call (time.sleep)
def foo_blocking(name):
    time.sleep(2)
    return f'hello {name}'


async def main():
    loop = asyncio.get_running_loop()
    # execution of 3 concurrent calls to foo_blocking() with loop.run_in_executor()
    # -> takes 2 secs (3 calls of 2 sec running in parallel threads = 2 sec)
    print(f"3 call of function executed concurrently with loop.run_in_executor(), started at : "
          f"{time.strftime('%X')}")
    task_one: Awaitable = loop.run_in_executor(None, foo_blocking, "my friend")
    task_two: Awaitable = loop.run_in_executor(None, foo_blocking, "my dear friend")
    task_three: Awaitable = loop.run_in_executor(None, foo_blocking, "my very dear friend")
    # Awaiting the returned futures is optional.
    # Here we only need the results of the first two calls, so the third is not awaited.
    task_one_result = await task_one
    task_two_result = await task_two
    print(f'task_one_result={task_one_result}')
    print(f'task_two_result={task_two_result}')
    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())
Output :
3 call of function executed concurrently with loop.run_in_executor(), started at : 13:44:52
task_one_result=hello my friend
task_two_result=hello my dear friend
finished at 13:44:54