Update
If you do not want to blow up memory you need three things:

1. An iterable that generates the values being passed to dummy_func incrementally. Note that itertools.product completely consumes its input iterables into memory before it yields its first value, so with large inputs it can blow up memory regardless of anything else you do.
2. A consumer that processes the results one by one, appending each to a deque initialized with a suitable non-zero maxlen argument. Your current code initializes the deque with the complete output of the map function, which has the full length of the passed iterable and will therefore blow up memory.
3. Back-pressure on task submission. Even if you generate the values for your worker function, dummy_func, incrementally using imap, you may be generating tasks faster than results are produced, in which case the pool's input queue will keep growing and memory will blow up.
To overcome problem 1, I am using the permutations generator function instead.
To overcome problem 2, I initialize an empty deque with maxlen=10 and append each value returned from dummy_func to it as it arrives.
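As a quick, self-contained illustration of points 1 and 2 (separate from the pool code below; the names lazy_values and last_ten are invented for this sketch), a deque with a maxlen can drain a lazy generator while only ever holding the last 10 items:

from collections import deque

lazy_values = (n * n for n in range(1_000_000))  # a generator: nothing is materialized yet
last_ten = deque(lazy_values, maxlen=10)         # consumes everything, retains only 10 items
print(last_ten)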
To overcome problem 3, you need to use either the BoundedQueueProcessPool or BoundedQueueThreadPool class below. The demo uses the imap method to submit new tasks; apply_async with a callback function to process results is also supported. These classes differ from the standard pools in that, by default, they block the main thread from submitting more tasks once the input queue size reaches the number of processes or threads in the pool, as the case may be (you can specify a different limit with the max_waiting_tasks argument):
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps, partial
class ImapResult:
    """Wraps an imap/imap_unordered result iterator so that one semaphore
    permit is released each time a result is consumed, letting the
    submitting side enqueue another task."""
    def __init__(self, semaphore, result):
        self.semaphore = semaphore
        self.it = iter(result)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = next(self.it)
            self.semaphore.release()
            return elem
        except StopIteration:
            # End of the input: no task corresponds to this call,
            # so there is no permit to release.
            raise
        except:
            # The worker raised an exception; its task slot is still freed.
            self.semaphore.release()
            raise
class BoundedQueuePool:
    """Mixin that throttles task submission with a bounded semaphore."""
    def __init__(self, semaphore):
        self.semaphore = semaphore

    def release(self, result, callback=None):
        # A task has completed: free one submission slot, then invoke
        # the user's callback, if any.
        self.semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        # Blocks when the maximum number of queued tasks has been reached.
        self.semaphore.acquire()
        callback_fn = self.release if callback is None else partial(self.release, callback=callback)
        error_callback_fn = self.release if error_callback is None else partial(self.release, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                # Blocks when the maximum number of queued tasks has been reached.
                self.semaphore.acquire()
                yield elem
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self.semaphore.acquire()
                yield elem
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)
class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
def __init__(self, *args, max_waiting_tasks=None, **kwargs):
multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
if max_waiting_tasks is None:
max_waiting_tasks = self._processes
elif max_waiting_tasks < 0:
raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
BoundedQueuePool.__init__(self, multiprocessing.BoundedSemaphore(self._processes + max_waiting_tasks))
class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
def __init__(self, *args, max_waiting_tasks=None, **kwargs):
multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
if max_waiting_tasks is None:
max_waiting_tasks = self._processes
elif max_waiting_tasks < 0:
raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
BoundedQueuePool.__init__(self, threading.BoundedSemaphore(self._processes + max_waiting_tasks))
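# Note: with the default max_waiting_tasks, each pool's semaphore starts at
# 2 * pool size, i.e. up to pool-size tasks executing plus pool-size tasks
# waiting in the input queue before submission blocks.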
def threadpool(pool):
def decorate(f):
@wraps(f)
def wrap(*args, **kwargs):
return pool.apply_async(f, args, kwargs)
return wrap
return decorate
def processpool(pool):
def decorate(f):
@wraps(f)
def wrap(*args, **kwargs):
return pool.apply_async(f, args, kwargs)
return wrap
return decorate
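# The two decorators above are optional conveniences: decorating a function
# makes every call go through pool.apply_async and return an AsyncResult.
# A hypothetical usage sketch (double is a name invented for illustration):
#
#     pool = BoundedQueueThreadPool(4)
#
#     @threadpool(pool)
#     def double(x):
#         return 2 * x
#
#     async_result = double(21)   # submits to the pool, may block if full
#     print(async_result.get())   # -> 42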
##################################################################
from collections import deque
from itertools import permutations
def dummy_func(values, keys):
    # print(dict(zip(keys, values)))
...
return dict(zip(keys, values))
def main():
num_threads = multiprocessing.cpu_count()
parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
'i': ['7p', '16p', '22p'],
'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
}
    # A more reasonably sized parameter set for this demo:
parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
'i': ['7p', '16p', '22p'],
'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
}
keys = list(parameters)
    # Single-process version for comparison:
#for values in itertools.product(*map(parameters.get, keys)):
# dummy_func(values, keys)
    q = deque(maxlen=10)  # retains only the 10 most recent results
    pool = BoundedQueueThreadPool(num_threads)
    # permutations of the 10 value-lists taken 10 at a time -> 10! = 3,628,800 tasks
    for v in pool.imap(partial(dummy_func, keys=keys), permutations(parameters.values(), len(keys))):
        q.append(v)
    pool.close()
    pool.join()
    return q
if __name__ == '__main__':
import time
t = time.time()
q = main()
print(q)
print(time.time() - t)
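If you prefer apply_async over imap, the same back-pressure applies: submission blocks once the bounded semaphore is exhausted, and the wrapped callback releases a permit as each result completes. A minimal sketch, assuming the classes above are defined in (or imported by) the same script (square is a throwaway worker invented for illustration):

import multiprocessing
from collections import deque

def square(x):
    return x * x

if __name__ == '__main__':
    results = deque(maxlen=10)
    pool = BoundedQueueProcessPool(multiprocessing.cpu_count())
    for i in range(100):
        # Blocks once pool-size + max_waiting_tasks submissions are
        # outstanding, so the input queue cannot grow without bound:
        pool.apply_async(square, (i,), callback=results.append)
    pool.close()
    pool.join()   # wait for all tasks and callbacks to finish
    print(results)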