113

The documentation for the multiprocessing module shows how to pass a queue to a process started with multiprocessing.Process. But how can I share a queue with asynchronous worker processes started with apply_async? I don't need dynamic joining or anything else, just a way for the workers to (repeatedly) report their results back to base.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))  # triggers the RuntimeError below

This fails with: RuntimeError: Queue objects should only be shared between processes through inheritance. I understand what this means, and I understand the advice to inherit rather than require pickling/unpickling (and all the special Windows restrictions). But how do I pass the queue in a way that works? I can't find an example, and I've tried several alternatives that failed in various ways. Help please?

alexis

2 Answers

154

Try using multiprocessing.Manager to manage your queue and make it accessible to the worker processes.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()  # a Manager queue proxy can be pickled and passed to pool workers
    workers = pool.apply_async(worker, (33, q))
enderskill
  • That did it, thanks! There was an unrelated problem with the async call in my original code, so I copied the fix to your answer too. – alexis Mar 30 '12 at 21:15
  • Any explanation why `queue.Queue()` is not suitable for this? – mrgloom Jul 08 '19 at 17:31
  • @mrgloom: `queue.Queue` was built for threading, using in-memory locks. In a multiprocess environment, each subprocess would get its own copy of a `queue.Queue()` instance in its own memory space, since subprocesses don't share memory (mostly). – LeoRochael May 13 '20 at 14:26
  • @alexis How to get the elements from the Manager().Queue() after multiple workers have inserted data into it? – MSS Jul 21 '20 at 13:48
  • See [Multiprocessing.Queue.get()](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue.get), and the sketch after these comments. – alexis Sep 24 '20 at 16:08
  • @LeoRochael, that does not explain why `queue.Queue` is still usable in a multiprocessing environment when "using inheritance", i.e. with Process rather than Pool. – g.pickardou Jun 02 '22 at 06:31
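
For the question in the comments about retrieving the elements: a minimal sketch (names are hypothetical), which drains the Manager queue only after all jobs have been joined, so no producer is still running:

import multiprocessing

def worker(name, que):
    for step in range(3):  # each worker reports repeatedly
        que.put("%d finished step %d" % (name, step))

if __name__ == '__main__':
    m = multiprocessing.Manager()
    q = m.Queue()
    with multiprocessing.Pool(processes=3) as pool:
        jobs = [pool.apply_async(worker, (n, q)) for n in range(3)]
        for job in jobs:
            job.get()  # wait until every worker has finished putting
    while not q.empty():  # safe only because no producers remain
        print(q.get())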
21

multiprocessing.Pool already has a shared result queue; there is no need to additionally involve a Manager.Queue. Manager.Queue is a queue.Queue (multithreading queue) under the hood, located on a separate server process and exposed via proxies. This adds overhead compared to Pool's internal queue. Also, unlike with Pool's native result handling, the results in the Manager.Queue are not guaranteed to be ordered.
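
To see the ordering difference, here is a minimal sketch (names are hypothetical): results collected over the AsyncResult objects come back in submission order, while items in the Manager.Queue arrive in completion order.

from multiprocessing import Pool, Manager
import random
import time

def work(i, q):
    time.sleep(random.random() / 10)  # jobs finish in random order
    q.put(i)   # arrives in completion order
    return i   # travels over Pool's internal result queue

if __name__ == '__main__':
    with Manager() as m:
        q = m.Queue()
        with Pool(4) as pool:
            jobs = [pool.apply_async(work, (i, q)) for i in range(8)]
            print([job.get() for job in jobs])  # always [0, 1, ..., 7]
        print([q.get() for _ in range(8)])      # e.g. [1, 0, 3, 2, ...]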

The worker processes are not started by .apply_async(); that already happens when you instantiate Pool. What is started when you call pool.apply_async() is a new "job". Pool's worker processes run the multiprocessing.pool.worker function under the hood. This function takes care of processing new "tasks" transferred over Pool's internal Pool._inqueue and of sending results back to the parent over Pool._outqueue. Your specified func will be executed within multiprocessing.pool.worker. func only has to return something and the result will be automatically sent back to the parent.

.apply_async() immediately (asynchronously) returns an AsyncResult object (an alias for ApplyResult). You need to call .get() (which blocks) on that object to receive the actual result. Another option would be to register a callback function, which gets fired as soon as the result becomes ready (a sketch of this follows the note below).

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

Example Output:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Note: Specifying the timeout parameter for .get() will not stop the actual processing of the task within the worker; it only unblocks the waiting parent by raising a multiprocessing.TimeoutError.
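
For the callback option mentioned above, a minimal sketch (on_result is an illustrative name, not part of the API); the callback runs in a result-handler thread of the parent process, so it should return quickly:

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):
        pass
    return i

def on_result(result):
    print('got:', result)  # runs in the parent, one call per finished job

if __name__ == '__main__':
    with Pool(4) as pool:
        for i in range(10):
            pool.apply_async(busy_foo, (i,), callback=on_result,
                             error_callback=print)
        pool.close()  # no further jobs
        pool.join()   # block until all callbacks have fired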

Darkonaut
  • Interesting, I'll try it out first chance I get. It certainly didn't work this way in 2012. – alexis Apr 08 '19 at 18:12
  • @alexis The only relevant things Python 2.7 (2010) is missing here are the context manager and the `error_callback` parameter for `apply_async`, so it hasn't changed much since. – Darkonaut Apr 08 '19 at 18:26
  • I found the callback function to be the most useful, especially when combined with a partial function that lets a regular list collect the async results, as described here: https://gist.github.com/Glench/5789879 – user5359531 Oct 06 '20 at 00:32
  • This doesn't work if you want a stream of data from a child worker, not a single result. – user48956 May 24 '22 at 21:46