Python thread queue empty

How to make sure queue is empty before exiting main thread

I have a program that has two threads, the main thread and one additional that works on handling jobs from a FIFO queue. Something like this:

import queue import threading q = queue.Queue() def _worker(): while True: msg = q.get(block=True) print(msg) q.task_done() t = threading.Thread(target=_worker) #t.daemon = True t.start() q.put('asdf-1') q.put('asdf-2') q.put('asdf-4') q.put('asdf-4') 

What I want to accomplish is basically to make sure the queue is emptied before the main thread exits. If I set t.daemon to be True the program will exit before the queue is emptied, however if it’s set to False the program will never exit. Is there some way to make sure the thread running the _worker() method clears the queue on main thread exit?

I am not a concurrent programming expert, but maybe, you need to add t.join() at the end of program to make the main thread wait till t finishes its execution.

@ozgur: the thread will never terminate, infinite loop.. so using join won’t work — docs.python.org/2/library/threading.html#threading.Thread.join

@ozgur: easier code, I don’t need to catch queue.Empty and retry all over again. I think this is a use case where actually block=True is intended to be used.. please correct me if I’m worng

Читайте также:  Проверка большого числа php

Did you try adding q.join() at the end and uncommenting t.deamon = True ? Maybe you might want to look into atexit module to actually clear the queue at program exit.

Источник

Потокобезопасная очередь в Python

Встроенный модуль queue позволяет безопасно обмениваться данными между несколькими потоками. Класс Queue из модуля queue реализует всю необходимую семантику блокировки.

Создаем новую очередь

Чтобы создать новую очередь, нужно использовать конструктор Queue следующим образом:

from queue import Queue queue = Queue()

Чтобы создать очередь с ограничением по размеру, можно использовать параметр maxsize . Например, ниже создается очередь, которая может хранить до 10 элементов:

from queue import Queue queue = Queue(maxsize=10)

Добавляем элемент в очередь

Чтобы добавить элемент в очередь, нужно использовать метод put() следующим образом:

Как только очередь заполнится, вы не сможете добавить в нее элемент. Вызов метода put() будет блокироваться до тех пор, пока в очереди не освободится место.

Если вы не хотите, чтобы метод put() блокировался, если очередь переполнена, вы можете передать аргумент block=False :

В примере ниже метод put() вызовет исключение queue.Full , если очередь переполнена:

try: queue.put(item, block=False) except queue.Full as e: # обработка исключения

Чтобы добавить элемент в ограниченную по размеру очередь и заблокировать его по таймауту, вы можете использовать параметр таймаута следующим образом:

try: queue.put(item, timeout=3) except queue.Full as e: # обработка исключения

Получаем элемент из очереди

Чтобы получить элемент из очереди, вы можете использовать метод get() :

Метод get() будет блокироваться до тех пор, пока элемент не будет доступен для получения из очереди.

Чтобы получить элемент из очереди без блокировки, можно передать аргумент block=False :

try: queue.get(block=False) except queue.Empty: # обработка исключения

Чтобы получить элемент из очереди и блокировать его с ограничением по времени, можно использовать метод get() с таймаутом:

try: item = queue.get(timeout=10) except queue.Empty: # . 

Получаем размер очереди

Метод qsize() возвращает количество элементов в очереди:

Кроме того, метод empty() возвращает True , если очередь пуста, или False в противном случае. С другой стороны, метод full() возвращает True , если очередь заполнена, или False в противном случае.

Помечаем задачу как выполненную

Элемент, который вы добавляете в очередь, представляет собой единицу работы или задачу.

Когда поток вызывает метод get() для получения элемента из очереди, ему может потребоваться обработать его, прежде чем задача будет считаться выполненной.

После завершения поток может вызвать метод task_done() , чтобы указать, что он полностью обработал задачу:

item = queue.get() # обработка элемента # . # помечаем элемент как выполненный queue.task_done()

Ждем завершение всех задач в очереди

Чтобы дождаться завершения всех задач в очереди, можно вызвать метод join() на объекте очереди:

Пример потокобезопасной очереди

Следующий пример демонстрирует использование потокобезопасной очереди для обмена данными между двумя потоками:

import time from queue import Empty, Queue from threading import Thread def producer(queue): for i in range(1, 6): print(f'Вставляем элемент в очередь') time.sleep(1) queue.put(i) def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Обрабатываем элемент ') time.sleep(2) queue.task_done() def main(): queue = Queue() # создаем поток-производитель и запускаем его producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # создаем поток-потребитель и запускаем его consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # дожидаемся, пока все задачи добавятся в очередь producer_thread.join() # дожидаемся, пока все задачи в очереди будут завершены queue.join() if __name__ == '__main__': main()

Как это работает

