Обработка потока данных python

Python. Урок 22. Потоки и процессы в Python. Часть 1. Управление потоками

Follow us on Google Plus Follow us on rss

Этот урок открывает цикл статей, посвященных параллельному программированию в Python. В рамках данного урока будут рассмотрены вопросы терминологии, относящиеся к параллельному программированию, GIL, создание и управление потоками в Python.

Синхронность и асинхронность. Параллелизм и конкурентность

Для начала разберемся с терминологией, которую мы будем использовать в рамках данного цикла статей, посвященного параллельному программированию на Python .

Синхронное выполнение программы подразумевает последовательное выполнение операций. Асинхронное – предполагает возможность независимого выполнения задач.

Приведем пример из математики, представьте, что у нас есть функция:

Для того, чтобы определить, чему равно значение функции при x=4, нам необходимо вначале вычислить выражение (x+1) и только потом, полученное значение возвести в квадрат:

Это пример синхронного порядка вычисления: операции были выполнены последовательно и, в данном случае, по-другому быть не могло.

Теперь посмотрите на такую функцию:

Для вычисления значения функции в точке x=4 мы также можем придерживаться синхронного порядка: вначале выполнить операцию возведения в квадрат, потом вычислим произведение и просуммируем полученные результаты:

Если внимательно посмотреть на эту функцию, то можно заметить, что для того, чтобы вычислить x^2 не нужно знать значение произведения 2*x и наоборот. Операции вычисления квадратного корня и произведения можно выполнять независимо друг от друга.

… значения 4^2 и 2*4 вычисляются независимо разными вычислителями…

Более житейский пример будет выглядеть так: синхронность — это когда вы сначала сварили картошку, а потом помыли кастрюлю, и помыть ее раньше того, как в ней приготовили вы не можете. Асинхронность — это когда вы варите картошку и одновременно прибираетесь на кухне – эти задачи можно выполнять параллельно.

Теперь несколько слов о конкурентности и параллелизме . Конкурентность предполагает выполнение нескольких задач одним исполнителем. Из примера с готовкой: один человек варит картошку и прибирается, при этом, в процессе, он может переключаться: немного прибрался, пошел помешал-посмотрел на картошку, и делает он это до тех пор, пока все не будет готово.

Параллельность предполагает параллельное выполнение задач разными исполнителями: один человек занимается готовкой, другой приборкой. В примере с математикой операции 4^2 и 2*4 могут выполнять два разных процессора.

Несколько слов о GIL

Для того, чтобы двигаться дальше необходимо сказать несколько слов о GIL . GIL — это аббревиатура от Global Interpreter Lock – глобальная блокировка интерпретатора. Он является элементом эталонной реализации языка Python , которая носит название CPython . Суть GIL заключается в том, что выполнять байт код может только один поток. Это нужно для того, чтобы упростить работу с памятью (на уровне интерпретатора) и сделать комфортной разработку модулей на языке C . Это приводит к некоторым особенностям, о которых необходимо помнить. Условно, все задачи можно разделить на две большие группы: в первую входят те, что преимущественно используют процессор для своего выполнения, например, математические, их ещё называют CPU-bound , во вторую – задачи работающие с вводом выводом (диск, сеть и т.п.), такие задачи называют IO-bound . Если вы запустили в одном интерпретаторе несколько потоков, которые в основном используют процессор, то скорее всего получите общее замедление работы, а не прирост производительности. Пока выполняется одна задача, остальные простаивают (из-за GIL), переключение происходит через определенные промежутки времени. Таким образом, в каждый конкретный момент времени, будет выполняться только один поток, несмотря на то, что у вас может быть многоядерный процессор (или многопроцессорный сервер), плюс ко всему, будет тратиться время на переключение между задачами. Если код в потоках в основном выполняет операции ввода-вывода, то в этом случае ситуация будет в вашу пользу. В CPython все стандартные библиотечные функций, которые выполняют блокирующий ввод-вывод, освобождают GIL , это дает возможность поработать другим потокам, пока ожидается ответ от ОС.

Потоки в Python

Потоки позволяют запустить выполнение нескольких задач в конкурентном режиме в рамках одного процесса интерпретатора. При этом, нужно помнить о GIL . Все потоки будут выполняться на одном CPU , даже если задачи могут выполняться параллельно. Поэтому есть такое правило, если ваши задачи в основном потребляют ресурсы процессора, то используйте процессы, если ввод-вывод, то потоки и другие инструменты асинхронного программирования, которые в Python обладают довольно мощным функционалом.

Создание и ожидание завершения работы потоков. Класс Thread

За создание, управление и мониторинг потоков отвечает класс Thread из модуля threading . Поток можно создать на базе функции, либо реализовать свой класс – наследник Thread и переопределить в нем метод run() . Для начала рассмотрим вариант создания потока на базе функции:

from threading import Thread from time import sleep def func(): for i in range(5): print(f"from child thread: ") sleep(0.5) th = Thread(target=func) th.start() for i in range(5): print(f"from main thread: ") sleep(1)

В приведенном выше примере мы импортировали нужные модули. После этого объявили функцию func() , которая выводит пять раз сообщение с числовым маркером с задержкой в 500 мс. Далее создали объект класса Thread , в нем, через параметр target, указали, какую функцию запускать как поток и запустили его. В главном потоке добавили код вывода сообщений с интервалом в 1000 мс.

