Python multiprocessing остановить процесс

Python Multiprocessing изящное завершение работы в правильном порядке

Сначала останавливается процесс worker(), затем данные процесса result_queue() промываются и останавливаются.

post main image

Для нового проекта мне понадобился процесс deamon, который должен выполнять множество более или менее одинаковых операций на различных ресурсах. В данном случае операция связана с IO, и я решил эту проблему с помощью ThreadPoolExecutor. Пока все хорошо.

Далее я хотел хранить результаты в файлах. Конечно, мы используем очередь для связи между процессами. Процесс worker() использует q.put() для добавления элементов в очередь, а процесс result_queue() использует q.get() для получения элементов из очереди.

Мой процесс result_queue() также буферизирует полученные результаты. Как только достигается порог, все результаты сразу записываются в файл.

И вот в чем проблема. Когда вы нажимаете CTRL-C или посылаете сигнал SIGTERM , то процессы резко завершаются. Это означает:

  • Ресурсы, к которым обращался worker() , могут быть оставлены в плохом состоянии.
  • result_queue() может содержать много результатов, которые не будут сохранены.

Нехорошо! Ниже я покажу, как я решил эту проблему. Как всегда, я использую Ubuntu (20.04).

Использование событий для остановки дочерних процессов

Страницы в интернете о Python и Multiprocessing часто сбивают с толку, в основном потому, что многие ссылаются на Python2. Хорошо то, что вы вынуждены читать официальную документацию .

Существует два способа, с помощью которых процессы могут взаимодействовать (надежно) друг с другом:

В данном случае, чтобы остановить процессы, мы используем события. Мы называем наши события stop-events, потому что они требуют остановки дочернего процесса. Это работает следующим образом:

  1. Создайте событие (объект)
  2. Передайте событие в качестве параметра при запуске дочернего процесса.
  3. В дочернем процессе: проверьте событие и остановитесь, если событие ‘is_set()’
  4. В главном процессе: ‘set()’ событие, если дочерний процесс должен быть остановлен.

Замените обработчики для SIGINT / SIGTERM

Вот трюк, который заставляет все работать:

Когда вы порождаете дочерний процесс, этот процесс также наследует обработчики сигналов от родительского.

Мы не хотим, чтобы наши дочерние процессы реагировали на CTRL-C (сигнал SIGINT ) или SIGTERM . Вместо этого мы хотим, чтобы главный процесс решал, как будут остановлены дочерние процессы. Это означает, что мы должны:

  1. Заменить обработчики SIGINT и SIGTERM для дочерних процессов на фиктивные обработчики. Эти обработчики ничего не делают, что означает, что дочерние процессы не завершаются по этим сигналам.
  2. Замените обработчики SIGINT и SIGTERM главного процесса на нашу собственную версию. Эти обработчики сообщают главному процессу, что он должен выключиться. Это делается с помощью события остановки главного процесса (объект)

При такой схеме главный процесс проверяет событие main stop. Если ‘is_set()’, он использует ‘set()’, чтобы сообщить дочернему процессу X о необходимости остановиться. Поскольку мы можем передавать события остановки всем дочерним процессам, главный процесс имеет полный контроль. Он может дождаться остановки дочернего процесса, прежде чем остановить другой дочерний процесс.

Код

Когда поток worker() завершает работу, он отправляет результат в result_queue(). Для изящной остановки мы должны сначала попросить процесс worker() остановиться, а как только он остановился, мы должны попросить остановиться процесс result_queue() .

Думаю, код несложно прочитать. После запуска дочерних процессов главный процесс в цикле проверяет, не запрошена ли остановка.

Единственное, что я не сказал, это то, что я добавил код для принудительного выключения через определенное количество секунд в случае возникновения какой-либо проблемы.

