Thread-safety in Python

queue module – A synchronized queue class

queue module overview

The queue module implements multi-producer, multi-consumer queues.
It is useful when information must be exchanged safely between multiple threads.
The Queue class in this module implements all the required locking semantics.

The module implements three types of queue, which differ only in the order in which entries are retrieved.
Two things are true for all three queues:
– maxsize is the maximum number of elements that the queue may contain (0 means no limit).
– Insertion blocks once this size has been reached, until queue items are consumed.

class queue.Queue(maxsize=0):
FIFO queue.

class queue.LifoQueue(maxsize=0):
LIFO queue.

class queue.PriorityQueue(maxsize=0):
Priority queue.
The lowest valued entries are retrieved first (the lowest valued entry is the one that would be returned by min(entries)). A typical pattern for entries is a tuple in the form: (priority_number, data).
If the data elements are not comparable, the data can be wrapped in a class that ignores the data item and only compares the priority number.
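Both patterns can be sketched as follows (PrioritizedItem is a hypothetical name for the wrapping class):

```python
from dataclasses import dataclass, field
from queue import PriorityQueue
from typing import Any

# Pattern 1: (priority_number, data) tuples; the lowest priority number wins.
tasks: PriorityQueue = PriorityQueue()
tasks.put((3, 'low'))
tasks.put((1, 'urgent'))
tasks.put((2, 'normal'))
print(tasks.get())  # (1, 'urgent')

# Pattern 2: when the data items are not comparable (tuple comparison would
# fall back on them for equal priorities), wrap them in a class that only
# compares the priority number.
@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any = field(compare=False)  # excluded from comparisons

wrapped: PriorityQueue = PriorityQueue()
wrapped.put(PrioritizedItem(2, object()))  # object() instances are not comparable
wrapped.put(PrioritizedItem(1, object()))
print(wrapped.get().priority)  # 1
```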

Internally, those three types of queues use locks to temporarily block competing threads; however, they are not designed to handle reentrancy within a thread.
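To illustrate what non-reentrancy means in practice: a plain threading.Lock cannot be acquired a second time by the thread already holding it, whereas a threading.RLock can. A minimal sketch (using non-blocking acquires to avoid deadlocking the demo):

```python
from threading import Lock, RLock

lock = Lock()
lock.acquire()
# A second blocking acquire by the same thread would deadlock; a
# non-blocking attempt simply reports that the lock is unavailable.
print(lock.acquire(blocking=False))  # False
lock.release()

rlock = RLock()
rlock.acquire()
print(rlock.acquire(blocking=False))  # True: reentrant within the owning thread
rlock.release()
rlock.release()  # one release per acquire
```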

Queue public methods

Queue.qsize():
Return the approximate size of the queue.

Queue.empty():
Return True if the queue is empty, False otherwise.

Queue.full():
Return True if the queue is full, False otherwise.
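Note that qsize(), empty() and full() only describe a snapshot: another thread may put or get items between the check and the following action. A race-free alternative is to attempt the operation and handle the exception; a minimal sketch:

```python
from queue import Empty, Queue

q: Queue = Queue(maxsize=2)
q.put('a')

# Snapshot-style check (can be stale in a multithreaded program):
if not q.empty():
    print(q.get())  # 'a'

# Race-free style: just try, and handle the Empty exception.
try:
    item = q.get_nowait()
except Empty:
    item = None
print(item)  # None here, since the queue was drained above
```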

Queue.put(item, block=True, timeout=None):
Put item into the queue.
– If block is true and timeout is None (the default): block if necessary until a free slot is available.
– If block is true and timeout is a positive number: block at most timeout seconds and raise the Full exception if no free slot was available within that time.
– If block is false: put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item):
Equivalent to put(item, block=False).

Queue.get(block=True, timeout=None):
Remove and return an item from the queue.
– If block is true and timeout is None (the default): block if necessary until an item is available.
– If block is true and timeout is a positive number: block at most timeout seconds and raise the Empty exception if no item was available within that time.
– If block is false: return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait():
Equivalent to get(False).
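The blocking rules above can be observed on a queue of size 1; a minimal sketch:

```python
from queue import Empty, Full, Queue

q: Queue = Queue(maxsize=1)
q.put('first')  # a free slot is available: returns immediately

try:
    q.put('second', timeout=0.2)  # blocks about 0.2 s, then gives up
except Full:
    print('queue stayed full for 0.2 s -> Full raised')

try:
    q.put_nowait('third')  # equivalent to put('third', block=False)
except Full:
    print('no free slot immediately available -> Full raised')

print(q.get())  # 'first'

try:
    q.get_nowait()  # equivalent to get(False)
except Empty:
    print('queue is empty -> Empty raised')
```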

Example of a Queue with a limited size, a consumer thread and a producer thread

import logging
from queue import Queue
from random import randrange
from threading import Thread
from time import sleep
 
messages: Queue[str]
logger: logging.Logger = logging.getLogger('main')
 
 
def consume_message():
    while True:
        while not messages.empty():
            consumed_message: str = messages.get()
            logger.info(f'I am a consumer thread and I have consumed this consumed_message: '
                        f'{consumed_message}')
            sleep(1)
        sleep(0.1)
 
 
def produce_message():
    while True:
        while not messages.full():
            produced_message: str = 'foo-' + str(randrange(1000))
            messages.put(produced_message)
            logger.info(f'I am a producer thread and I have produced this message: {produced_message}')
            sleep(0.1)
        sleep(0.1)
 
 
format = '%(asctime)-15s - %(threadName)s - %(levelname)s : %(name)s.%(funcName)s %(message)s'
logging.basicConfig(level=logging.INFO,
                    format=format,
                    force=True)
messages = Queue(5)
Thread(target=consume_message, name='consumer').start()
Thread(target=produce_message, name='producer').start()

We can see that the producer and the consumer thread don’t work at the same pace: the producer is able to produce a message every 0.1 second, while the consumer thread has to wait for 1 second after each consumed message.
Consequently, we can see in the output that the producer has to wait about 1 second for the consumer, because the queue is limited in size.
Output:

2023-04-19 16:54:55,123 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-859
2023-04-19 16:54:56,047 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-61
2023-04-19 16:54:56,111 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-168
2023-04-19 16:54:57,052 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-20
2023-04-19 16:54:57,099 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-822
2023-04-19 16:54:58,056 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-188
2023-04-19 16:54:58,088 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-135
2023-04-19 16:54:59,070 - consumer - INFO : main.consume_message I am a consumer thread and I have consumed this consumed_message: foo-974
2023-04-19 16:54:59,179 - producer - INFO : main.produce_message I am a producer thread and I have produced this message: foo-81

Thread-Local Data

Overview

Thread-local data is data whose values are thread specific. To manage thread-local data, just create an instance of local (or a subclass) and store attributes on it:

mydata = threading.local()
mydata.x = 1

The instance values will be different for separate threads.
We can think of thread-local data as a global variable scoped to a single thread.
It is mainly useful in two cases:
– We did not subclass Thread, yet we have state that is specific to each thread instance.
– We want to pass one or more pieces of information between functions without passing them explicitly as arguments: storing them in the thread-local object simplifies the function invocations.

Example

We need to define a transaction ID per thread, and we want to store it in a thread-local variable so that any function invoked by the thread can use it directly.

import logging
import threading
from threading import Thread, local
 
 
def some_order_process():
    logger.info(f'__local.transaction_id={__local.transaction_id}')
 
 
def foo_process(transaction_id: int):
    __local.transaction_id = transaction_id
    logger.info(f'__local.transaction_id={__local.transaction_id}')
    some_order_process()
 
 
format = '%(asctime)-15s - %(threadName)s - %(levelname)s : %(name)s.%(funcName)s %(message)s'
logging.basicConfig(level=logging.INFO,
                    format=format)
logger: logging.Logger = logging.getLogger('main')
__local: local = threading.local()
Thread(target=foo_process, args=(123,), name='foo_process_1').start()
Thread(target=foo_process, args=(456,), name='foo_process_2').start()

Output:

2023-04-19 18:50:36,917 - foo_process_1 - INFO : main.foo_process __local.transaction_id=123
2023-04-19 18:50:36,917 - foo_process_1 - INFO : main.some_order_process __local.transaction_id=123
2023-04-19 18:50:36,918 - foo_process_2 - INFO : main.foo_process __local.transaction_id=456
2023-04-19 18:50:36,918 - foo_process_2 - INFO : main.some_order_process __local.transaction_id=456

To lock or not to lock? We must be aware of the GIL

GIL in CPython

In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.
The GIL is not ideal, since it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations.
Luckily, many potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL.
Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.

Locks

A lock is in one of two states: locked or unlocked.
It is created in the unlocked state.
It has two basic methods: acquire() and release().
When the state is unlocked, acquire() changes the state to locked and returns immediately.
When the state is locked, acquire() blocks until a call to release() in another thread changes it to unlocked, then the acquire() call resets it to locked and returns.

Locks also support the context management protocol.
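A minimal sketch of both styles, explicit acquire()/release() and the with statement, protecting a shared counter (the balance variable is illustrative):

```python
from threading import Lock, Thread

balance: int = 0
balance_lock = Lock()


def deposit_explicit(amount: int) -> None:
    global balance
    balance_lock.acquire()  # blocks until the lock is free
    try:
        balance += amount  # read-modify-write protected by the lock
    finally:
        balance_lock.release()  # always release, even if an exception occurred


def deposit_with(amount: int) -> None:
    global balance
    with balance_lock:  # acquire on entry, release on exit
        balance += amount


threads = [Thread(target=deposit_explicit, args=(1,)) for _ in range(50)]
threads += [Thread(target=deposit_with, args=(1,)) for _ in range(50)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(balance)  # 100
```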

Example of multi-threaded code that looks thread-unsafe but is safe thanks to the GIL

We start two threads that update a shared counter; inside their loop they do some processing according to the value of the counter.
During this processing, we check the value of the counter: one might expect it to have been updated in the meantime by the other thread, but it never is, and indeed the exception is never raised.

import logging
from threading import Thread
from time import sleep
 
counter: int = 0
logger: logging.Logger = logging.getLogger('main')
 
 
def increment_counter_according_to_a_rule(should_sleep: bool):
    global counter
    for i in range(100):
        if counter % 2 == 0:
            if should_sleep:
                sleep(0.1)
                if counter % 2 != 0:
                    raise RuntimeError()
        else:
            counter += 1
        sleep(0.05)
 
 
format = '%(asctime)-15s - %(threadName)s - %(levelname)s : %(name)s.%(funcName)s %(message)s'
logging.basicConfig(level=logging.INFO,
                    format=format,
                    force=True)
logger.info('Start the first thread')
Thread(target=lambda: increment_counter_according_to_a_rule(True),
       name='increment_counter_according_to_a_rule 1').start()
 
logger.info('Start the second thread')
Thread(target=lambda: increment_counter_according_to_a_rule(False),
       name='increment_counter_according_to_a_rule 2').start()

Output:

2023-04-20 14:34:55,795 - MainThread - INFO : main.<module> Start the first thread
2023-04-20 14:34:55,796 - MainThread - INFO : main.<module> Start the second thread
 
Process finished with exit code 0

Example of dictionary state corruption without lock

We start two threads: the first iterates over a dictionary while the second updates it.
But mutating a dictionary while iterating over it is not allowed in Python.

from threading import Thread
 
 
def iterate_dic() -> None:
    for key, value in my_dic.items():
        print(f'iterate_dic() : key={key}, value={value}', flush=True)
 
 
def update_dic() -> None:
    print('update_dic()', flush=True)
    my_dic['danger'] = 1
 
 
my_dic = {'key-' + str(i): i for i in range(100)}
check_dic_thread = Thread(target=iterate_dic)
check_dic_thread.start()
 
add_to_dic_thread = Thread(target=update_dic)
add_to_dic_thread.start()

Output:

iterate_dic() : key=key-0, value=0
iterate_dic() : key=key-1, value=1
iterate_dic() : key=key-2, value=2
iterate_dic() : key=key-3, value=3
iterate_dic() : key=key-4, value=4
iterate_dic() : key=key-5, value=5
iterate_dic() : key=key-6, value=6
update_dic()
Exception in thread Thread-1 (iterate_dic):
Traceback (most recent call last):
  File "C:\Users\david\AppData\Local\Programs\Python\Python310\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Users\david\AppData\Local\Programs\Python\Python310\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\programmation\workspace-python\python_blog_examples\thread_safety\a.py", line 7, in iterate_dic
    for key, value in my_dic.items():
RuntimeError: dictionary changed size during iteration

Example of fixing the dictionary state corruption with a lock

from threading import Lock, Thread
 
 
def iterate_dic() -> None:
    with dic_lock:
        for key, value in my_dic.items():
            print(f'iterate_dic() : key={key}, value={value}', flush=True)
 
 
def update_dic() -> None:
    print('update_dic()', flush=True)
    with dic_lock:
        my_dic['safe'] = 1
 
 
my_dic = {'key-' + str(i): i for i in range(100)}
dic_lock = Lock()
check_dic_thread = Thread(target=iterate_dic)
check_dic_thread.start()
 
add_to_dic_thread = Thread(target=update_dic)
add_to_dic_thread.start()
