Multiprocessing Pool Exception Handling in Python
Exceptions may be raised when initializing worker processes, when executing tasks, and in callback functions once tasks are completed.
In this tutorial you will discover how to handle exceptions in a Python multiprocessing pool.
Multiprocessing Pool Exception Handling
Exception handling is an important consideration when using processes.
Code may raise an exception when something unexpected happens and the exception should be dealt with by your application explicitly, even if it means logging it and moving on.
There are three points at which you may need to consider exception handling when using the multiprocessing.pool.Pool. They are:
- Exception handling in worker initialization.
- Exception handling in task execution.
- Exception handling in task completion callbacks.
Let’s take a closer look at each point in turn.
Exception Handling in Worker Initialization
You can specify a custom initialization function when configuring your multiprocessing.pool.Pool.
This can be set via the “initializer” argument to specify the function name and “initargs” to specify a tuple of arguments to the function.
Each worker process started by the process pool will call your initialization function before it begins executing tasks.
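For example, here is a minimal sketch of a pool configured this way (the function and argument names are illustrative):

from multiprocessing import Pool

def init_worker(prefix):
    # Called once in each child worker process before it executes any tasks.
    print('%s: worker initialized' % prefix)

def task(value):
    return value + 1

if __name__ == '__main__':
    # "initializer" names the function, "initargs" is a tuple of its arguments.
    with Pool(2, initializer=init_worker, initargs=('demo',)) as pool:
        print(pool.map(task, range(3)))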
If your initialization function raises an exception it will break your process pool.
We can demonstrate this with an example of a contrived initializer function that raises an exception.
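A minimal sketch of such a contrived, always-failing initializer (the names and the ValueError are made up for the demonstration):

from multiprocessing import Pool

def init_worker():
    # Contrived initializer that always fails before the worker can run tasks.
    raise ValueError('Something bad happened in the initializer!')

def task(value):
    return value * 2

if __name__ == '__main__':
    with Pool(4, initializer=init_worker) as pool:
        print(pool.map(task, range(10)))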
Running the example fails with an exception, as we expected.
The process pool is created and, almost immediately, the internal child worker processes are created and initialized.
Each worker process fails to initialize because the initialization function raises an exception.
The process pool then attempts to start new replacement child workers for each process that was started and failed. These too fail with exceptions.
This cycle repeats many times until some internal limit is reached and the program exits.
The output is simply the traceback from the initializer's exception, repeated over and over, once for each failed worker, until the program finally exits.
This highlights that if you use a custom initializer function, you must carefully consider the exceptions that may be raised and perhaps handle them, otherwise you put at risk all tasks that depend on the process pool.
Exception Handling in Task Execution
An exception may occur while executing your task.
This will cause the task to stop executing, but will not break the process pool.
If tasks were issued with a synchronous function, such as apply(), map(), or starmap(), the exception will be re-raised in the caller.
If tasks are issued with an asynchronous function, such as apply_async(), map_async(), or starmap_async(), an AsyncResult object will be returned. If a task issued asynchronously raises an exception, the exception will be caught by the process pool and re-raised when you call the get() function on the AsyncResult object to retrieve the result.
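The following minimal sketch (with a contrived failing task) demonstrates both behaviors:

from multiprocessing import Pool

def work(value):
    if value == 3:
        raise RuntimeError('Something bad happened!')  # contrived failure
    return value * 2

if __name__ == '__main__':
    with Pool(2) as pool:
        # Synchronous: map() re-raises the task's exception in the caller.
        try:
            pool.map(work, range(5))
        except RuntimeError as e:
            print('map() re-raised:', e)
        # Asynchronous: the exception is held until AsyncResult.get() is called.
        result = pool.apply_async(work, (3,))
        try:
            result.get()
        except RuntimeError as e:
            print('get() re-raised:', e)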
This means that you have two options for handling exceptions in tasks:
- Handle exceptions within the task function.
- Handle exceptions when getting results from tasks.
Let’s take a closer look at each approach in turn.
Exception Handling Within the Task
Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.
This could be via the return value from the function, e.g. None.
Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.
The example below defines a work task that will raise an exception, but will catch the exception and return a result indicating a failure case.
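A minimal sketch of what this might look like, with a contrived failure and None used as the failure indicator:

from multiprocessing import Pool

def work(value):
    try:
        raise RuntimeError('Something bad happened!')  # contrived failure
    except RuntimeError:
        # Catch the exception and signal failure to the caller via a
        # sentinel return value instead of letting the exception propagate.
        return None

if __name__ == '__main__':
    with Pool(2) as pool:
        for result in pool.map(work, range(4)):
            print('Task failed' if result is None else result)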
Python: Multiprocessing and Exceptions
Python’s multiprocessing module provides an interface for spawning and managing child processes that is familiar to users of the threading module. One problem with the multiprocessing module, however, is that exceptions in spawned child processes don’t print stack traces.
Consider the following snippet:
import multiprocessing
import somelib

def f(x):
    return 1 / somelib.somefunc(x)

if __name__ == '__main__':
    with multiprocessing.Pool(5) as pool:
        print(pool.map(f, range(5)))
and the following error message:
Traceback (most recent call last):
  File "test.py", line 9, in <module>
    print(pool.map(f, range(5)))
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 228, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 564, in get
    raise self._value
ZeroDivisionError: division by zero
What triggered the ZeroDivisionError? Did somelib.somefunc(x) return 0, or did some other computation inside somelib.somefunc() cause the exception? Notice that we only see the stack trace of the main process; the stack trace of the code that actually triggered the exception in the worker process is not shown at all.
Luckily, Python provides a handy traceback module for working with exceptions and stack traces. All we have to do is catch the exception inside the worker process, and print it. Let’s change the code above to read:
import multiprocessing
import traceback
import somelib

def f(x):
    try:
        return 1 / somelib.somefunc(x)
    except Exception as e:
        print('Caught exception in worker thread (x = %d):' % x)
        # This prints the type, value, and stack trace of the
        # current exception being handled.
        traceback.print_exc()
        print()
        raise e

if __name__ == '__main__':
    with multiprocessing.Pool(5) as pool:
        print(pool.map(f, range(5)))
Now, if you run the same code again, you will see something like this:
Caught exception in worker thread (x = 0):
Traceback (most recent call last):
  File "test.py", line 7, in f
    return 1 / somelib.somefunc(x)
  File "/path/to/somelib.py", line 2, in somefunc
    return 1 / x
ZeroDivisionError: division by zero

Traceback (most recent call last):
  File "test.py", line 16, in <module>
    print(pool.map(f, range(5)))
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 228, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.3/multiprocessing/pool.py", line 564, in get
    raise self._value
ZeroDivisionError: division by zero
The printed traceback reveals somelib.somefunc() to be the actual culprit.
In practice, you may want to save the exception and the stack trace somewhere. For that, you can use the file argument of print_exc() in combination with StringIO. For example:
import logging
import io  # Import StringIO in Python 2.
import traceback

def Work(...):
    try:
        ...
    except Exception as e:
        # Render the current exception's traceback into a string buffer
        # and log it, then re-raise so the pool still sees the failure.
        exc_buffer = io.StringIO()
        traceback.print_exc(file=exc_buffer)
        logging.error(
            'Uncaught exception in worker process:\n%s',
            exc_buffer.getvalue())
        raise e
Catching Errors in MultiProcessing Pool Map
I have a Python script which uses multiprocessing Pool.map(). I spawn multiple children from map(), each of them reads a separate file, and I collect the results in the end. My goal is to have a pandas dataframe in the end that is a concatenation of all the output from the children, with duplicates dropped. I use this dataframe to do more processing (the rest of the code seems unrelated to the question I ask here, so I am omitting that part for brevity).

This code runs periodically at the end of the week with new input files to read every time. Sometimes there are errors in the files the children read, like null values in integer columns, or missing files. If any of these errors occur, I want the main script to die, ideally as soon as possible. I do not know how to make this happen in the most efficient way. I have tried, in turn:

1. Making the child die by raising SystemExit(1) if it encounters an error. I couldn't make the parent die.
2. Making the child return an empty value or an empty pandas dataframe in case of an error, using try/except blocks. I couldn't detect it properly in the parent.
3. Using map_async() with callback functions instead of map().

The last one seems to work. However, I am not sure if this is the correct and most efficient way of doing this, as I do not use any output from the error callback function. Any comments and suggestions are appreciated.

Edit: Sample input files.

a.txt:
shipmentId,processing_time_epoch
4001,1455408024132
4231,1455408024373
b.txt:

shipmentId,processing_time_epoch
5001,1455408024132
4231,1455408024373
Expected output (concatenation of a.txt and b.txt, duplicates dropped):

shipmentId,processing_time_epoch
4001,1455408024132
4231,1455408024373
5001,1455408024132

My code:
import pandas as pd
import csv, glob, datetime, sys, pdb, subprocess, multiprocessing, io, os, shlex
from itertools import repeat

def myerrorcallback(x):
    print('There seems to be an error in the child. Parent: Please die.')
    return

def mycallback(x):
    print('Returned successfully.')
    return

def PrintException():
    exc_type, exc_obj, tb = sys.exc_info()
    f = tb.tb_frame
    lineno = tb.tb_lineno
    filename = f.f_code.co_filename
    print('EXCEPTION IN ({}, LINE {}): {} ({})'.format(filename, lineno, exc_obj, exc_type))
    return

# ===================================================================
def Read_Processing_Times_v1(full_path_name):
    try:
        df = pd.read_csv(full_path_name, dtype=mydtypes,
                         usecols=['shipmentId', 'processing_time_epoch'])
        return df.drop_duplicates()
    except:
        print("exception in file " + full_path_name)
        PrintException()
        raise(SystemExit(1))

# ===================================================================
def Read_Processing_Times_v2(full_path_name):
    try:
        df = pd.read_csv(full_path_name, dtype=mydtypes,
                         usecols=['shipmentId', 'processing_time_epoch'])
        return df.drop_duplicates()
    except:
        print("exception in file " + full_path_name)
        PrintException()
        return pd.DataFrame()

# ===================================================================
def Read_Processing_Times_v3(full_path_name):
    df = pd.read_csv(full_path_name, dtype=mydtypes,
                     usecols=['shipmentId', 'processing_time_epoch'])
    return df.drop_duplicates()

# ===================================================================
# Top-level
if __name__ == '__main__':
    mycols = ['shipmentId', 'processing_time_epoch']
    # The dict literals were lost in formatting; a plausible reconstruction:
    mydtypes = {'shipmentId': str, 'processing_time_epoch': str}

    # The following two files should not give an error:
    # files_to_read = ["a.txt", "b.txt"]
    # The following two files should give an error, as a2.txt does not exist:
    files_to_read = ["a2.txt", "b.txt"]

    # version 1: Works with the correct files. Does not work if one of the
    # children has an error: the child dies, the parent does not and waits forever.
    # print("version 1")
    # pool = multiprocessing.Pool(15)
    # processing_times = pool.map(Read_Processing_Times_v1, files_to_read)
    # pool.close()
    # pool.join()
    # processing_times = pd.concat(processing_times, ignore_index=True).drop_duplicates()
    # print(processing_times)

    # version 2: Does not work. Don't know how to fix it. The idea is to make
    # the child return something, and catch the error in the parent.
    # print("version 2")
    # pool = multiprocessing.Pool(15)
    # processing_times = pool.map(Read_Processing_Times_v2, files_to_read)
    # pool.close()
    # pool.join()
    # if processing_times.count(pd.DataFrame()) > 0:
    #     print("SLAM times are not read properly.")
    #     raise SystemExit(1)

    # version 3:
    print("version 3")
    pool = multiprocessing.Pool(15)
    processing_times = pool.map_async(Read_Processing_Times_v3, files_to_read,
                                      callback=mycallback, error_callback=myerrorcallback)
    pool.close()
    pool.join()
    processing_times = processing_times.get()
    processing_times = pd.concat(processing_times, ignore_index=True).drop_duplicates()
    print("success!")
    # Do more processing with processing_times after this line.
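For what it's worth, the behavior described earlier on this page suggests a simpler variant: if the child raises an ordinary exception (rather than SystemExit, which just kills the worker process and leaves the parent waiting), a plain pool.map() will re-raise that exception in the parent, so the main script dies once the results are collected. A sketch under that assumption (file names as above):

import pandas as pd
from multiprocessing import Pool

def read_file(full_path_name):
    # Let any ordinary exception (missing file, bad values) propagate:
    # Pool.map() re-raises it in the parent, which then dies as required.
    df = pd.read_csv(full_path_name,
                     usecols=['shipmentId', 'processing_time_epoch'])
    return df.drop_duplicates()

if __name__ == '__main__':
    files_to_read = ['a.txt', 'b.txt']
    try:
        with Pool(15) as pool:
            frames = pool.map(read_file, files_to_read)
    except Exception as exc:
        raise SystemExit('A child failed: %s' % exc)
    processing_times = pd.concat(frames, ignore_index=True).drop_duplicates()
    print(processing_times)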