- Multiprocessing Pool Wait For All Tasks To Finish in Python
- Need to Wait For All Tasks in the Process Pool
- How to Wait For All Tasks to Finish
- How to Wait For All Tasks in a Batch
- Multiprocessing Pool Life-Cycle in Python
- Multiprocessing Pool Life-Cycle
- Step 1. Create the Process Pool
- Step 2. Submit Tasks to the Process Pool
- Step 2a. Issue Tasks Synchronously
- Step 2b. Issue Tasks Asynchronously
- Step 3. Wait for Tasks to Complete (Optional)
- 3a. Wait on AsyncResult objects to Complete
Multiprocessing Pool Wait For All Tasks To Finish in Python
You can wait for tasks issued to the multiprocessing pool to complete by calling AsyncResult.wait() or calling Pool.join().
In this tutorial you will discover how to wait for tasks to complete in the process pool in Python.
Need to Wait For All Tasks in the Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
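For example, a minimal sketch contrasting a blocking call with its asynchronous counterpart might look like the following, where the task() function is a stand-in for your own work:

```python
from multiprocessing import Pool

# placeholder task executed by a worker process
def task(value):
    return value * value

if __name__ == '__main__':
    with Pool() as pool:
        # synchronous: blocks until the result is available
        result = pool.apply(task, args=(2,))
        print(result)  # 4
        # asynchronous: returns an AsyncResult handle immediately
        async_result = pool.apply_async(task, args=(3,))
        print(async_result.get())  # 9
```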
When using the process pool, we may need to wait for all tasks to complete.
This may be for many reasons, such as:
- Waiting for all tasks to complete before issuing follow-up tasks.
- Waiting for all task results so that they can be combined or used.
- Waiting for all tasks to complete before continuing on with the program.
How can we wait for all tasks to complete in the process pool?
How to Wait For All Tasks to Finish
There are two ways that we can wait for tasks to finish in the multiprocessing.pool.Pool.
- Wait for an asynchronous set of tasks to complete with the wait() function.
- Wait for all issued tasks to complete after shutdown with the join() function.
Let’s take a closer look at each approach.
How to Wait For All Tasks in a Batch
Tasks may be issued asynchronously to the process pool.
This can be achieved using functions such as apply_async(), map_async(), and starmap_async(). These functions return an AsyncResult object.
We can wait for a single batch of tasks issued asynchronously to the process pool to complete by calling the wait() function on the returned AsyncResult object.
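For example, a minimal sketch (with a placeholder task() function) that issues a batch of tasks with map_async() and blocks on the returned AsyncResult until every task in the batch has finished:

```python
from multiprocessing import Pool

# placeholder task executed by worker processes
def task(value):
    return value * 2

if __name__ == '__main__':
    with Pool() as pool:
        # issue a batch of tasks asynchronously
        async_result = pool.map_async(task, range(10))
        # block until all tasks in the batch have completed
        async_result.wait()
        print('All tasks are done')
        # results are now available without further blocking
        print(async_result.get())
```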
Multiprocessing Pool Life-Cycle in Python
The multiprocessing.Pool is a flexible and powerful process pool for executing ad hoc tasks in an asynchronous manner.
In this tutorial, you will discover how to get started using the multiprocessing.Pool quickly in Python.
Multiprocessing Pool Life-Cycle
The multiprocessing.Pool provides a pool of generic worker processes.
It was designed to be easy and straightforward to use.
There are four main steps in the life-cycle of using the multiprocessing.Pool class; they are: create, submit, wait, and shutdown.
- 1. Create: Create the process pool by calling the constructor Pool().
- 2. Submit: Submit tasks synchronously or asynchronously.
- 2a. Submit Tasks Synchronously
- 2b. Submit Tasks Asynchronously
- 3. Wait: Wait for tasks to complete (Optional).
- 3a. Wait on AsyncResult objects to Complete
- 3b. Wait on AsyncResult objects for Result
- 4. Shutdown: Shut down the process pool once no more tasks are needed.
- 4a. Shutdown Automatically with the Context Manager
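Tying the four steps together, here is a minimal end-to-end sketch, using a placeholder task() function and an explicit close()/join() shutdown:

```python
from multiprocessing import Pool

# placeholder task executed by worker processes
def task(value):
    return value + 1

if __name__ == '__main__':
    # 1. create the process pool
    pool = Pool()
    # 2. submit tasks (asynchronously in this case)
    async_result = pool.map_async(task, range(5))
    # 3. wait for the tasks to complete (optional)
    async_result.wait()
    print(async_result.get())
    # 4. shutdown: stop accepting tasks, then wait for the workers to exit
    pool.close()
    pool.join()
```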
The following figure helps to picture the life-cycle of the multiprocessing.Pool class.
Let’s take a closer look at each life-cycle step in turn.
Step 1. Create the Process Pool
First, a multiprocessing.Pool instance must be created.
When an instance of a multiprocessing.Pool is created it may be configured.
The process pool can be configured by specifying arguments to the multiprocessing.Pool class constructor.
The arguments to the constructor are as follows:
- processes: Maximum number of worker processes to use in the pool.
- initializer: Function executed after each worker process is created.
- initargs: Arguments to the worker process initialization function.
- maxtasksperchild: Limit the maximum number of tasks executed by each worker process.
- context: Configure the multiprocessing context such as the process start method.
Perhaps the most important argument is “processes”, which specifies the number of child worker processes to use in the pool.
By default, the multiprocessing.Pool class constructor does not require any arguments.
This will create a process pool that will use a number of worker processes that matches the number of logical CPU cores in your system.
For example, if we had 4 physical CPU cores with hyperthreading, this would mean we would have 8 logical CPU cores and this would be the default number of workers in the process pool.
We can set the “processes” argument to specify the number of child processes to create and use as workers in the process pool.
It is a good idea to test your application in order to determine the number of worker processes that result in the best performance.
For example, for many computationally intensive tasks, you may achieve the best performance by setting the number of processes to be equal to the number of physical CPU cores (before hyperthreading), instead of the logical number of CPU cores (after hyperthreading).
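For example, a minimal sketch of configuring the pool; the worker_init() function and the choice of 4 processes are illustrative assumptions, not requirements:

```python
from multiprocessing import Pool, cpu_count

# hypothetical initializer, run once in each new worker process
def worker_init():
    print('Initializing worker', flush=True)

# placeholder task executed by worker processes
def task(value):
    return value * value

if __name__ == '__main__':
    # the default Pool() creates one worker per logical CPU core
    print(f'Logical CPU cores: {cpu_count()}')
    # explicit configuration: 4 workers, each running worker_init() on startup
    with Pool(processes=4, initializer=worker_init) as pool:
        print(pool.map(task, range(4)))
```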
You can learn more about how to configure the number of child worker processes in the tutorial:
You can learn more about how to configure the process pool more generally in the tutorial:
Next, let’s look at how we might issue tasks to the process pool.
Step 2. Submit Tasks to the Process Pool
Once the multiprocessing.Pool has been created, you can submit tasks for execution.
As discussed, there are two main approaches for submitting tasks to the process pool; they are:
- Issue tasks synchronously (blocking).
- Issue tasks asynchronously (non-blocking).
Let’s take a closer look at each approach in turn.
Step 2a. Issue Tasks Synchronously
Issuing tasks synchronously means that the caller will block until the issued task or tasks have completed.
Blocking calls to the process pool include apply(), map(), and starmap().
We can issue one-off tasks to the process pool using the apply() function.
The apply() function takes a target function to be executed by a worker process, along with any arguments. The call will block until the function has been executed by a worker process, after which it returns.
The process pool provides a parallel version of the built-in map() function for issuing tasks.
The starmap() function is the same as the parallel version of the map() function, except that it allows each function call to take multiple arguments. Specifically, it takes an iterable where each item is an iterable of arguments for the target function.
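A minimal sketch of the three blocking calls, using placeholder task functions:

```python
from multiprocessing import Pool

# placeholder single-argument task
def square(value):
    return value * value

# placeholder multi-argument task
def add(a, b):
    return a + b

if __name__ == '__main__':
    with Pool() as pool:
        # apply(): one task, blocks until it completes
        print(pool.apply(square, args=(3,)))        # 9
        # map(): one task per item, blocks until all tasks complete
        print(pool.map(square, [1, 2, 3]))          # [1, 4, 9]
        # starmap(): like map(), but each item is a tuple of arguments
        print(pool.starmap(add, [(1, 2), (3, 4)]))  # [3, 7]
```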
Step 2b. Issue Tasks Asynchronously
Issuing tasks asynchronously to the process pool means that the caller will not block, allowing the caller to continue on with other work while the tasks are executing.
The non-blocking calls to issue tasks to the process pool return immediately and provide a hook or mechanism to check the status of the tasks and get the results later. The caller can issue tasks and carry on with the program.
Non-blocking calls to the process pool include apply_async(), map_async(), and starmap_async().
The imap() and imap_unordered() functions are interesting. They return immediately, so they are technically non-blocking calls. However, the iterable that is returned will yield return values as tasks are completed, which means that traversing the iterable will block.
The apply_async(), map_async(), and starmap_async() functions are asynchronous versions of the apply(), map(), and starmap() functions described above.
They all return an AsyncResult object immediately that provides a handle on the issued task or tasks.
The imap() function takes the name of a target function and an iterable like the map() function.
The difference is that the imap() function is lazier in two ways:
- imap() issues multiple tasks to the process pool one-by-one, instead of all at once like map().
- imap() returns an iterable that yields results one-by-one as tasks are completed, rather than all at once after all tasks have completed like map().
The imap_unordered() function is the same as imap(), except that the returned iterable will yield return values in the order that tasks are completed (i.e. possibly out of order).
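A minimal sketch of the non-blocking calls, again using a placeholder task() function:

```python
from multiprocessing import Pool

# placeholder task executed by worker processes
def task(value):
    return value * 10

if __name__ == '__main__':
    with Pool() as pool:
        # map_async() returns an AsyncResult handle immediately
        handle = pool.map_async(task, range(5))
        # ... the caller is free to do other work here ...
        print(handle.get())  # blocks only when the results are requested
        # imap_unordered() also returns immediately; iterating yields results
        # one-by-one in completion order, blocking per item as needed
        for result in pool.imap_unordered(task, range(5)):
            print(result)
```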
You can learn more about how to issue tasks to the process pool in the tutorial:
Now that we know how to issue tasks to the process pool, let’s take a closer look at waiting for tasks to complete or getting results.
Step 3. Wait for Tasks to Complete (Optional)
An AsyncResult object is returned when issuing tasks asynchronously to the multiprocessing.Pool process pool.
This can be achieved via any of the following methods on the process pool:
- Pool.apply_async() to issue one task.
- Pool.map_async() to issue multiple tasks.
- Pool.starmap_async() to issue multiple tasks that take multiple arguments.
An AsyncResult provides a handle on one or more issued tasks.
It allows the caller to check on the status of the issued tasks, to wait for the tasks to complete, and to get the results once tasks are completed.
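For example, a minimal sketch of working with an AsyncResult handle, using a placeholder task() function:

```python
from multiprocessing import Pool

# placeholder task executed by worker processes
def task(value):
    return value * 3

if __name__ == '__main__':
    with Pool() as pool:
        handle = pool.map_async(task, range(5))
        print(handle.ready())       # False while tasks are still running
        handle.wait()               # block until all tasks have completed
        print(handle.successful())  # True if no task raised an exception
        print(handle.get())         # retrieve the list of return values
```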
We do not have to use the returned AsyncResult, for example if the issued tasks do not return values and we are not concerned with when, or whether, the tasks complete successfully.
That is why this step in the life-cycle is optional.
Nevertheless, there are two main ways we can use an AsyncResult to wait; they are:
- Wait on the AsyncResult object for the tasks to complete via the wait() function.
- Wait on the AsyncResult object for the result via the get() function.
Let’s take a closer look at each approach in turn.
3a. Wait on AsyncResult objects to Complete
We can wait for all tasks to complete via the AsyncResult.wait() function.
This will block until all issued tasks are completed.
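For example, a minimal sketch (with a placeholder two-argument task() function) that waits on the AsyncResult returned by starmap_async():

```python
from multiprocessing import Pool

# placeholder task taking two arguments
def task(a, b):
    return a * b

if __name__ == '__main__':
    with Pool() as pool:
        # issue multiple tasks asynchronously
        async_result = pool.starmap_async(task, [(1, 2), (3, 4), (5, 6)])
        # block until all issued tasks have completed
        async_result.wait()
        print('All tasks completed')
        print(async_result.get())  # [2, 12, 30]
```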