1. Сначала мы создаем функцию producer() , которая добавляет в очередь числа от 1 до 11. На каждой итерации она задерживается на одну секунду:

def producer(queue): for i in range(1, 6): print(f'Вставляем элемент в очередь') time.sleep(1) queue.put(i)

2. Создаем функцию consumer() , которая получает элемент из очереди и обрабатывает его. Она задерживается на две секунды после обработки каждого элемента в очереди:

def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Обрабатываем элемент ') time.sleep(2) queue.task_done()

queue.task_done() указывает на то, что функция обработала элемент очереди.

3. Создаем функцию main() , которая создает два потока. Один поток добавляет номер в очередь каждую секунду, а другой обрабатывает элемент в очереди каждые две секунды:

def main(): queue = Queue() # создаем поток-производитель и запускаем его producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # создаем поток-потребитель и запускаем его consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # дожидаемся, пока все задачи добавятся в очередь producer_thread.join() # дожидаемся, пока все задачи в очереди будут завершены queue.join()
  • Создаем новую очередь, вызвав конструктор Queue() .
  • Создаем новый поток с именем producer_thread и немедленно запускаем его.
  • Создаем поток-демон consumer_thread и немедленно запускаем его.
  • Ждем добавления всех номеров в очередь с помощью метода join() .
  • Ждем завершения всех задач в очереди с помощью метода join() .

Поток-производитель добавляет число в очередь каждую секунду, а поток-потребитель обрабатывает число из очереди каждые две секунды. Он также отображает числа в очереди каждую секунду.

Вставляем 1 элемент в очередь
Вставляем 2 элемент в очередь
Обрабатываем элемент 1
Вставляем 3 элемент в очередь
Обрабатываем элемент 2
Вставляем 4 элемент в очередь
Вставляем 5 элемент в очередь
Обрабатываем элемент 3
Обрабатываем элемент 4
Обрабатываем элемент 5

Источник

Python: Queue.Empty Exception Handling

After a short debate with someone about exception handling in Python — sparked by the handling of a queue object — I thought I’d throw it out there.

METHOD 1:

import Queue q = Queue.Queue() try: task=q.get(False) #Opt 1: Handle task here and call q.task_done() except Queue.Empty: #Handle empty queue here pass #Opt2: Handle task here and call q.task_done() 

METHOD 2:

import Queue q = Queue.Queue() if q.empty(): #Handle empty queue here else: task = q.get() #Handle task here q.task_done() 

One argument is that Method 1 is wrong because the queue being empty is not an error, and therefore should not be handled using Queue.Empty exception. Additionally, it could make debugging more difficult when coded this way if you consider that the task handling part could potentially large. The other argument is that either way is acceptable in Python and that handling the task outside of the try/except could aid debugging if task handling is large, although agreed that this might look uglier than using Method 2. Opinions? UPDATE: A little more info after answer 1 came through. The debate was started after method 1 was using in some multithreaded code. In which case, the code will acquire the lock (from a threading.Lock object) and release it either once the task it returned or Queue.Empty is thrown UPDATE 2: It was unknown to both of us that the the queue object was thread safe. Looks like try/except is the way to go!

Источник

Checking for empty Queue in python’s multiprocessing

I have a program using python’s packages multiprocessing and Queue. One of my functions have this structure:

from multiprocessing import Process, Queue def foo(queue): while True: try: a = queue.get(block = False) doAndPrintStuff(a) except: print "the end" break if __name__ == "__main__" nthreads = 4 queue = Queue.Queue() # put stuff in the queue here for stuff in moreStuff: queue.put(stuff) procs = [Process(target = foo, args = (queue,)) for i in xrange(nthreads)] for p in procs: p.start() for p in procs: p.join() 

the idea is that when I try to extract from the queue and it is empty, it’ll raise an exception and terminate the loop. So I have two questions: 1) is this a safe idiom? Are there better ways to do this? 2) I tried to find what is the exact exception that is raised when I try to .get() from an empty queue. Currently my program is catching all exceptions, which sucks when the error is somewhere else and I only get a «the end» message. I tried:

 import Queue queue = Queue.Queue() [queue.put(x) for x in xrange(10)] try: print queue.get(block = False) except Queue.Empty: print "end" break 

4 Answers 4

The exception should be Queue.Empty . But are you sure you got the same error? In your second example, you also switched the queue itself from multiprocessing.Queue to Queue.Queue , which I think may be the problem.

It might seem strange, but you have to use the multiprocessing.Queue class, but use the Queue.Empty exception (which you have to import yourself from the Queue module)

It appears that the Queue is empty until the put buffers are flushed, which may take a while.

The solution to our problem is to use sentinels, or maybe the built-in task_done() call:

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Источник

Оцените статью