Python очередь и многопоточность

Потокобезопасная очередь в Python

Встроенный модуль queue позволяет безопасно обмениваться данными между несколькими потоками. Класс Queue из модуля queue реализует всю необходимую семантику блокировки.

Создаем новую очередь

Чтобы создать новую очередь, нужно использовать конструктор Queue следующим образом:

from queue import Queue queue = Queue()

Чтобы создать очередь с ограничением по размеру, можно использовать параметр maxsize . Например, ниже создается очередь, которая может хранить до 10 элементов:

from queue import Queue queue = Queue(maxsize=10)

Добавляем элемент в очередь

Чтобы добавить элемент в очередь, нужно использовать метод put() следующим образом:

Как только очередь заполнится, вы не сможете добавить в нее элемент. Вызов метода put() будет блокироваться до тех пор, пока в очереди не освободится место.

Если вы не хотите, чтобы метод put() блокировался, если очередь переполнена, вы можете передать аргумент block=False :

В примере ниже метод put() вызовет исключение queue.Full , если очередь переполнена:

try: queue.put(item, block=False) except queue.Full as e: # обработка исключения

Чтобы добавить элемент в ограниченную по размеру очередь и заблокировать его по таймауту, вы можете использовать параметр таймаута следующим образом:

try: queue.put(item, timeout=3) except queue.Full as e: # обработка исключения

Получаем элемент из очереди

Чтобы получить элемент из очереди, вы можете использовать метод get() :

Метод get() будет блокироваться до тех пор, пока элемент не будет доступен для получения из очереди.

Чтобы получить элемент из очереди без блокировки, можно передать аргумент block=False :

try: queue.get(block=False) except queue.Empty: # обработка исключения

Чтобы получить элемент из очереди и блокировать его с ограничением по времени, можно использовать метод get() с таймаутом:

try: item = queue.get(timeout=10) except queue.Empty: # . 

Получаем размер очереди

Метод qsize() возвращает количество элементов в очереди:

Кроме того, метод empty() возвращает True , если очередь пуста, или False в противном случае. С другой стороны, метод full() возвращает True , если очередь заполнена, или False в противном случае.

Помечаем задачу как выполненную

Элемент, который вы добавляете в очередь, представляет собой единицу работы или задачу.

Когда поток вызывает метод get() для получения элемента из очереди, ему может потребоваться обработать его, прежде чем задача будет считаться выполненной.

После завершения поток может вызвать метод task_done() , чтобы указать, что он полностью обработал задачу:

item = queue.get() # обработка элемента # . # помечаем элемент как выполненный queue.task_done()

Ждем завершение всех задач в очереди

Чтобы дождаться завершения всех задач в очереди, можно вызвать метод join() на объекте очереди:

Пример потокобезопасной очереди

Следующий пример демонстрирует использование потокобезопасной очереди для обмена данными между двумя потоками:

import time from queue import Empty, Queue from threading import Thread def producer(queue): for i in range(1, 6): print(f'Вставляем элемент в очередь') time.sleep(1) queue.put(i) def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Обрабатываем элемент ') time.sleep(2) queue.task_done() def main(): queue = Queue() # создаем поток-производитель и запускаем его producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # создаем поток-потребитель и запускаем его consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # дожидаемся, пока все задачи добавятся в очередь producer_thread.join() # дожидаемся, пока все задачи в очереди будут завершены queue.join() if __name__ == '__main__': main()

Как это работает

1. Сначала мы создаем функцию producer() , которая добавляет в очередь числа от 1 до 11. На каждой итерации она задерживается на одну секунду:

def producer(queue): for i in range(1, 6): print(f'Вставляем элемент в очередь') time.sleep(1) queue.put(i)

2. Создаем функцию consumer() , которая получает элемент из очереди и обрабатывает его. Она задерживается на две секунды после обработки каждого элемента в очереди:

