Python global variables multiprocessing

Alexander Korznikov. A bit of security.

First of all, I want to apologize if this article turns out a bit «messy».

One day I noticed that the threading module in Python does not work as it should.
Sometimes it was even slower than running the same code sequentially. Then I learned about the GIL (Global Interpreter Lock).

My teacher advised me to use the multiprocessing module.
Fine. It is very simple, just copy/replace:

threading >> multiprocessing
Thread >> Process

#Threading Example
from threading import Thread

#defining a global variable
mylist = []

def somefunc(a):
    global mylist
    mylist.append(a)

def main():
    for i in range(100):
        t = Thread(target=somefunc, args=(i,))
        t.start()
        t.join()
#Multiprocessing Example
from multiprocessing import Process

#defining a global variable
mylist = []

def somefunc(a):
    global mylist
    mylist.append(a)

def main():
    for i in range(100):
        t = Process(target=somefunc, args=(i,))
        t.start()
        t.join()
With multiprocessing, however, mylist stays empty: each child process works on its own copy of the global variable. The usual fix is a Manager list that is shared between processes:

from multiprocessing import Process, Manager

mylist = Manager().list()

def somefunc(a):
    mylist.append(a)

def main():
    for i in range(100):
        t = Process(target=somefunc, args=(i,))
        t.start()
        t.join()

On one hand this helps, but on the other it gives you a headache: if you add, for example, KeyboardInterrupt (^C) handling, you will get nothing, because the Manager list will be empty. OK, maybe my knowledge is just not deep enough, but I found another way to manage shared variables: a callback function.

But before that, let's add some process control. I want to limit how many processes run simultaneously:

from time import sleep
from multiprocessing import Pool, Manager, cpu_count, active_children

mylist = Manager().list()

def somefunc(a):
    mylist.append(a)

def main():
    #creating a pool of worker processes; for 4 cores there will be 40 processes.
    pool = Pool(processes=cpu_count()*10)
    for i in range(100):
        #start processes asynchronously, without waiting for each process to finish.
        pool.apply_async(somefunc, (i,))
    pool.close()
    #waiting for ALL processes to finish
    while len(active_children()) > 1:
        sleep(0.5)
    pool.join()

In this example, there will be no more than 40 processes running at the same time.


Now let's add the callback function:

from time import sleep
from multiprocessing import Pool, cpu_count, active_children

mylist = []

def somefunc(a):
    a += 1
    return a

def main():
    def cb(data):
        if data:
            global mylist
            mylist.append(data)

    pool = Pool(processes=cpu_count()*10)
    for i in range(100):
        pool.apply_async(somefunc, (i,), callback=cb)
    pool.close()
    while len(active_children()) > 1:
        sleep(0.5)
    pool.join()

Every worker process returns some data to the main process, and then the callback function is called to handle that data.
For me, a callback function is much easier to use and to understand.

Next time I will try to describe a successful implementation of KeyboardInterrupt (^C) handling in a multiprocessing script. That is a separate issue.


Multiprocessing Pool Share Global Variable With All Workers

You can share a global variable with all child worker processes in the multiprocessing pool by defining it in the worker process initialization function.

In this tutorial you will discover how to share global variables with all workers in the Python process pool.

Need To Share Global Variable With All Workers in Process Pool

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

— multiprocessing — Process-based parallelism

We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().

When using the process pool, we may need to share a global variable with all child worker processes in the process pool.

This would allow all tasks executed in the process pool to use the shared global variable.

We may need this capability for many reasons, such as:

  • Allow all tasks to use a shared log.
  • Allow all tasks to use a shared queue or pipe.
  • Allow all tasks to use a shared synchronization primitive like a lock, semaphore, or event.

The process pool does not provide this capability.

How can we share a global variable with all child worker processes?

How can a shared global variable be accessed by all tasks executed by the process pool in Python?


How to Share a Global Variable With All Workers

We can share a global variable with all child worker processes in the process pool.

This can be achieved by configuring the process pool to initialize each worker process using a custom function.

The global variable data required by each child worker process can be passed as an argument to the initialization function. It can then be stored in a global variable. This will make it available to each child worker process.

Recall, declaring a variable “global” in a function will define a global variable for the process, rather than a local variable for the function.

You may also recall that the worker initialization function is executed by the main thread of each new worker process. Therefore, a global variable defined in the initialization function will be available to the process later.

Because each child worker process in the process pool will be initialized using the same function, the global variable (or variables) will be accessible by all child worker processes in the process pool.

This means that any tasks executed in the process pool can access the global variable, such as custom functions executed as tasks in the process pool.
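
For example, here is a minimal generic sketch of this pattern (the names init_worker, work, and shared_data are illustrative, not taken from this tutorial):

from multiprocessing import Pool

# initializer run once in each new worker process
def init_worker(shared_value):
    # store the passed-in data in a process-wide global variable
    global shared_data
    shared_data = shared_value

# task function executed in the workers, reading the process-wide global
def work(i):
    global shared_data
    return f'task {i} sees {shared_data!r}'

if __name__ == '__main__':
    # every worker is initialized with the same shared value
    with Pool(processes=2, initializer=init_worker, initargs=('some shared data',)) as pool:
        for line in pool.map(work, range(4)):
            print(line)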

You can learn more about configuring the child worker process initialization function in a separate tutorial.

Now that we know how to share global variables with all worker processes, let’s look at a worked example.


Example of Sharing a Global Variable With All Workers

We can explore how to share a global variable with all child worker processes.

In this example, we will define a shared multiprocessing queue. We will then share this queue with each child worker process via its initialization function. Each child worker will store a reference to the queue in a global variable so that all tasks executed by that worker can access it. We will then execute tasks in the process pool that put task results into the shared queue. The main process will then read results as they become available via the shared queue.

Firstly, we need to define the custom function used to initialize the child worker processes.

The initialization function must take the shared queue as an argument. It will then declare a new global variable for the child process and store a reference to the shared queue in the global variable.

The init_worker() function below implements this.

Next, we can define a custom task function to execute in the process pool.

The task function will take an integer identifier as an argument. It will then generate a random number between 0 and 5 and block for that many seconds to simulate a variable amount of computational effort. Finally, it will put the generated number and the integer identifier into the shared queue as a tuple.

The task() function below implements this.
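
A sketch of such a task function, based on the description above (the use of random() * 5 and the tuple ordering are assumptions consistent with the text):

from random import random
from time import sleep

# task executed by the child worker processes
def task(identifier):
    # declare the global queue set up by init_worker()
    global queue
    # generate a random value between 0 and 5
    value = random() * 5
    # block for that many seconds to simulate computational effort
    sleep(value)
    # put the generated value and the task identifier on the shared queue
    queue.put((value, identifier))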

Note that we explicitly define the scope of the queue global variable. This is technically not required, but I believe it helps make the code more readable.

Next, in the main process we can first create the shared multiprocessing queue. We will use a multiprocessing.SimpleQueue in this case.

You can learn more about the multiprocessing.SimpleQueue in a separate tutorial.

Next, we can create and configure the process pool.

In this case, we will configure it so that each worker process is initialized using our init_worker() custom initialization function and pass the shared queue as an argument.

We will use the context manager interface so that the process pool is closed for us automatically once we are finished with it.

You can learn more about the context manager interface in a separate tutorial.

Next, we will issue 10 calls to our custom task function asynchronously using the map_async() function.

We will then consume the results of the tasks as they become available (e.g. simulating the imap_unordered() function). This can be achieved by iterating over the expected number of results and calling get() on the shared queue for each task result.

Tying this together, the complete example is listed below.
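
A complete sketch along these lines, reconstructed under the assumptions above:

from random import random
from time import sleep
from multiprocessing import SimpleQueue, Pool

# initialize each worker process with the shared queue
def init_worker(shared_queue):
    # declare a new global variable in the worker process
    global queue
    # store a reference to the shared queue in the global variable
    queue = shared_queue

# task executed in the worker processes
def task(identifier):
    # declare the global queue shared with this worker process
    global queue
    # generate a random value between 0 and 5
    value = random() * 5
    # block for that many seconds to simulate computational effort
    sleep(value)
    # put the generated value and the task identifier on the shared queue
    queue.put((value, identifier))

# protect the entry point
if __name__ == '__main__':
    # create the shared queue in the main process
    shared_queue = SimpleQueue()
    # create the pool, initializing each worker with the shared queue
    with Pool(initializer=init_worker, initargs=(shared_queue,)) as pool:
        # issue 10 tasks asynchronously
        _ = pool.map_async(task, range(10))
        # consume results as they become available
        for _ in range(10):
            value, identifier = shared_queue.get()
            print(f'Task {identifier} produced {value:.2f}', flush=True)
    # the process pool is closed automatically by the context manager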

Running the example first creates the shared queue.

Next, the process pool is created and configured to use the custom initialization function.

Each worker process is created and started, then initialized with the custom initialization function. Each worker creates a new global variable named “queue” and stores the passed-in shared queue in that global variable. This makes “queue” available to all tasks executed by the worker process, and all worker processes are initialized the same way.

Next, 10 tasks are issued into the process pool.

The main process then iterates the 10 results, calling get() on the queue which will block and not return until a result is available.

Each task first generates a random number between 0 and 5, then blocks for that many seconds to simulate computational effort. The “queue” global variable for the process is declared explicitly, then accessed. The result for the task is put on the queue and the task completes.

Results are reported in the main process as they become available.

After all 10 results are retrieved from the shared queue, the main process continues on, automatically closing the process pool and then closing the application.

Note, the specific results will differ each time the program is run due to the use of random numbers.


Sharing Global Variables in Python Using Multiprocessing

While I was using multiprocessing, I found out that global variables are not shared between processes.


Example of the Issue

Let me first provide an example of the issue that I was facing.

I have 2 input lists, which 2 processes will read from and append to the final list, and then the aggregated list is printed to stdout:

import multiprocessing

final_list = []

input_list_one = ['one', 'two', 'three', 'four', 'five']
input_list_two = ['six', 'seven', 'eight', 'nine', 'ten']

def worker(data):
    for item in data:
        final_list.append(item)

process1 = multiprocessing.Process(target=worker, args=[input_list_one])
process2 = multiprocessing.Process(target=worker, args=[input_list_two])

process1.start()
process2.start()
process1.join()
process2.join()

print(final_list)
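
Running this script prints an empty list from the parent process, for example:

$ python3 mp_list.py
[]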

As you can see, the final list printed by the parent process is still empty.

Resolution

From Python’s Documentation:

“The multiprocessing.Manager returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.”

import multiprocessing

manager = multiprocessing.Manager()
final_list = manager.list()

input_list_one = ['one', 'two', 'three', 'four', 'five']
input_list_two = ['six', 'seven', 'eight', 'nine', 'ten']

def worker(data):
    for item in data:
        final_list.append(item)

process1 = multiprocessing.Process(target=worker, args=[input_list_one])
process2 = multiprocessing.Process(target=worker, args=[input_list_two])

process1.start()
process2.start()
process1.join()
process2.join()

print(final_list)

Now when we run our script, we can see that our processes are aware of our defined list:

$ python3 mp_list.py
['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten']


