- Python Thread-safe Queue
- Introduction to the Python thread-safe queue
- Creating a new queue
- Adding an item to the queue
- Getting an item from the queue
- Getting the size of the queue
- Marking a task as completed
- Waiting for all tasks on the queue to be completed
- Python thread-safe queue example
- Summary
- Python Queue Module
- Python Queue
- Queue.get() and Queue.put() methods
- Empty a Python Queue
- Priority Queues in Python
- Python heapq Queue
- Conclusion
- References
Python Thread-safe Queue
Summary: in this tutorial, you’ll learn how to use a Python thread-safe queue to exchange data safely between multiple threads.
Introduction to the Python thread-safe queue
The built-in queue module allows you to exchange data safely between multiple threads. The Queue class in the queue module implements all required locking semantics.
Creating a new queue
To create a new queue, you import the Queue class from the queue module:
from queue import Queue
and use the Queue constructor as follows:
queue = Queue()
To create a queue with a size limit, you can use the maxsize parameter. For example, the following creates a queue that can store up to 10 items:
queue = Queue(maxsize=10)
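As a minimal sketch of how maxsize behaves (the variable names bounded and unbounded are illustrative, not part of the queue module), the following fills a two-slot queue and compares it with a queue created without a limit:

```python
from queue import Queue

# A bounded queue that holds at most two items
bounded = Queue(maxsize=2)
bounded.put('a')
bounded.put('b')
print(bounded.full())    # True: both slots are taken

# With the default maxsize of 0, the queue has no size limit
unbounded = Queue()
for n in range(100):
    unbounded.put(n)
print(unbounded.full())  # False
```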
Adding an item to the queue
To add an item to the queue, you use the put() method like this:
queue.put(item)
If the queue has a size limit and is full, the call to the put() method blocks until the queue has space available.
If you don’t want the put() method to block when the queue is full, you can set the block argument to False:
queue.put(item, block=False)
In this case, the put() method raises the Full exception from the queue module if the queue is full. Because the name queue refers to the Queue object here, import the exception class directly:
from queue import Full

try:
    queue.put(item, block=False)
except Full:
    # handle the exception
    pass
To add an item to a size-limited queue and block for at most a given number of seconds, you can use the timeout parameter like this:
try:
    queue.put(item, timeout=3)
except Full:  # assumes: from queue import Full
    # handle the exception
    pass
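The two non-waiting forms of put() can be tried out in one short script. This is only a sketch: the one-slot queue and the flag names rejected and timed_out are assumptions made for the illustration.

```python
from queue import Full, Queue

queue = Queue(maxsize=1)
queue.put('first')  # fills the only slot

# Non-blocking put: fails immediately because the queue is full
try:
    queue.put('second', block=False)
    rejected = False
except Full:
    rejected = True

# Put with a timeout: waits up to 0.1 seconds, then gives up
try:
    queue.put('third', timeout=0.1)
    timed_out = False
except Full:
    timed_out = True

print(rejected, timed_out)  # True True
```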
Getting an item from the queue
To get an item from the queue, you can use the get() method:
item = queue.get()
The get() method will block until an item is available for retrieval from the queue.
To get an item from the queue without blocking, you can set the block parameter to False:
try:
    item = queue.get(block=False)
except Empty:  # assumes: from queue import Empty
    # handle the exception
    pass
To get an item from the queue and block for at most a given number of seconds, you can use the get() method with a timeout:
try:
    item = queue.get(timeout=10)
except Empty:  # assumes: from queue import Empty
    # ...
    pass
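The following sketch runs through the same get() variants on a queue that holds a single item. The item value 'job-1' and the variable names first, second, and third are made up for the example.

```python
from queue import Empty, Queue

queue = Queue()
queue.put('job-1')

# An item is waiting, so this returns immediately
first = queue.get(block=False)

# Nothing is left, so a non-blocking get raises Empty
try:
    second = queue.get(block=False)
except Empty:
    second = None

# Waits up to 0.1 seconds for an item, then raises Empty
try:
    third = queue.get(timeout=0.1)
except Empty:
    third = None

print(first, second, third)  # job-1 None None
```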
Getting the size of the queue
The qsize() method returns the number of items in the queue:
size = queue.qsize()
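Also, the empty() method returns True if the queue is empty or False otherwise, and the full() method returns True if the queue is full or False otherwise. When other threads are using the queue, all three results are only a snapshot and may be out of date by the time you act on them.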
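As a single-threaded sketch of these three methods (the three-slot limit and the sizes list are assumptions chosen for the illustration), the following records the size after each put() and then removes one item:

```python
from queue import Queue

queue = Queue(maxsize=3)
sizes = [queue.qsize()]  # no items yet

for item in ('a', 'b', 'c'):
    queue.put(item)
    sizes.append(queue.qsize())

print(sizes)                        # [0, 1, 2, 3]
print(queue.empty(), queue.full())  # False True

queue.get()                         # frees one slot
print(queue.qsize(), queue.full())  # 2 False
```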
Marking a task as completed
An item that you add to the queue represents a unit of work or a task.
When a thread calls the get() method to get the item from the queue, it may need to process it before the task is considered completed.
Once completed, the thread may call the task_done() method of the queue to indicate that it has processed the task completely:
item = queue.get()

# process the item
# ...

# mark the item as completed
queue.task_done()
Waiting for all tasks on the queue to be completed
To wait for all tasks on the queue to be completed, you can call the join() method on the queue object:
queue.join()
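To see how task_done() and join() work together, here is a small sketch with one worker thread. The worker function, the processed list, and the doubling step that stands in for real work are all assumptions made for the example.

```python
from queue import Queue
from threading import Thread

queue = Queue()
processed = []


def worker():
    # Runs forever; as a daemon thread it ends when the main program ends
    while True:
        item = queue.get()
        processed.append(item * 2)  # stand-in for real work
        queue.task_done()           # exactly one task_done() per get()


Thread(target=worker, daemon=True).start()

for n in range(5):
    queue.put(n)

# Returns only after task_done() has been called for all five items
queue.join()
print(processed)  # [0, 2, 4, 6, 8]
```

If the worker skipped the task_done() call, queue.join() would never return, because the queue would still count the items as unfinished.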
Python thread-safe queue example
The following example illustrates how to use the thread-safe queue to exchange data between two threads:
import time
from queue import Empty, Queue
from threading import Thread


def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)


def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()


def main():
    queue = Queue()

    # create a producer thread and start it
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # create a consumer thread and start it
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # wait for all tasks to be added to the queue
    producer_thread.join()

    # wait for all tasks on the queue to be completed
    queue.join()


if __name__ == '__main__':
    main()
First, define the producer() function that adds numbers from 1 to 5 to the queue. It delays one second in each iteration:
def producer(queue):
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)
Second, define the consumer() function that gets an item from the queue and processes it. It delays two seconds after processing each item on the queue:
def consumer(queue):
    while True:
        try:
            item = queue.get()
        except Empty:
            continue
        else:
            print(f'Processing item {item}')
            time.sleep(2)
            queue.task_done()
The queue.task_done() call indicates that the consumer has finished processing the item it took from the queue.
Third, define the main() function that creates two threads. One thread adds a number to the queue every second, while the other processes an item on the queue every two seconds:
def main():
    queue = Queue()

    # create a producer thread and start it
    producer_thread = Thread(
        target=producer,
        args=(queue,)
    )
    producer_thread.start()

    # create a consumer thread and start it
    consumer_thread = Thread(
        target=consumer,
        args=(queue,),
        daemon=True
    )
    consumer_thread.start()

    # wait for all tasks to be added to the queue
    producer_thread.join()

    # wait for all tasks on the queue to be completed
    queue.join()
Output:
Inserting item 1 into the queue
Inserting item 2 into the queue
Processing item 1
Inserting item 3 into the queue
Processing item 2
Inserting item 4 into the queue
Inserting item 5 into the queue
Processing item 3
Processing item 4
Processing item 5
The following are steps in the main() function:
- Create a new queue by calling the Queue() constructor.
- Create a new thread called producer_thread and start it immediately.
- Create a daemon thread called consumer_thread and start it immediately.
- Wait for all the numbers to be added to the queue using the join() method of the thread.
- Wait for all the tasks on the queue to be completed by calling the join() method of the queue.
The producer adds a number to the queue every second, and the consumer processes a number from the queue every two seconds. Each function prints a message for every item it handles, so the output shows how the two threads interleave.
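The consumer in this example loops forever and only stops because it is a daemon thread. One common alternative, sketched below, is to have the producer put a sentinel value on the queue when it is done so that the consumer can return normally. The SENTINEL constant and the results list are assumptions for this sketch, which also assumes that None is never a real work item; the sleeps are left out to keep it short.

```python
from queue import Queue
from threading import Thread

SENTINEL = None  # assumption: None is never a real work item
results = []


def producer(queue):
    for i in range(1, 6):
        queue.put(i)
    queue.put(SENTINEL)  # signal that no more items will arrive


def consumer(queue):
    while True:
        item = queue.get()
        if item is SENTINEL:
            queue.task_done()
            break  # leave the loop so the thread can finish
        results.append(item)
        queue.task_done()


queue = Queue()
threads = [
    Thread(target=producer, args=(queue,)),
    Thread(target=consumer, args=(queue,)),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()  # both threads finish on their own

print(results)  # [1, 2, 3, 4, 5]
```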
Summary
Python Queue Module
In this article, we shall look at the Python Queue module, which is an interface for the Queue data structure.
Python Queue
A Queue is a data structure where the first element to be inserted is also the first element popped. It’s just like a real-life queue, where the first in line is also the first one out.
In Python, we can use the queue module to create a queue of objects.
This is a part of the standard Python library, so there’s no need to use pip.
To create a Queue object, we can instantiate it using:
q = queue.Queue()
By default, the maximum size is 0, which means the queue has no size limit. If you want to set a maximum capacity explicitly, you can do so using:
q = queue.Queue(max_capacity)
Queue.get() and Queue.put() methods
We can insert values into the Queue using the queue.put() method and retrieve them using the queue.get() method.
Let’s create a queue and insert numbers from 1 to 5.
import queue

# Instantiate the Queue object
q = queue.Queue()

# Insert elements 1 to 5 in the queue
for i in range(1, 6):
    q.put(i)

print('Now, q.qsize() =', q.qsize())

# Now, the queue looks like this:
# (First) 1
As you can see, the first element is indeed 1, so that’s the front of the Queue. The rest of the elements follow it in a similar fashion.
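To check the first-in, first-out order directly, a short sketch can take every element back out with get() and collect the values. The list name retrieved is an assumption for the example.

```python
import queue

q = queue.Queue()
for i in range(1, 6):
    q.put(i)

# range() reads the size once, so this calls get() exactly five times
retrieved = [q.get() for _ in range(q.qsize())]

print(retrieved)  # [1, 2, 3, 4, 5]
print(q.qsize())  # 0
```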
Empty a Python Queue
We can check whether a queue is empty using q.empty(). Note that this method only returns True or False; it does not remove anything. To actually empty a queue, call q.get() until no items are left.
import queue

# Instantiate the Queue object
q = queue.Queue()

# Insert elements 1 to 5 in the queue
for i in range(1, 6):
    q.put(i)

print('Now, q.qsize() =', q.qsize())

# Empty the queue by removing items until none are left
while not q.empty():
    q.get()

print('After emptying, size =', q.qsize())
Output:
Now, q.qsize() = 5
After emptying, size = 0
While most typical queue implementations call the removal operation pop or dequeue, the queue module exposes it as get().
So, if you want a queue with explicit enqueue and dequeue operations, you can write a simple queue class yourself. A simple solution would be to use Python’s list.
We’ll use list.append(value) to insert elements into the queue, since insertion happens at the end, and remove elements using list.pop(0), since the first element is removed.
class MyQueue:
    # Using Python Lists as a Queue
    def __init__(self):
        self.queue = []

    def enqueue(self, value):
        # Inserting to the end of the queue
        self.queue.append(value)

    def dequeue(self):
        # Remove the element at the front,
        # since the Queue is a FIFO structure
        return self.queue.pop(0)


my_q = MyQueue()
my_q.enqueue(2)
my_q.enqueue(5)
my_q.enqueue(7)

for i in my_q.queue:
    print(i)

print('Popped,', my_q.dequeue())

for i in my_q.queue:
    print(i)
We’ve written our own queue class with a dequeue operation! Now, we’ll show you how you could use other modules to work with other types of Queues.
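One caveat about the list-based class: list.pop(0) has to shift every remaining element, so its cost grows with the length of the queue. As a sketch of the same interface on top of collections.deque, which supports fast appends and pops at both ends (the class name DequeQueue is hypothetical):

```python
from collections import deque


class DequeQueue:
    # Same enqueue/dequeue interface, backed by a deque instead of a list
    def __init__(self):
        self._items = deque()

    def enqueue(self, value):
        # Insert at the end of the queue
        self._items.append(value)

    def dequeue(self):
        # Remove from the front without shifting the other elements
        return self._items.popleft()

    def __len__(self):
        return len(self._items)


dq = DequeQueue()
for value in (2, 5, 7):
    dq.enqueue(value)

first = dq.dequeue()
print(first, len(dq))  # 2 2
```

Neither this class nor the list-based one does any locking, so both are meant for single-threaded use.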
Priority Queues in Python
A Priority Queue is a type of queue that orders its items on the basis of each item’s priority, which is typically an integer value.
Items with a lower priority number are given a higher preference and are at the front of the queue, while others are behind.
The queue module also supports the Priority Queue structure, so let’s see how we can use it.
import queue

priority_q = queue.PriorityQueue()
priority_q.put((1, 'Hello'))
priority_q.put((3, 'AskPython'))
priority_q.put((2, 'from'))

for i in range(priority_q.qsize()):
    print(priority_q.get())
Output:
(1, 'Hello')
(2, 'from')
(3, 'AskPython')
As you can see, elements are retrieved in order of their priority, regardless of the order in which they were inserted.
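When two entries share a priority, Python compares the tuples by their second element, which raises a TypeError if the payloads cannot be compared (two dictionaries, for example). A common workaround, sketched here, is to add an increasing counter as a tie-breaker. The helper name put_task and the task dictionaries are assumptions made for the illustration.

```python
import itertools
import queue

priority_q = queue.PriorityQueue()
counter = itertools.count()  # increasing sequence number used as tie-breaker


def put_task(priority, payload):
    # Entries are (priority, sequence, payload); the sequence number is
    # unique, so the comparison never reaches the payload
    priority_q.put((priority, next(counter), payload))


put_task(2, {'name': 'write report'})
put_task(1, {'name': 'fix bug'})
put_task(2, {'name': 'review code'})

order = []
while not priority_q.empty():
    priority, _, payload = priority_q.get()
    order.append(payload['name'])

print(order)  # ['fix bug', 'write report', 'review code']
```

Entries with the same priority come out in the order they were added, because the earlier entry has the smaller sequence number.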
Python heapq Queue
We can also use the heapq module to implement our priority queues.
>>> import heapq
>>> q = []
>>> heapq.heappush(q, (1, 'hi'))
>>> q
[(1, 'hi')]
>>> heapq.heappush(q, (3, 'AskPython'))
>>> q
[(1, 'hi'), (3, 'AskPython')]
>>> heapq.heappush(q, (2, 'from'))
>>> q
[(1, 'hi'), (3, 'AskPython'), (2, 'from')]
>>> heapq.heappop(q)
(1, 'hi')
>>> heapq.heappop(q)
(2, 'from')
>>> heapq.heappop(q)
(3, 'AskPython')
>>> heapq.heappop(q)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
IndexError: index out of range
So we are creating a priority queue and popping from it until it is empty. The same can also be achieved using the program below:
import heapq

q = []
heapq.heappush(q, (2, 'from'))
heapq.heappush(q, (1, 'Hello'))
heapq.heappush(q, (3, 'AskPython'))

while q:
    # Keep popping until the queue is empty
    item = heapq.heappop(q)
    print(item)
Output:
(1, 'Hello')
(2, 'from')
(3, 'AskPython')
Conclusion
In this article, we learned about how we can implement and use different Queues in Python.