def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Обрабатываем элемент ') time.sleep(2) queue.task_done()

queue.task_done() указывает на то, что функция обработала элемент очереди.

3. Создаем функцию main() , которая создает два потока. Один поток добавляет номер в очередь каждую секунду, а другой обрабатывает элемент в очереди каждые две секунды:

def main(): queue = Queue() # создаем поток-производитель и запускаем его producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # создаем поток-потребитель и запускаем его consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # дожидаемся, пока все задачи добавятся в очередь producer_thread.join() # дожидаемся, пока все задачи в очереди будут завершены queue.join()
  • Создаем новую очередь, вызвав конструктор Queue() .
  • Создаем новый поток с именем producer_thread и немедленно запускаем его.
  • Создаем поток-демон consumer_thread и немедленно запускаем его.
  • Ждем добавления всех номеров в очередь с помощью метода join() .
  • Ждем завершения всех задач в очереди с помощью метода join() .

Поток-производитель добавляет число в очередь каждую секунду, а поток-потребитель обрабатывает число из очереди каждые две секунды. Он также отображает числа в очереди каждую секунду.

Вставляем 1 элемент в очередь
Вставляем 2 элемент в очередь
Обрабатываем элемент 1
Вставляем 3 элемент в очередь
Обрабатываем элемент 2
Вставляем 4 элемент в очередь
Вставляем 5 элемент в очередь
Обрабатываем элемент 3
Обрабатываем элемент 4
Обрабатываем элемент 5

Источник

Многопоточность на примерах — модуль threading

Python включает в себя ряд разных параллельных конструкций, таких как как threading, queues и multiprocessing. Модуль threading использовался как главный способ достижения параллельности. Несколько лет назад, модуль multiprocessing был добавлен в пакет стандартных библиотек Python. В этой статье мы сфокусируемся на том, как использовать очереди и потоки (queues и threads).

Использование потоков

Мы начнем с простого примера, который демонстрирует работу потоков. Мы наследуем класс Thread в класс MyThread и укажем, чтобы его имя выводилось как stdout. Попробуем!

В этом коде мы импортировали модули random и time, также мы импортировали класс Thread из модуля threading Python. Далее, мы наследуем класс Thread, и переопределили его метод __init__ для принятия аргумента, под названием name. Для начала потока, вам нужно вызывать метод start().

После запуска потока, он автоматически вызовет метод run. Мы переопределили метод run таким образом, чтобы он выбирал случайный отсчет времени для «сна». Пример random.randint указывает Python выбрать случайное число от 3 до 15. После этого мы указываем потоку «спать» столько секунд, сколько было выбрано случайным способом, для симуляции его настоящей работы. Далее мы ввели имя потока, чтобы сказать пользователю, что он закончился. Функция create_threads создаст 5 потоков, дав каждому из них уникальное имя. Если вы запустите данный код, вы увидите что-то вроде этого:

Порядок выхода каждый раз будет разным. Попробуйте запустить код несколько раз, чтобы увидеть смену порядка. Теперь давайте напишем что-нибудь более практичное!

Написание потокового загрузчика

Предыдущий пример был не слишком полезным в качестве инструмента, показывающего, как именно работают Python потоки. Так что в данном примере, мы создадим класс Thread, который скачивает параллельно файлы из интернета. Мы воспользуемся бесплатным ресурсом в нашем демо. Посмотрим на код:

Это, в общем, полностью переписанный первый скрипт. Здесь мы импортировали наши модули os, urllib2, и threading python. Мы используем urllib2 для непосредственной загрузки в класс потока. Модуль os мы используем для извлечения имени файла, который мы загружаем, так что мы можем использовать его для создания файла с таким же названием на нашем компьютере. В классе DownloadThread мы настраиваем __init__ для принятия url и наименований для потока. В методе run, мы открываем url, извлекаем название файла, после чего используем это название для того, чтобы создать файл на диске.

Есть вопросы по Python?

