Python process shared queue

Sharing many queues among processes in Python

I am aware of multiprocessing.Manager() and how it can be used to create shared objects, in particular queues which can be shared between workers. There is this question, this question, this question and even one of my own questions. However, I need to define a great many queues, each of which links a specific pair of processes. Say that each pair of processes and its linking queue is identified by the variable key. I want to use a dictionary to access my queues when I need to put and get data. I cannot make this work. I've tried a number of things. With multiprocessing imported as mp:

Defining a dict like

for key in all_keys:
    DICT[key] = mp.Queue()

in a config file which is imported by the multiprocessing module (call it multi.py) does not return errors, but the queue DICT[key] is not shared between the processes; each one seems to have its own copy of the queue and thus no communication happens. If I try to define the DICT at the beginning of the main multiprocessing function that defines the processes and starts them, like

DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Queue()

I get

RuntimeError: Queue objects should only be shared between processes through inheritance

and changing the queue definition to

DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

only makes everything worse. Trying similar definitions at the head of multi.py rather than inside the main function returns similar errors. There must be a way to share many queues between processes without explicitly naming each one in the code. Any ideas?

Edit

Here is a basic schema of the program:

1. Load the first module, which defines some variables, imports multi, launches multi.main(), and loads another module which starts a cascade of module loads and code execution. Meanwhile:

2. multi.main looks like this:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1, (DICT1[key],))  # DICT1 is defined in the config file
        proc_2 = pool.apply_async(targ2, (DICT2[key], otherargs,))
I have also tried launching the processes directly:

mp.Process(target=targ1, args=(DICT[key],))

3. The function targ1 takes input data that is coming in (sorted by key) from the main process. It is meant to pass the result to DICT[key] so targ2 can do its work. This is the part that is not working. There are an arbitrary number of targ1s, targ2s, etc., and therefore an arbitrary number of queues.

4. The results of some of these processes will be sent to a bunch of different arrays / pandas dataframes which are also indexed by key, and which I would like to be accessible from arbitrary processes, even ones launched in a different module. I have yet to write this part and it might be a different question. (I mention it here because the answer to 3 above might also solve 4 nicely.)


How are you launching your child processes? Can you instantiate the Queue before launching the processes? Are the pairs of processes you’re talking about potentially two child processes, or is it always a parent-child relationship?

Hey man, I've added an edit that describes my program in brief outline. I am instantiating Queue before launching the processes. I am not sure how to distinguish a child from a parent process, so I can't answer your last question.

Why is DICT2 a manager.dict() in your code above? It doesn’t look like you actually try to pass the DICT2 object to any children. Couldn’t it just be a regular dict containing mp.Manager().Queue() instances?

I tried DICT2 = {} earlier (not only in multi.main() but everywhere I could think might work); I get the same RuntimeError. What do you mean 'pass the DICT2 object to children'? Isn't that what I'm doing by making it an argument of targ2?

Hey, so I think I made some progress, inspired by your second comment. DICT2[key] is meant to be read by targ2, and written into by a function that targ1 was calling. But when I put DICT2[key] into targ1 as an argument (beside DICT1[key]), the procedure worked and targ2 was able to read data from the queue DICT2[key]. Does this accord with your experience and make sense? Is it what you meant by parent/children processes (proc_1 is the parent of proc_2)?

1 Answer

It sounds like your issues started when you tried to share a multiprocessing.Queue() by passing it as an argument. You can get around this by creating a managed queue instead:

import multiprocessing

manager = multiprocessing.Manager()
passable_queue = manager.Queue()

When you use a manager to create it, you are storing and passing around a proxy to the queue, rather than the queue itself, so even when the object you pass to your worker processes is copied, it will still point at the same underlying data structure: your queue. It's very similar (in concept) to pointers in C/C++. If you create your queues this way, you will be able to pass them when you launch a worker process.

Since you can pass queues around now, you no longer need your dictionary to be managed. Keep a normal dictionary in main that will store all the mappings, and only give your worker processes the queues they need, so they won’t need access to any mappings.

I've written an example of this here. It looks like you are passing objects between your workers, so that's what's done here. Imagine we have two stages of processing, and the data both starts and ends in the control of main. Look at how we can create the queues that connect the workers like a pipeline; by giving them only the queues they need, there's no need for them to know about any mappings:

import multiprocessing as mp

def stage1(q_in, q_out):
    q_out.put(q_in.get() + "Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):
    q_out.put(q_in.get() + "Stage 2 did some work.\n")
    return

def main():
    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # wait for work to complete
    print(q_s2_to_main.get() + "Main finished the job.")

    pool.close()
    pool.join()
    return

if __name__ == "__main__":
    main()

The code produces this output:

Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.

I didn't include an example of storing the queues or AsyncResult objects in dictionaries, because I still don't quite understand how your program is supposed to work. But now that you can pass your queues freely, you can build your dictionary to store the queue/process mappings as needed.

In fact, if you really do build a pipeline between multiple workers, you don't even need to keep a reference to the "inter-worker" queues in main. Create the queues, pass them to your workers, then only retain references to queues that main will use. I would definitely recommend trying to let old queues be garbage collected as quickly as possible if you really do have "an arbitrary number" of queues.
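For what it's worth, a minimal sketch of the keyed dictionary-of-queues setup described in the question might look like this (the keys, worker body, and payloads are hypothetical placeholders):

import multiprocessing as mp

def worker(q_in, q_out):
    # each worker receives only the two queue proxies it needs
    q_out.put(q_in.get() * 2)

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    all_keys = ["a", "b", "c"]  # placeholder keys

    # plain dicts live in main; only the queue proxies are handed to workers
    q_in = {key: manager.Queue() for key in all_keys}
    q_out = {key: manager.Queue() for key in all_keys}
    results = {key: pool.apply_async(worker, (q_in[key], q_out[key]))
               for key in all_keys}

    # feed each worker through its own queue, then collect the answers
    for i, key in enumerate(all_keys):
        q_in[key].put(i)
    for key in all_keys:
        print(key, q_out[key].get())

    pool.close()
    pool.join()

if __name__ == "__main__":
    main()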


Multiprocessing Manager Share Queue in Python

You can use a Manager to create a hosted Queue object and share it via proxy objects with multiple child processes.

In this tutorial you will discover how to share a queue using a manager in Python.

Need Manager to Share a Queue

A manager in the multiprocessing module provides a way to create Python objects that can be shared easily between processes.

Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

— multiprocessing — Process-based parallelism

A manager creates a server process that hosts a centralized version of objects that can be shared among multiple processes.

The objects are not shared directly. Instead, the manager creates a proxy object for each object that it manages and the proxy objects are shared among processes.

The proxy objects are used and operate just like the original objects, except that they serialize data, synchronize and coordinate with the centralized version of the object hosted in the manager server process.

A proxy is an object which refers to a shared object which lives (presumably) in a different process. […] A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy).

— multiprocessing — Process-based parallelism

This makes managers a process-safe and preferred way to share Python objects among processes.


When using multiprocessing, we may need to share a Queue with child processes.

This may be for many reasons, such as:

  • Child processes need to put results to the queue.
  • Child processes need to get results from the queue.

We may not be able to share the Queue directly with the child processes.

This may be because a queue object cannot be pickled, and we may have to pickle objects when sending them to a child process, such as via a Pipe, via another Queue, or as an argument to a task submitted to the Pool class.
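As a rough illustration of the failure mode (a sketch; exact behavior can vary with Python version and start method), passing a raw multiprocessing.Queue as a Pool task argument typically fails with the same RuntimeError quoted in the question above:

import multiprocessing as mp

def worker(queue):
    queue.put("hello")

if __name__ == "__main__":
    queue = mp.Queue()  # a raw queue, not a managed proxy
    with mp.Pool() as pool:
        try:
            # task arguments must be pickled to reach the worker,
            # and pickling a raw Queue is not allowed
            pool.apply_async(worker, (queue,)).get()
        except RuntimeError as e:
            # "Queue objects should only be shared between processes through inheritance"
            print(e)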

How can we use a multiprocessing Manager to share a Queue with child processes?


Manager for Sharing a Queue

A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().

The multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved from the queue in the order they were added. The first items added to the queue will be the first items retrieved. This is opposed to other queue types, such as last-in, first-out queues and priority queues.
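A minimal sketch of this FIFO ordering (run as a script, since multiprocessing objects should be created under the main guard):

import multiprocessing as mp

if __name__ == "__main__":
    queue = mp.Queue()
    # items come back in the order they were added
    for item in ("first", "second", "third"):
        queue.put(item)
    print(queue.get())  # first
    print(queue.get())  # second
    print(queue.get())  # third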


Managers are provided via the multiprocessing.Manager class, which creates and returns a multiprocessing.managers.SyncManager instance.

The SyncManager allows a suite of Python objects to be created and managed by default, including a Queue.

Specifically, the SyncManager provides two registered queue types, Queue and JoinableQueue, both of which return a proxy for a queue.Queue object hosted in the manager's server process.

As such, we can create a Manager instance and use it directly to create a Queue object.

This will create a hosted version of the Queue in the Manager's server process and return proxy objects for the Queue that can be pickled and shared with child processes.
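Putting that together, here is a minimal sketch (the worker function and message are placeholders) of creating a managed queue and sharing its proxy with a child process via a Pool:

import multiprocessing as mp

def producer(queue):
    # the argument received here is a proxy; put() calls are forwarded
    # to the queue hosted in the manager's server process
    queue.put("hello from the child process")

if __name__ == "__main__":
    with mp.Manager() as manager:
        queue = manager.Queue()  # returns a pickleable proxy
        with mp.Pool() as pool:
            pool.apply(producer, (queue,))  # blocks until the task completes
        print(queue.get())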