# procsig.py import logging import multiprocessing import os import signal import sys import time def init_logger(): logger_format = "%(asctime)s %(levelname)-8.8s [%(funcName)24s():%(lineno)-3s] %(message)s" formatter = logging.Formatter(logger_format) logger = logging.getLogger() logger.setLevel(logging.DEBUG) handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) return logger logger = init_logger() # main sigint/sigterm handlers main_stop_event = multiprocessing.Event() def main_sigint_handler(signum, frame): logger.debug('') main_stop_event.set() def main_sigterm_handler(signum, frame): logger.debug('') main_stop_event.set() # children sigint/sigterm handlers, let main process handle this def children_sigint_handler(signum, frame): logger.debug('') def children_sigterm_handler(signum, frame): logger.debug('') def worker(rq, stop_event): i = 0 while True: if stop_event.is_set(): break logger.debug('worker: <>'.format(i)) time.sleep(1) i += 1 logger.debug('worker STOPPING . ') time.sleep(1) logger.debug('worker STOPPED') def result_queue(rq, stop_event): i = 0 while True: if stop_event.is_set(): break logger.debug('result_queue: <>'.format(i)) time.sleep(1) i += 1 logger.debug('result_queue: STOPPING') time.sleep(1) logger.debug('result_queue: STOPPED') def main(): # events global main_stop_event worker_stop_event = multiprocessing.Event() result_queue_stop_event = multiprocessing.Event() # setup result queue rq = multiprocessing.Queue() # children: capture sigint/sigterm signal.signal(signal.SIGINT, children_sigint_handler) signal.signal(signal.SIGTERM, children_sigterm_handler) # start worker worker_proc = multiprocessing.Process( name='worker', target=worker, kwargs=, ) worker_proc.daemon = True worker_proc.start() # start result_queue result_queue_proc = multiprocessing.Process( name='result_queue', target=result_queue, kwargs=, ) result_queue_proc.daemon = True result_queue_proc.start() # main: capture sigint/sigterm signal.signal(signal.SIGINT, main_sigint_handler) signal.signal(signal.SIGTERM, main_sigterm_handler) child_procs = [worker_proc, result_queue_proc] worker_stop_sent = False result_queue_stop_sent = False max_allowed_shutdown_seconds = 10 shutdown_et = None keep_running = True while keep_running: if shutdown_et is not None and shutdown_et < int(time.time()): logger.debug('Forced shutdown . ') break run_count = 0 for p in child_procs: logger.debug('[P> - <> - <> - <>'.format(p.name, p.pid, p.is_alive(), p.exitcode)) if p.is_alive(): run_count += 1 # send 'stop' to result_queue()? if p.pid == worker_proc.pid and not p.is_alive() and not result_queue_stop_sent: logger.debug('Sending stop to result_queue . ') result_queue_stop_event.set() result_queue_stop_sent = True # send 'stop' to worker()? if main_stop_event.is_set() and not worker_stop_sent: logger.debug('Sending stop to worker . ') worker_stop_event.set() worker_stop_sent = True shutdown_et = int(time.time()) + max_allowed_shutdown_seconds time.sleep(1) if run_count == 0: keep_running = False logger.debug('terminating children . ') try: worker_proc.terminate() result_queue_proc.terminate() worker_proc.kill() result_queue_proc.terminate() except Exception as e: # who cares logger.debug('Exception <>, e = <>'.format(type(e).__name__, e)) logger.debug('Done') if __name__=='__main__': main()

Чтобы проверить это, нажмите CTRL-C, или откройте другое окно терминала и завершите этот процесс:

Результат журнала

Вы можете попробовать это сами, ниже я покажу, что будет выведено в консоль. В ’19:28:37′ я нажал CTRL-C. Сразу же вызываются обработчики сигналов. Процесс main() посылает сообщение об остановке процессу worker() , используя ‘set()’. Как только процесс worker() остановился, проверив ‘is_alive()’, процессу result_queue() посылается сообщение об остановке.