На нашем форуме вы можете задать любой вопрос и получить ответ от всего нашего сообщества!

Telegram Чат & Канал

Вступите в наш дружный чат по Python и начните общение с единомышленниками! Станьте частью большого сообщества!

Одно из самых больших сообществ по Python в социальной сети ВК. Видео уроки и книги для вас!

После этого мы используем цикл для загрузки файла по килобайту за раз, и сохранять его на диск. После того, как сохранение файла завершится, мы выводим название потока и тот url, который загрузился. В Python 3 этот код немного отличаться. Нам нужно импортировать urllib вместо urllib2 и использовать urllib.request.urlopen вместо urllib2.urlopen. Вот код, в котором вы можете увидеть разницу:

Использование Queues

Очередь(Queues Python) может быть использована для стековых реализаций «пришел первым – ушел первым» (first-in-first-out (FIFO)) или же «пришел последним – ушел последним» (last-in-last-out (LILO)) , если вы используете их правильно.

В данном разделе, мы смешаем потоки и создадим простой скрипт файлового загрузчика, чтобы продемонстрировать, как работает Queues Python со случаями, которые мы хотим паралеллизировать. Чтобы помочь объяснить, как работает Queues, мы перепишем загрузочный скрипт из предыдущей секции для использования Queues. Приступим!

Давайте притормозим. В первую очередь, нам нужно взглянуть на определение главной функции для того, чтобы увидеть, как все протекает. Здесь мы видим, что она принимает список url адресов. Далее, функция main создаете экземпляр очереди, которая передана пяти демонизированным потокам. Основная разница между демонизированным и недемонизированным потоком в том, что вам нужно отслеживать недемонизированные потоки и закрывать их вручную, в то время как поток «демон» нужно только запустить и забыть о нем. Когда ваше приложение закроется, закроется и поток. Далее мы загрузили очередь (при помощи метода put) вместе с переданными url. Наконец, мы указываем очереди подождать, пока потоки выполнят свои процессы через метод join. В классе download у нас есть строчка self.queue.get(), которая выполняет функцию блока, пока очередь делает что-либо для возврата. Это значит, что потоки скромно будут дожидаться своей очереди. Также это значит, чтобы поток получал что-нибудь из очереди, он должен вызывать метод очереди под названием get. Таким образом, добавляя что-нибудь в очередь, пул потоков, поднимет или возьмет эти объекты и обработает их. Это также известно как dequeing. После того, как все объекты в очередь обработаны, скрипт заканчивается и закрывается. На моем компьютере были загружены первые 5 документов за секунду.

Подведем итоги

Теперь вы знаете, как использовать потоки и очереди, как в теории, так и на практике. Потоки особенно полезны, когда вы создаете пользовательский интерфейс и вам нужно поддерживать этот интерфейс юзабельным. Без потоков, пользовательский интерфейс может перестать реагировать и виснуть, пока вы загружаете большой файл, или создаете большой запрос к базе данных. Во избежание этого, вам нужно выполнять длительные процессы в потоках, и связать их с вашим интерфейсом.

Являюсь администратором нескольких порталов по обучению языков программирования Python, Golang и Kotlin. В составе небольшой команды единомышленников, мы занимаемся популяризацией языков программирования на русскоязычную аудиторию. Большая часть статей была адаптирована нами на русский язык и распространяется бесплатно.

E-mail: vasile.buldumac@ati.utm.md

Образование
Universitatea Tehnică a Moldovei (utm.md)

  • 2014 — 2018 Технический Университет Молдовы, ИТ-Инженер. Тема дипломной работы «Автоматизация покупки и продажи криптовалюты используя технический анализ»
  • 2018 — 2020 Технический Университет Молдовы, Магистр, Магистерская диссертация «Идентификация человека в киберпространстве по фотографии лица»

Источник

Читайте также:  Проверка подстроки в строке javascript
Оцените статью