Что такое мультипроцессинг питон

Модуль multiprocessing в Python

В настоящее время параллельной обработке уделяется больше внимания.

Поскольку производители ЦП начинают добавлять в свои процессоры все больше и больше ядер, создание параллельного кода – отличный способ повысить производительность. В языке Питон представил модуль multiprocessing в Python, позволяющий писать параллельный код.

Чтобы понять основную мотивацию этого модуля, мы должны знать некоторые основы параллельного программирования. Мы надеемся, что после прочтения этой статьи вы сможете почерпнуть некоторые знания по этой теме.

Очередь и блокировки

В модуле multiprocessing в python есть множество классов для создания параллельной программы. Среди них три основных класса: Process, Queue и Lock. Эти классы помогут вам построить параллельную программу.

Чтобы сделать параллельную программу полезной, вы должны знать, сколько ядер у вас на компьютере. Модуль Multiprocessing позволяет вам это знать. Следующий простой код напечатает количество ядер на вашем компьютере.

import multiprocessing print("Number of cpu : ", multiprocessing.cpu_count())

Следующий вывод может отличаться для вашего компьютера. Для меня количество ядер 8.

Пример модуля multiprocessing в python

Что такое класс Process в Python?

Класс Process в Python – это абстракция, которая настраивает другой процесс Python, предоставляет его для выполнения кода.

К классу Process принадлежат две важные функции – start() и join().

Сначала нам нужно написать функцию, которая будет запускаться процессом. Затем нам нужно создать экземпляр объекта процесса.

Если мы создадим объект процесса, ничего не произойдет, пока мы не скажем ему начать обработку с помощью функции start(). Затем процесс запустится и вернет свой результат. После этого мы говорим процессу завершиться через функцию join().

Без вызова функции join() процесс останется бездействующим и не завершится.

Поэтому, если вы создадите много процессов и не завершите их, вы можете столкнуться с нехваткой ресурсов. Тогда вам может потребоваться удалить их вручную.

Одна важная вещь: если вы хотите передать какой-либо аргумент через процесс, вам нужно использовать аргумент ключевого слова args. Следующий код будет полезен для понимания использования класса Process.

from multiprocessing import Process def print_func(continent='Asia'): print('The name of continent is : ', continent) if __name__ == "__main__": # confirms that the code is under main function names = ['America', 'Europe', 'Africa'] procs = [] proc = Process(target=print_func) # instantiating without any argument procs.append(proc) proc.start() # instantiating process with arguments for name in names: # print(name) proc = Process(target=print_func, args=(name,)) procs.append(proc) proc.start() # complete the processes for proc in procs: proc.join()

Класс process

Класс Queue

Модули Multiprocessing предоставляют класс Queue, который в точности является структурой данных «первым пришел – первым обслужен». Они могут хранить любой объект pickle (хотя лучше всего подходят простые) и чрезвычайно полезны для обмена данными между процессами.

Очереди особенно полезны, когда передаются в качестве параметра целевой функции процесса, чтобы позволить ему потреблять данные. Используя функцию put(), мы можем вставлять данные в очередь, а с помощью get() мы можем получать элементы из очередей. Смотрите следующий код для быстрого примера.

from multiprocessing import Queue colors = ['red', 'green', 'blue', 'black'] cnt = 1 # instantiating a queue object queue = Queue() print('pushing items to queue:') for color in colors: print('item no: ', cnt, ' ', color) queue.put(color) cnt += 1 print('\npopping items from queue:') cnt = 0 while not queue.empty(): print('item no: ', cnt, ' ', queue.get()) cnt += 1

Класс queue

Класс Lock

Задача класса Lock довольно проста. Он позволяет коду требовать блокировку, чтобы никакой другой процесс не мог выполнить аналогичный код, пока блокировка не будет снята. Таким образом, задача класса Lock состоит в основном из двух процессов. Один – потребовать блокировку, а другой – снять блокировку. Чтобы потребовать блокировку, используется функция Acquire(), а для снятия блокировки – функция Release().

Пример

В этом примере мы объединим все наши знания воедино.

Предположим, нам нужно выполнить какие-то задачи. Для этого мы будем использовать несколько процессов. Один будет содержать задачи, а другой – журнал выполненных задач.

Затем мы создаем экземпляры процессов для выполнения задачи. Обратите внимание, что класс Lock уже синхронизирован. Это означает, что нам не нужно использовать класс Lock, чтобы блокировать несколько процессов для доступа к одному и тому же объекту очереди. Поэтому в этом случае нам не нужно использовать класс Lock.

Ниже приведена реализация, в которой мы добавляем задачи в очередь, затем создаем процессы и запускаем их, а затем используем join() для завершения процессов. Наконец, печатаем журнал из второй очереди.

from multiprocessing import Lock, Process, Queue, current_process import time import queue # imported for using queue.Empty exception def do_job(tasks_to_accomplish, tasks_that_are_done): while True: try: ''' try to get task from the queue. get_nowait() function will raise queue.Empty exception if the queue is empty. queue(False) function would do the same task also. ''' task = tasks_to_accomplish.get_nowait() except queue.Empty: break else: ''' if no exception has been raised, add the task completion message to task_that_are_done queue ''' print(task) tasks_that_are_done.put(task + ' is done by ' + current_process().name) time.sleep(.5) return True def main(): number_of_task = 10 number_of_processes = 4 tasks_to_accomplish = Queue() tasks_that_are_done = Queue() processes = [] for i in range(number_of_task): tasks_to_accomplish.put("Task no " + str(i)) # creating processes for w in range(number_of_processes): p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done)) processes.append(p) p.start() # completing process for p in processes: p.join() # print the output while not tasks_that_are_done.empty(): print(tasks_that_are_done.get()) return True if __name__ == '__main__': main()

