queue module – A synchronized queue class
queue module overview
It implements multi-producer, multi-consumer queues.
It is useful when information must be exchanged safely between multiple threads.
The Queue class in this module implements all the required locking semantics.
The module implements three types of queue. All three share the same maxsize behaviour:
– maxsize is the maximum number of elements that the queue may contain (0 means no limit).
– Insertion will block once this size has been reached, until queue items are consumed.
class queue.Queue(maxsize=0): FIFO queue.
class queue.LifoQueue(maxsize=0): LIFO queue.
class queue.PriorityQueue(maxsize=0): Priority queue. The lowest valued entries are retrieved first (the lowest valued entry is the one that would be returned by min(entries)). A typical pattern for entries is a tuple in the form: (priority_number, data).
If the data elements are not comparable, the data can be wrapped in a class that ignores the data item and only compares the priority number.
Internally, those three types of queues use locks to temporarily block competing threads; however, they are not designed to handle reentrancy within a
thread.
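To make the wrapping pattern concrete, here is a small sketch (the PrioritizedItem class and its fields are our own illustration, not part of the queue module):

```python
from dataclasses import dataclass, field
from queue import PriorityQueue
from typing import Any


# Hypothetical wrapper: entries are ordered on priority only;
# the data payload is excluded from comparisons, so it does not
# need to be comparable itself.
@dataclass(order=True)
class PrioritizedItem:
    priority: int
    data: Any = field(compare=False)


pq: PriorityQueue = PriorityQueue()
pq.put(PrioritizedItem(3, {'task': 'low'}))
pq.put(PrioritizedItem(1, {'task': 'urgent'}))
pq.put(PrioritizedItem(2, {'task': 'normal'}))

# The lowest priority number comes out first.
print(pq.get().priority)  # 1
print(pq.get().priority)  # 2
print(pq.get().priority)  # 3
```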
Queue public methods
Queue.qsize(): Return the approximate size of the queue.
Queue.empty(): Return True if the queue is empty, False otherwise.
Queue.full(): Return True if the queue is full, False otherwise.
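A quick sketch of these three inspection methods on a small bounded queue (keep in mind that in a multithreaded program their answers are only approximate: another thread may modify the queue between the call and the use of its result):

```python
from queue import Queue

q: Queue = Queue(maxsize=2)
print(q.empty())  # True: freshly created queue
q.put('a')
q.put('b')
print(q.qsize())  # 2: both items are in the queue
print(q.full())   # True: maxsize=2 has been reached
```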
Queue.put(item, block=True, timeout=None): Put item into the queue.
– If block is true and timeout is None (the default): block if necessary until a free slot is available.
– If block is true and timeout > 0: block at most timeout seconds and raise the Full exception if no free slot was available within that time.
– If block is false: put an item on the queue if a free slot is immediately available, else raise the Full exception.
Queue.put_nowait(item): Equivalent to put(item, block=False).
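A minimal sketch of the three put behaviours on a queue of size 1 (the 0.1-second timeout is an arbitrary value chosen for the example):

```python
from queue import Full, Queue

q: Queue = Queue(maxsize=1)
q.put('first')  # a free slot is available: returns immediately

try:
    # The queue is full: blocks for at most 0.1 s, then raises Full.
    q.put('second', block=True, timeout=0.1)
except Full:
    print('Full raised after the timeout')

try:
    # Non-blocking variant: raises Full immediately.
    q.put_nowait('third')
except Full:
    print('Full raised immediately')
```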
Queue.get(block=True, timeout=None): Remove and return an item from the queue.
– If block is true and timeout is None (the default): block if necessary until an item is available.
– If block is true and timeout > 0: block at most timeout seconds and raise the Empty exception if no item was available within that time.
– If block is false: return an item if one is immediately available, else raise the Empty exception.
Queue.get_nowait(): Equivalent to get(False).
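Similarly, a minimal sketch of the get behaviours on a queue that runs empty (again, the 0.1-second timeout is arbitrary):

```python
from queue import Empty, Queue

q: Queue = Queue()
q.put('only-item')
print(q.get())  # 'only-item': an item is available, returns immediately

try:
    # Non-blocking variant on an empty queue: raises Empty immediately.
    q.get_nowait()
except Empty:
    print('Empty raised immediately')

try:
    # The queue is empty: blocks for at most 0.1 s, then raises Empty.
    q.get(block=True, timeout=0.1)
except Empty:
    print('Empty raised after the timeout')
```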
Example of a size-limited Queue with a consumer thread and a producer thread
```python
import logging
from queue import Queue
from random import randrange
from threading import Thread
from time import sleep

messages: Queue[str]
logger: logging.Logger = logging.getLogger('main')


def consume_message():
    while True:
        while not messages.empty():
            consumed_message: str = messages.get()
            logger.info(f'I am a consumer thread and I have consumed this consumed_message: '
                        f'{consumed_message}')
            sleep(1)
        sleep(0.1)


def produce_message():
    while True:
        while not messages.full():
            produced_message: str = 'foo-' + str(randrange(1000))
            messages.put(produced_message)
            logger.info(f'I am a producer thread and I have produced this message: {produced_message}')
            sleep(0.1)
        sleep(0.1)


format = '%(asctime)-15s - %(threadName)s - %(levelname)s : %(name)s.%(funcName)s %(message)s'
logging.basicConfig(level=logging.INFO, format=format, force=True)
messages = Queue(5)
Thread(target=consume_message, name='consumer').start()
Thread(target=produce_message, name='producer').start()
```
We can see that the producer and the consumer threads don't work at the same pace: the producer is able to produce a message every 0.1 second, while the consumer thread waits for 1 second after each consumed message.
Consequently, we can see in the output that the producer has to wait about 1 second for the consumer, because the queue is limited in size.
Output:
```
2023-04-19 16:54:55,123 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-859
2023-04-19 16:54:56,047 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-61
2023-04-19 16:54:56,111 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-168
2023-04-19 16:54:57,052 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-20
2023-04-19 16:54:57,099 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-822
2023-04-19 16:54:58,056 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-188
2023-04-19 16:54:58,088 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-135
2023-04-19 16:54:59,070 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-974
2023-04-19 16:54:59,179 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-81
```
Thread-Local Data
Overview
Thread-local data is data whose values are thread specific. To manage thread-local data, just create an instance of local (or a subclass) and store attributes on it:
```python
import threading

mydata = threading.local()
mydata.x = 1
```
The instance values will be different for separate threads.
We can consider thread-local data as a global variable at the thread level.
We can use it in two main cases:
– We did not subclass the Thread class, but we still have a state that is specific to each thread instance.
– We want to pass one or more pieces of information between functions without passing them explicitly as arguments, using the thread-local object to simplify the invocation of the functions.
Example
We need to define a transaction ID per thread, and we want to store it in the thread-local variable so that any function invoked by the thread can use it directly.
```python
import logging
import threading
from threading import Thread, local


def some_order_process():
    logger.info(f'__local.transaction_id={__local.transaction_id}')


def foo_process(transaction_id: int):
    __local.transaction_id = transaction_id
    logger.info(f'__local.transaction_id={__local.transaction_id}')
    some_order_process()


format = '%(asctime)-15s - %(threadName)s - %(levelname)s : %(name)s.%(funcName)s %(message)s'
logging.basicConfig(level=logging.INFO, format=format)
logger: logging.Logger = logging.getLogger('main')
__local: local = threading.local()
Thread(target=lambda: foo_process(123), name='foo_process_1').start()
Thread(target=lambda: foo_process(456), name='foo_process_2').start()
```
Output:
```
2023-04-19 18:50:36,917 - foo_process_1 - INFO : main.foo_process __local.transaction_id=123
2023-04-19 18:50:36,917 - foo_process_1 - INFO : main.some_order_process __local.transaction_id=123
2023-04-19 18:50:36,918 - foo_process_2 - INFO : main.foo_process __local.transaction_id=456
2023-04-19 18:50:36,918 - foo_process_2 - INFO : main.some_order_process __local.transaction_id=456
```
To lock or not to lock? We must be aware of the GIL
GIL in CPython
In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python
bytecodes at once.
The GIL is not ideal, since it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations.
Luckily, many potentially long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL.
Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.
Locks
A lock is in one of two states: locked or unlocked. It is created in the unlocked state.
It has two basic methods: acquire() and release().
When the state is unlocked, acquire() changes the state to locked and returns immediately.
When the state is locked, acquire() blocks until a call to release() in another thread changes it to unlocked; then the acquire() call resets it to locked and returns.
Locks also support the context management protocol.
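A small sketch showing both styles, explicit acquire()/release() and the context management protocol (the critical sections here are empty placeholders):

```python
from threading import Lock

lock = Lock()

# Explicit style: pair acquire() with release() in a try/finally
# so the lock is released even if the critical section raises.
lock.acquire()
try:
    pass  # critical section
finally:
    lock.release()

# Equivalent, using the context management protocol: the lock is
# acquired on entry and released on exit, even on exception.
with lock:
    pass  # critical section

print(lock.locked())  # False: the lock is released after both blocks
```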
Example of multi-threading code that looks thread-unsafe but is safe, thanks to the GIL
We start two threads that update a shared counter, and inside their logic they perform a processing step according to the value of the counter.
Inside this processing, we check the value of the counter a second time: one might expect it to have been updated by the other thread from time to time, but it never is. Indeed, the exception is never raised.
```python
import logging
from threading import Thread
from time import sleep

counter: int = 0
logger: logging.Logger = logging.getLogger('main')


def increment_counter_according_to_a_rule(should_sleep: bool):
    global counter
    for i in range(100):
        if counter % 2 == 0:
            if should_sleep:
                sleep(0.1)
            if counter % 2 != 0:
                raise RuntimeError()
        else:
            counter += 1
        sleep(0.05)


format = '%(asctime)-15s - %(threadName)s - %(levelname)s : %(name)s.%(funcName)s %(message)s'
logging.basicConfig(level=logging.INFO, format=format, force=True)
logger.info('Start the first thread')
Thread(target=lambda: increment_counter_according_to_a_rule(True),
       name='increment_counter_according_to_a_rule 1').start()
logger.info('Start the second thread')
Thread(target=lambda: increment_counter_according_to_a_rule(False),
       name='increment_counter_according_to_a_rule 2').start()
```
Output:
```
2023-04-20 14:34:55,795 - MainThread - INFO : main.<module> Start the first thread
2023-04-20 14:34:55,796 - MainThread - INFO : main.<module> Start the second thread

Process finished with exit code 0
```
Example of dictionary state corruption without a lock
We start two threads: the first iterates over a dictionary and the second updates it.
But updating a dictionary during iteration is not allowed in Python.
```python
from threading import Thread


def iterate_dic() -> None:
    for key, value in my_dic.items():
        print(f'iterate_dic() : key={key}, value={value}', flush=True)


def update_dic() -> None:
    print('update_dic()', flush=True)
    my_dic['danger'] = 1


my_dic = {'key-' + str(i): i for i in range(100)}
check_dic_thread = Thread(target=iterate_dic)
check_dic_thread.start()
add_to_dic_thread = Thread(target=update_dic)
add_to_dic_thread.start()
```
Output:
```
iterate_dic() : key=key-0, value=0
iterate_dic() : key=key-1, value=1
iterate_dic() : key=key-2, value=2
iterate_dic() : key=key-3, value=3
iterate_dic() : key=key-4, value=4
iterate_dic() : key=key-5, value=5
iterate_dic() : key=key-6, value=6
update_dic()
Exception in thread Thread-1 (iterate_dic):
Traceback (most recent call last):
  File "C:\Users\david\AppData\Local\Programs\Python\Python310\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Users\david\AppData\Local\Programs\Python\Python310\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\programmation\workspace-python\python_blog_examples\thread_safety\a.py", line 7, in iterate_dic
    for key, value in my_dic.items():
RuntimeError: dictionary changed size during iteration
```
Example of fixing the dictionary state corruption with a lock
```python
from threading import Lock
from threading import Thread


def iterate_dic() -> None:
    with dic_lock:
        for key, value in my_dic.items():
            print(f'iterate_dic() : key={key}, value={value}', flush=True)


def update_dic() -> None:
    print('update_dic()', flush=True)
    with dic_lock:
        my_dic['safe'] = 1


my_dic = {'key-' + str(i): i for i in range(100)}
dic_lock = Lock()
check_dic_thread = Thread(target=iterate_dic)
check_dic_thread.start()
add_to_dic_thread = Thread(target=update_dic)
add_to_dic_thread.start()
```