В результате запуска этого кода получим следующее:

from child thread: 0 from main thread: 0 from child thread: 1 from main thread: 1 from child thread: 2 from child thread: 3 from main thread: 2 from child thread: 4 from main thread: 3 from main thread: 4

Как вы можете видеть, код из главного и дочернего потоков выполняются псевдопараллельно (во всяком случае создается такое ощущение), т.к. задержка в дочернем потоке меньше, то сообщение из него появляются чаще.

Если необходимо дождаться завершения работы потока(ов) перед тем как начать выполнять какую-то другую работу, то воспользуйтесь методом join() :

th1 = Thread(target=func) th2 = Thread(target=func) th1.start() th2.start() th1.join() th2.join() print("--> stop")

У join() есть параметр timeout , через который задается время ожидания завершения работы потоков.

Для того, чтобы определить выполняет ли поток какую-то работу или завершился используется метод is_alive() .

th = Thread(target=func) print(f"thread status: ") th.start() print(f"thread status: ") sleep(5) print(f"thread status: ")

В результате получим следующее:

thread status: False from child thread: 0 thread status: True from child thread: 1 from child thread: 2 from child thread: 3 from child thread: 4 thread status: False

Для задания потоку имени воспользуйтесь свойством name .

Создание классов наследников от Thread

Ещё одни способ создавать и управлять потоками – это реализовать класс наследник от Thread и переопределить у него метод run() .

class CustomThread(Thread): def __init__(self, limit): Thread.__init__(self) self._limit = limit def run(self): for i in range(self._limit): print(f"from CustomThread: ") sleep(0.5) cth = CustomThread(3) cth.start()

В терминале получим следующее:

from CustomThread: 0 from CustomThread: 1 from CustomThread: 2

Принудительное завершение работы потока

В Python у объектов класса Thread нет методов для принудительного завершения работы потока. Один из вариантов решения этой задачи – это создать специальный флаг, через который потоку будет передаваться сигнал остановки. Доступ к такому флагу должен управляться объектом синхронизации.

from threading import Thread, Lock from time import sleep lock = Lock() stop_thread = False def infinit_worker(): print("Start infinit_worker()") while True: print("--> thread work") lock.acquire() if stop_thread is True: break lock.release() sleep(0.1) print("Stop infinit_worker()") # Create and start thread th = Thread(target=infinit_worker) th.start() sleep(2) # Stop thread lock.acquire() stop_thread = True lock.release()

Если мы запустим эту программу, то в консоли увидим следующее:

Start infinit_worker() --> thread work --> thread work --> thread work --> thread work --> thread work Stop infinit_worker()

Разберемся с этим кодом более подробно. В строке 4 мы создаем объект класса Lock , он используется для синхронизации доступа к ресурсам из нескольких потоков, про них мы более подробно расскажем в следующей статье. В нашем случае, ресурс — это переменная stop_thread , объявленная в строке 6, которая используется как сигнал для остановки потока. После этого, в строке 8, объявляется функция infinit_worker() , ее мы запустим как поток. В ней выполняется бесконечный цикл, каждый проход которого отмечается выводом в терминал сообщения “ –> thread work ” и проверкой состояния переменной stop_thread . В главном потоке программы создается и запускается дочерний поток (строки 24, 25), выполняется функция задержки и принудительно завершается поток путем установки переменной stop_thread значения True .

Потоки-демоны

Есть такая разновидность потоков, которые называются демоны (терминология взята из мира Unix -подобных систем). Python-приложение не будет закрыто до тех пор, пока в нем работает хотя бы один недемонический поток.

def func(): for i in range(5): print(f"from child thread: ") sleep(0.5) th = Thread(target=func) th.start() print("App stop")
from child thread: 0 App stop from child thread: 1 from child thread: 2 from child thread: 3 from child thread: 4

Как вы можете видеть, приложение продолжает работать, даже после того, как главный поток завершился (сообщение: “App stop”).

Для того, чтобы потоки не мешали остановке приложения (т.е. чтобы они останавливались вместе с завершением работы программы) необходимо при создании объекта Thread аргументу daemon присвоить значение True , либо после создания потока, перед его запуском присвоить свойству deamon значение True . Изменим процесс создания потока в приведенной выше программе:

th = Thread(target=func, daemon=True)

Запустим ее, получим следующий результат:

from child thread: 0 App stop

Поток остановился вместе с остановкой приложения.

P.S.

Вводные уроки по “Линейной алгебре на Python” вы можете найти соответствующей странице нашего сайта . Все уроки по этой теме собраны в книге “Линейная алгебра на Python”.

Если вам интересна тема анализа данных, то мы рекомендуем ознакомиться с библиотекой Pandas. Для начала вы можете познакомиться с вводными уроками. Все уроки по библиотеке Pandas собраны в книге “Pandas. Работа с данными”.

Python. Урок 22. Потоки и процессы в Python. Часть 1. Управление потоками : 2 комментария

  1. Константин 19.08.2020 Замечательные уроки, коротко и понятно излагаете важные вещи!
    Жду следующие статьи, продолжайте в том же духе!

Источник

Читайте также:  Java деление остаток от деления
Оцените статью