I have a dictionary of 15 variables, each with 3 values, for which I need to generate a product of all possible combinations (3**15 = 14.3M combinations). I'm using multi-threading with a 12 core processor to process the combinations (likely jumping to 64 cores).

I'm using itertools.product to generate the different combinations, and ThreadPool with imap_unordered to run the work in parallel. Additionally, I'm using deque to remove each result as soon as it's available. However, I'm finding that memory consumption blows up to about 2.5GB. I understand that itertools.product is a lazy iterator and therefore should not be storing much data in memory, but that doesn't seem to be the case.

Below is my code, and I'm wondering if anyone can help me figure out how I can better optimize the memory utilization.

Additionally, I'm wondering how the chunk size in imap_unordered plays a role in memory efficiency. I tried different values (10, 100, 1000, 10000) to see how it affects memory usage, but it doesn't seem to have much impact other than stabilizing memory utilization at around 2.5GB. If I don't specify a chunk size, memory tends to blow up to more than 5GB.

I also tried changing the number of threads from 12 to 1, and that did not impact memory usage either. However, using the single-process loop (commented out below) reduces memory usage to only ~30MB.

import numpy as np
import itertools
import multiprocessing
import queue
import functools
from multiprocessing import pool, dummy

def dummy_func(values, keys):
    print( dict(zip(keys, values)) )
    return

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'], 
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'], 
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'], 
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'], 
                  'i': ['7p', '16p', '22p'], 
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'], 
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'], 
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']}
    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # process simulations for all permutations using multi-threading
    with multiprocessing.pool.ThreadPool(num_threads) as workers:
        queue.deque(workers.imap_unordered(functools.partial(dummy_func, keys=keys), 
                                           itertools.product(*map(parameters.get, keys)), 100))
    return

if __name__ == "__main__":
    main()
arnt
David
  • Why are you materializing all the results in a deque, anyway? That creates the memory problem you're trying to avoid. It sounds like you may have misunderstood the concept of a deque. – user2357112 Aug 30 '21 at 01:37
  • Maybe you saw `collections.deque(iterator, maxlen=0)` used to consume an iterator somewhere and didn't realize the role of the `maxlen` argument. – user2357112 Aug 30 '21 at 01:39
  • Here's some info on what [chunksize](https://stackoverflow.com/questions/3822512/chunksize-parameter-in-multiprocessing-pool-map) does. – martineau Aug 30 '21 at 01:44
  • Your worker function, `dummy_func`, is returning `None` and that is what you are adding to your deque over and over again and it will consequently grow very large. What is the point of this? Anyway, try specifying a reasonable *maxlen* argument to the deque constructor to keep the size limited to `maxlen` elements. – Booboo Aug 30 '21 at 10:38
  • @martineau `chunksize` affects speed rather than space. – Booboo Aug 30 '21 at 10:58
  • @Booboo: I never said what it did. The OP was wondering about it so I provided a link to something that describes what it does. Personally I *think* it might affect space because it determines the approximate size of the pieces that the iterable will be split up into and submitted to each task. Bigger pieces → more memory. – martineau Aug 30 '21 at 11:30
  • `imap_unordered` saves its results to a deque internally as it goes, and does not wait for them to be consumed before computing more. Calling `queue.deque` similarly tries to collect all the results at once making the call equivalent to `map` anyway. These results may be just `None`, but if you have millions of pointers to `None`, it could still add up. – Aaron Aug 30 '21 at 18:49
  • Thanks to everyone for the feedback. I've tried these suggestions, but to no avail. Below I've tried to consume the iterator while also filtering None and limiting the deque size, but I'm still seeing memory usage blow up. Any recommendations? `with multiprocessing.pool.ThreadPool(num_threads) as workers: collections.deque(filter(None, workers.imap_unordered(functools.partial(process_simulation, keys=keys), itertools.product(*map(parameters.get, keys)), chunksize=num_threads)), maxlen=0)` – David Aug 31 '21 at 02:15

1 Answer

Update

If you do not want to blow up memory, you need three things:

  1. The iterable that feeds values to dummy_func must generate them incrementally. itertools.product does consume its input iterables completely and keeps them in memory as pools before yielding the first value, but it then yields the combinations one at a time, so with 15 short lists the generator itself stays small. That is consistent with the ~30MB single-process figure in the question; the growth comes from whatever consumes the iterable, as described in points 2 and 3.
  2. You must process the results one by one and append each result to a deque created with a suitable non-zero maxlen argument. Your current code builds the deque from the complete output of imap_unordered with no maxlen, so it ends up holding one entry per input item. This will blow up memory.
  3. Even if the values for your worker function, dummy_func, are generated incrementally and passed through imap, the pool can create tasks faster than the workers complete them, so the pool's input queue keeps growing and memory explodes.
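Point 3 can be checked directly. The following is a minimal sketch, assuming a standard ThreadPool; counting_source, slow_task and items_consumed_at_first_result are hypothetical helper names introduced only for this illustration. It counts how many input items the pool has already pulled from the generator by the time the first result is available.

```python
import time
from multiprocessing.pool import ThreadPool


def counting_source(n, consumed):
    """Yield 0..n-1 and record in consumed[0] how many items were pulled."""
    for i in range(n):
        consumed[0] = i + 1
        yield i


def slow_task(x):
    # Stand-in for a simulation that is much slower than task generation.
    time.sleep(0.2)
    return x


def items_consumed_at_first_result(n_items=2000, n_workers=2):
    """Return how many inputs the pool had pulled when the first result arrived."""
    consumed = [0]
    with ThreadPool(n_workers) as pool:
        results = pool.imap_unordered(slow_task, counting_source(n_items, consumed))
        next(results)          # wait for the first result only
        pulled = consumed[0]   # inputs already taken from the generator
    return pulled


if __name__ == "__main__":
    # With only 2 workers, a value far above 2 means the pool read ahead
    # and queued tasks that no worker had started yet.
    print(items_consumed_at_first_result())
```

If the printed number is close to n_items rather than n_workers, the pool has queued almost the whole input up front, which is the growth described in point 3.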

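To address point 1, I pass a generator directly to the pool; the code below uses the permutations generator function. Note that permutations(parameters.values(), len(keys)) yields orderings of the value lists rather than one value per key, so to reproduce the combinations from the question you would pass itertools.product(*parameters.values()) instead.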
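The two generators can be compared on a small input. This is a minimal sketch; params is a made-up two-key parameter set, not the one from the question. It also takes a few items from a 3**15 product to show, as the itertools documentation describes, that product stores only its input pools and yields its output lazily.

```python
from itertools import islice, permutations, product

# Hypothetical, tiny parameter set used only for this comparison.
params = {"a": ["1", "2", "3"], "b": ["x", "y", "z"]}
keys = list(params)

# product picks one value per key: 3 * 3 = 9 tuples such as ('1', 'x').
combos = list(product(*params.values()))

# permutations reorders the value lists themselves: 2! = 2 tuples of lists.
orderings = list(permutations(params.values(), len(keys)))

# product yields lazily: taking 3 items out of 3**15 possible combinations
# does not require building the other combinations first.
big = product(*(["lo", "mid", "hi"] for _ in range(15)))
first_three = list(islice(big, 3))

if __name__ == "__main__":
    print(len(combos), combos[0])
    print(len(orderings), orderings[0])
    print(first_three[1][-1])
```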

To overcome the problem described in 2, I have initialized an empty deque with maxlen=10. As each value is returned from dummy_func, I append it to the deque.
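How maxlen changes what a deque retains can be seen in isolation. This is a small sketch in which a plain generator stands in for the pool's result iterator; the variable names are illustrative only.

```python
from collections import deque

# Stand-in for the iterator returned by pool.imap(...).
results = (i * i for i in range(1000))

# maxlen=10: append one result at a time; only the 10 most recent are kept.
recent = deque(maxlen=10)
for r in results:
    recent.append(r)

# maxlen=0: consume an iterator purely for its side effects, keeping nothing.
drained = deque((i for i in range(1000)), maxlen=0)

# No maxlen: every item is kept, so memory grows with the input length.
everything = deque(i for i in range(1000))

if __name__ == "__main__":
    print(len(recent), len(drained), len(everything))
```

If the results are not needed at all, as with a worker that returns None, the maxlen=0 form avoids retaining them; a deque without maxlen is the only one of the three whose size tracks the input.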

To overcome the problem described in 3, you need to use either the BoundedQueueProcessPool or the BoundedQueueThreadPool class below. Each overrides apply_async, imap and imap_unordered so that a semaphore is acquired before a task is submitted and released when its result is consumed (or, for apply_async, when the callback fires). Unlike the standard pool methods, this blocks further task submission once the number of waiting tasks reaches, by default, the number of processes or threads in the pool (you can set the limit yourself with the max_waiting_tasks argument):

import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps, partial

name = 'bounded_pool'

class ImapResult():
    def __init__(self, semaphore, result):
        self.semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = self.it.__next__()
            self.semaphore.release()
            return elem
        except StopIteration:
            raise
        except:
            self.semaphore.release()
            raise

class BoundedQueuePool:
    def __init__(self, semaphore):
        self.semaphore = semaphore

    def release(self, result, callback=None):
        self.semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self.semaphore.acquire()
        callback_fn = self.release if callback is None else partial(self.release, callback=callback)
        error_callback_fn = self.release if error_callback is None else partial(self.release, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self.semaphore.acquire()
                yield elem
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self.semaphore.acquire()
                yield elem
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        BoundedQueuePool.__init__(self, multiprocessing.BoundedSemaphore(self._processes + max_waiting_tasks))

class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        BoundedQueuePool.__init__(self, threading.BoundedSemaphore(self._processes + max_waiting_tasks))

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

##################################################################

import queue
from itertools import permutations

def dummy_func(values, keys):
    #print( dict(zip(keys, values)))
    ...
    return dict(zip(keys, values))

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
                  }

    # A more reasonably sized parameter set (overrides the one above):
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  }


    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

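    q = queue.deque(maxlen=10)  # keep only the 10 most recent results

    # NOTE: permutations() yields orderings of the value lists. For one value
    # per key, as in the question, pass itertools.product(*parameters.values()).
    with BoundedQueueThreadPool(num_threads) as pool:
        for v in pool.imap(partial(dummy_func, keys=keys), permutations(parameters.values(), len(keys))):
            q.append(v)
    return q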

if __name__ == '__main__':
    import time
    t = time.time()
    q = main()
    print(q)
    print(time.time() - t)
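For comparison, the same backpressure idea can be sketched with concurrent.futures instead of subclassing the pool. This is an alternative technique, not the classes above: run_bounded and simulate are hypothetical names, and the sketch keeps a fixed window of submitted futures and pulls a new input item only when one of them finishes.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice, product


def run_bounded(func, iterable, max_workers=4, max_pending=8):
    """Apply func to each item with at most max_pending tasks in flight.

    Yields results as they complete, in no particular order.
    """
    it = iter(iterable)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Fill the window with the first max_pending tasks.
        pending = {executor.submit(func, item) for item in islice(it, max_pending)}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()
            # Top the window back up; input is pulled only as slots free up.
            for item in islice(it, max_pending - len(pending)):
                pending.add(executor.submit(func, item))


def simulate(values):
    # Stand-in for the real per-combination work.
    return "".join(values)


if __name__ == "__main__":
    grid = product("ab", "cd", "ef")
    print(sum(1 for _ in run_bounded(simulate, grid)))
```

Because the input iterator is advanced only inside the loop, the number of combinations held in memory is bounded by max_pending regardless of how long the product is.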
Booboo
  • Thanks for the detailed response. However, to be clear, the intention of dummy_func is to perform some arbitrary task on the data. The intention is not to return or retain the data to print later, since that would require the data to be stored. I've also tried various maxlen settings for the deque (0, 1, 10, 100, 1000), but those did not affect memory or performance. – David Sep 01 '21 at 15:42
  • Your original function implicitly returned `None` (I am not sure why you even had the unnecessary `return` statement at all for that). Then tell me what the purpose was of blowing up memory by having a deque that seemingly (to me) stored semantically meaningless values. I have been giving you the benefit of the doubt that there has been a purpose for it and just assumed that through some possible misunderstanding or absentmindedness on your part you forgot to return a result. – Booboo Sep 01 '21 at 16:55
  • The reason why *maxlen* values you set would not affect memory is because the deque will not get constructed until the `itertools.product` method yielded all its values and that would in itself take up a lot of memory. In my solution above the *intent* at least in using `imap` is to do *lazy* evaluation of `itertools.product` and to add elements to the deque one by one as they become available. However, it turns out that `itertools.product` essentially builds all of the results first in memory before yielding the values one by one and that's why even using `imap` memory explodes. – Booboo Sep 01 '21 at 17:28