2021-06-16 19:28:34,238 DEBUG [ worker():182] worker: 6 2021-06-16 19:28:35,239 DEBUG [ worker():182] worker: 7 2021-06-16 19:28:35,239 DEBUG [ result_queue():197] result_queue: 7 2021-06-16 19:28:35,239 DEBUG [ main():259] [P> worker - 465189 - True - None 2021-06-16 19:28:35,239 DEBUG [ main():259] [P> result_queue - 465190 - True - None 2021-06-16 19:28:36,239 DEBUG [ worker():182] worker: 8 2021-06-16 19:28:36,240 DEBUG [ result_queue():197] result_queue: 8 2021-06-16 19:28:36,240 DEBUG [ main():259] [P> worker - 465189 - True - None 2021-06-16 19:28:36,241 DEBUG [ main():259] [P> result_queue - 465190 - True - None 2021-06-16 19:28:37,239 DEBUG [ worker():182] worker: 9 2021-06-16 19:28:37,241 DEBUG [ result_queue():197] result_queue: 9 2021-06-16 19:28:37,242 DEBUG [ main():259] [P> worker - 465189 - True - None 2021-06-16 19:28:37,242 DEBUG [ main():259] [P> result_queue - 465190 - True - None ^C2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170] 2021-06-16 19:28:37,598 DEBUG [ children_sigint_handler():170] 2021-06-16 19:28:37,598 DEBUG [ main_sigint_handler():160] 2021-06-16 19:28:38,240 DEBUG [ worker():182] worker: 10 2021-06-16 19:28:38,242 DEBUG [ result_queue():197] result_queue: 10 2021-06-16 19:28:38,243 DEBUG [ main():259] [P> worker - 465189 - True - None 2021-06-16 19:28:38,243 DEBUG [ main():259] [P> result_queue - 465190 - True - None 2021-06-16 19:28:38,243 DEBUG [ main():271] Sending stop to worker . 2021-06-16 19:28:39,241 DEBUG [ worker():186] worker STOPPING . 2021-06-16 19:28:39,243 DEBUG [ result_queue():197] result_queue: 11 2021-06-16 19:28:39,244 DEBUG [ main():259] [P> worker - 465189 - True - None 2021-06-16 19:28:39,244 DEBUG [ main():259] [P> result_queue - 465190 - True - None 2021-06-16 19:28:40,242 DEBUG [ worker():188] worker STOPPED 2021-06-16 19:28:40,244 DEBUG [ result_queue():197] result_queue: 12 2021-06-16 19:28:40,245 DEBUG [ main():259] [P> worker - 465189 - False - 0 2021-06-16 19:28:40,245 DEBUG [ main():265] Sending stop to result_queue . 2021-06-16 19:28:40,246 DEBUG [ main():259] [P> result_queue - 465190 - True - None 2021-06-16 19:28:41,246 DEBUG [ result_queue():201] result_queue: STOPPING 2021-06-16 19:28:41,247 DEBUG [ main():259] [P> worker - 465189 - False - 0 2021-06-16 19:28:41,247 DEBUG [ main():259] [P> result_queue - 465190 - True - None 2021-06-16 19:28:42,247 DEBUG [ result_queue():203] result_queue: STOPPED 2021-06-16 19:28:42,248 DEBUG [ main():259] [P> worker - 465189 - False - 0 2021-06-16 19:28:42,248 DEBUG [ main():259] [P> result_queue - 465190 - False - 0 2021-06-16 19:28:43,249 DEBUG [ main():280] terminating children . 2021-06-16 19:28:43,249 DEBUG [ main():290] Done

Резюме

Я искал способ изящной остановки процессов с помощью Python Multiprocessing , а также хотел контролировать порядок остановки процессов. Я решил эту проблему, используя события и заменив обработчики сигналов. Это дает мне полный контроль над главным процессом. Теперь я могу попросить остановить процесс B после остановки процесса A.

Источник

Читайте также:  Java как удалить file
Оцените статью