
I have the following Python code:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    for i in range(0, MAX_PROCESSES):
        p = Process(target=f, args=(i,))
        p.start()

which runs well. However, MAX_PROCESSES is variable and can be any value between 1 and 512. Since I'm only running this code on a machine with 8 cores, I need to find out if it is possible to limit the number of processes allowed to run at the same time. I've looked into multiprocessing.Queue, but it doesn't look like what I need - or perhaps I'm interpreting the docs incorrectly.

Is there a way to limit the number of simultaneous multiprocessing.Process instances running?

Brett
  • for i in range(0, min(MAX_PROCESSES, 8)): – Jacob Jan 02 '14 at 15:55
  • @Jacob I still want all the MAX_PROCESSES to run though. The code above is truncated for simplicity, but the main function is called up to 512 times (hence the loop). So I'm wondering if there is a way to queue processes. – Brett Jan 02 '14 at 15:58
  • so you want a master/worker setup, and you want to limit the number of workers? – Jacob Jan 02 '14 at 16:01
  • @Jacob Yes, that might be a better way of phrasing it. – Brett Jan 02 '14 at 16:02

3 Answers


It might be most sensible to use multiprocessing.Pool, which manages a pool of worker processes (by default, one per CPU on your system) and feeds tasks to the workers as they become free.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also set the number of worker processes manually:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"

It's also handy to know that there is a multiprocessing.cpu_count() function to report the number of CPUs on a given system, if you need that in your code.
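For example, a quick sketch of sizing the pool from it (the -1 here is only a heuristic to keep one CPU free, and square is just a stand-in task):

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # leave one CPU free for the rest of the system, but never go below one worker
    n_workers = max(multiprocessing.cpu_count() - 1, 1)
    pool = multiprocessing.Pool(processes=n_workers)
    print(pool.map(square, range(10)))
    pool.close()
    pool.join()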

Edit: Here's some draft code that seems to work for your specific case:

import multiprocessing

def f(name):
    print('hello', name)

if __name__ == '__main__':
    pool = multiprocessing.Pool()  # uses all available CPUs by default; pass processes=N to limit it
    for i in range(512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
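If you want the results collected (or simply prefer a single blocking call), the same batch can be submitted with pool.map instead of a loop of apply_async calls; a minimal sketch of the equivalent:

import multiprocessing

def f(name):
    print('hello', name)

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:   # again one worker per CPU by default
        pool.map(f, range(512))            # blocks until all 512 calls have run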
treddy
  • Alright, I've drafted a version that seems to work just fine for your specific case and added it to the post above. – treddy Jan 02 '14 at 16:31
  • `multiprocessing.cpu_count()-1 or 1` can be a useful heuristic for deciding how many processes to run in parallel: the -1 avoids locking up the system by monopolising all cores, but if there is only one CPU available then the `or` gives a graceful fallback to single-core running. – andybuckley Dec 10 '14 at 16:28
  • What if my function has IO-heavy work and little processing? Is using 10 threads on a 4-core machine going to affect the program in any way? – Abhidemon Apr 08 '16 at 13:01
  • note that `multiprocessing.cpu_count()` is not the number of cores, but the number of threads (in the sense of hyperthreading). – Grismar Feb 27 '19 at 00:16
  • I was able to reduce heavy back-end processing time on nightly scheduled tasks in my app from ~20 minutes to ~8 minutes using what you outlined above. Thanks @treddy! – Fergus Mar 25 '19 at 19:30
  • Great solution. It helped me reduce a 90-minute process to 5 minutes. – user0204 Feb 02 '20 at 12:43
  • Why map in your first example but apply_async in your second? – Areza Feb 07 '20 at 10:41
  • Quick note on the number of CPUs. The documentation reads: "multiprocessing.cpu_count(): Return the number of CPUs in the system. This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0))." You can also obtain the CPU count with os.cpu_count(). Appears a few updates have occurred since the discussion above, which was quite enlightening! Happy Coding! – RandallShanePhD Apr 16 '20 at 18:19
  • How can I get the process number in the function, so that I can see the output for every single process? (see the sketch below) – DevPy Jul 14 '20 at 11:25
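One way to see which worker handled each call is multiprocessing.current_process() (or os.getpid()); a minimal sketch:

import multiprocessing
import os

def f(name):
    # report which pool worker (and OS pid) ran this call
    worker = multiprocessing.current_process()
    print('hello', name, 'from', worker.name, 'pid', os.getpid())

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(f, range(8))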

I think Semaphore is what you are looking for; it will block the main process once its counter has been reduced to 0. Sample code:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # simulate a time-consuming task by sleeping
    time.sleep(5)
    # `release` will add 1 to `sema`, allowing other 
    # processes blocked on it to continue
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        # once 20 processes are running, the following `acquire` call
        # will block the main process since `sema` has been reduced
        # to 0. This loop will continue only after one or more 
        # previously created processes complete.
        sema.acquire()
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()
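Note that if f raises before it reaches sema.release(), that permit is never returned and the main loop will eventually block forever on sema.acquire(). Wrapping the worker body in try/finally guards against this; a minimal variant of f for the example above:

import time

def f(name, sema):
    try:
        print('process {} starting doing business'.format(name))
        time.sleep(5)
    finally:
        # always return the permit, even if the work above raises
        sema.release()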

The following code is more structured, since it acquires and releases sema in the same function. However, it will consume too many resources if total_task_num is very large:

from multiprocessing import Process
from multiprocessing import Semaphore
import time

def f(name, sema):
    print('process {} starting doing business'.format(name))
    # `sema` is acquired and released in the same
    # block of code here, making code more readable,
    # but may lead to problems (see the note below).
    sema.acquire()
    time.sleep(5)
    sema.release()

if __name__ == '__main__':
    concurrency = 20
    total_task_num = 1000
    sema = Semaphore(concurrency)
    all_processes = []
    for i in range(total_task_num):
        p = Process(target=f, args=(i, sema))
        all_processes.append(p)
        # the following line won't block once 20 processes are
        # running; instead, it will carry on until all 1000
        # processes have been created
        p.start()

    # inside main process, wait for all processes to finish
    for p in all_processes:
        p.join()

The above code will create all total_task_num processes up front, but only concurrency of them will be running at any time; the others sit blocked on the semaphore while still consuming system resources.

makiko_fly
  • This is great! It also solves the PicklingError issue for things Python can't pickle. – YotamW Constantini Feb 18 '20 at 15:54
  • I am not sure if this is something I am doing incorrectly, but my sema.release() is never occurring when using the first block of code with the release in the function f but acquire in the main. Anyone ever have that problem? Silly mistake? – user1983682 Oct 30 '20 at 21:04
  • I get an error `OSError: [Errno 24] Too many open files` midway through starting the processes! Am I doing something wrong? – Ansh David Apr 06 '21 at 23:20
  • this is perfect for a serverless environment, especially in AWS lambda – Ubaid Qureshi Aug 10 '21 at 21:14

More generally, it could also look like this:

import multiprocessing

def chunks(l, n):
    # yield successive n-sized slices of list l
    for i in range(0, len(l), n):
        yield l[i:i + n]

numberOfProcesses = 4

if __name__ == '__main__':
    # f is your worker function and params its list of inputs,
    # both assumed to be defined elsewhere
    jobs = []
    for i, param in enumerate(params):
        p = multiprocessing.Process(target=f, args=(i, param))
        jobs.append(p)
    # start the jobs in batches and wait for each batch to finish
    # before launching the next one
    for batch in chunks(jobs, numberOfProcesses):
        for j in batch:
            j.start()
        for j in batch:
            j.join()

Of course, this approach is quite crude (it waits for every process in a chunk to finish before starting the next chunk), but it still works well when the individual function calls have roughly equal run times.

Baedsch