В зависимости от количества задач, коду потребуется некоторое время, чтобы показать вам результат. Вывод следующего кода будет время от времени изменяться.

Пример multiprocessing

Многопроцессорный Pool

Pool многопроцессорной обработки в Python может использоваться для параллельного выполнения функции с несколькими входными значениями, распределяя входные данные по процессам (параллелизм данных). Ниже приведен простой пример многопроцессорного пула Python.

from multiprocessing import Pool import time work = (["A", 5], ["B", 2], ["C", 1], ["D", 3]) def work_log(work_data): print(" Process %s waiting %s seconds" % (work_data[0], work_data[1])) time.sleep(int(work_data[1])) print(" Process %s Finished." % work_data[0]) def pool_handler(): p = Pool(2) p.map(work_log, work) if __name__ == '__main__': pool_handler()

На изображении ниже показан результат работы вышеуказанной программы. Обратите внимание, что размер Pool равен 2, поэтому два выполнения функции work_log выполняются параллельно. Когда обработка одной из функций завершается, она выбирает следующий аргумент и так далее.

Источник

Многопроцессорность в Python

Многопроцессорность позволяет двум или более процессорам одновременно обрабатывать две или более различных частей программы. В Python для реализации мультипроцессинга используется модуль multiprocessing.

Пример мультипроцессорной программы

import time def task(n=100_000_000): while n: n -= 1 if __name__ == '__main__': start = time.perf_counter() task() task() finish = time.perf_counter() print(f'Выполнение заняло секунд.')
Выполнение заняло 12.94 секунд.

Как это работает

1. Определим функцию task() , которая имеет большой цикл while от 10 миллионов до 0. Функция task() привязана к процессору, потому что она выполняет вычисления.

def task(n=100_000_000): while n: n -= 1

2. Дважды вызовем функцию task() и засечем время обработки:

if __name__ == '__main__': start = time.perf_counter() task() task() finish = time.perf_counter() print(f'Выполнение заняло секунд.')

Используем модуль multiprocessing

В следующей программе используется модуль multiprocessing — и она выполняется быстрее:

import time import multiprocessing def task(n=100_000_000): while n: n -= 1 if __name__ == '__main__': start = time.perf_counter() p1 = multiprocessing.Process(target=task) p2 = multiprocessing.Process(target=task) p1.start() p2.start() p1.join() p2.join() finish = time.perf_counter() print(f'Выполнение заняло секунд.')
Выполнение заняло 6.45 секунд.

Как это работает

1. Импортируем модуль multiprocessing .

2. Создадим два процесса и передадим функцию task() каждому из них:

p1 = multiprocessing.Process(target=task) p2 = multiprocessing.Process(target=task)

Примечание. Конструктор Process() возвращает новый объект Process.

3. Вызовем метод start() объектов Process для запуска процесса:

4. Дожидаемся завершения процессов с помощью метода join() :

Практический пример программы с многопроцессорностью

Мы будем использовать модуль multiprocessing для изменения размера изображений высокого разрешения.

Для начала установим библиотеку Pillow для обработки изображений:

Теперь напишем программу, которая создает миниатюры изображений в папке images и сохраняет их в папке thumbs:

import time import os from PIL import Image, ImageFilter filenames = [ 'images/1.jpg', 'images/2.jpg', 'images/3.jpg', 'images/4.jpg', 'images/5.jpg', ] def create_thumbnail(filename, size=(50,50), thumb_dir ='thumbs'): img = Image.open(filename) img = img.filter(ImageFilter.GaussianBlur()) img.thumbnail(size) img.save(f'/') print(f'Файл обработан. ') if __name__ == '__main__': start = time.perf_counter() for filename in filenames: create_thumbnail(filename) finish = time.perf_counter() print(f'Выполнение заняло секунд.')
Файл images/1.jpg обработан. 
Файл images/2.jpg обработан.
Файл images/3.jpg обработан.
Файл images/4.jpg обработан.
Файл images/5.jpg обработан.
Выполнение заняло 1.28 секунд.

Теперь модифицируем программу для использования многопроцессорной обработки. Каждый процесс будет создавать миниатюру изображения:

import time import os import multiprocessing from PIL import Image, ImageFilter filenames = [ 'images/1.jpg', 'images/2.jpg', 'images/3.jpg', 'images/4.jpg', 'images/5.jpg', ] def create_thumbnail(filename, size=(50,50),thumb_dir ='thumbs'): img = Image.open(filename) img = img.filter(ImageFilter.GaussianBlur()) img.thumbnail(size) img.save(f'/') print(f'Файл обработан. ') if __name__ == '__main__': start = time.perf_counter() # создаем процесс processes = [multiprocessing.Process(target=create_thumbnail, args=[filename]) for filename in filenames] # запускаем процесс for process in processes: process.start() # дожидаемся выполнение for process in processes: process.join() finish = time.perf_counter() print(f'Выполнение заняло секунд.')
Файл images/5.jpg обработан. 
Файл images/4.jpg обработан.
Файл images/1.jpg обработан.
Файл images/3.jpg обработан.
Файл images/2.jpg обработан.
Выполнение заняло 0.82 секунд.

При использовании многопроцессорности программа обрабатывает изображения гораздо быстрее.

Что нужно запомнить

Используйте многопроцессорную обработку Python для параллельного запуска кода для решения задач, привязанных к процессору.

СodeСhick.io — простой и эффективный способ изучения программирования.

2023 © ООО «Алгоритмы и практика»

Источник

Читайте также:  Python context manager exceptions
Оцените статью