98

I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not yet been computed by any process, and otherwise skipping the computation and just reading the results from the file.

I'm having concurrency issues with the files: different processes sometimes check to see if a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, then try to write the results to the same file at the same time. How do I avoid writing collisions like this?

Big Dogg
  • Check out an example from the documentation of using [`multiprocessing.Lock`](http://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes) to synchronize multiple processes. – John Vinyard Nov 19 '12 at 01:22
  • You could have only a single process writing results, with a Queue as input that is fed by the other worker processes. I believe it would be safe to have all the worker processes read-only. – GP89 Nov 19 '12 at 01:27
  • I should have mentioned that, to make things more complicated, I'm running multiple different big main problems at the same time on a cluster, with each one writing results to sub-subproblems on the same networked file system. Thus I can get collisions from processes running on entirely separate machines (so I don't think solutions using things like multiprocessing.Lock will work). – Big Dogg Nov 19 '12 at 01:45
  • Is the problem you're having with file write collisions, or is it just that you don't want to duplicate work in situations where one worker starts solving a sub-subproblem while another is already working on it? The latter is a bit more difficult to solve (more synchronization is required). – Blckknght Nov 19 '12 at 01:58
  • Well, originally I was having file write collisions, but I found that checking for the file's existence immediately before writing (instead of relying on the check I do before I start computing the sub-subproblem) took care of that. Now it's more the latter; I'd like to avoid duplicate work if possible (and can imagine others in the same situation). – Big Dogg Nov 19 '12 at 02:15
  • If your networked file system supports file locking, you can use the OS-specific file-create call to create the file exclusively and hold an exclusive lock on it until the results are ready, then close it. Any process that failed to "win" the create race would try to open it and retry (with a delay) until it was able to open it, and then it can read the results. – JimP Nov 19 '12 at 02:57 (a sketch of this idea follows the comments)
  • Ah, thanks JimP! That sounds like exactly what I need. I'll look into it. – Big Dogg Nov 19 '12 at 03:06
  • You're essentially programming a database server here. Have you considered using an existing one? – georg Nov 19 '12 at 09:06
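
For the cross-machine case, JimP's suggestion amounts to an atomic "create if absent" race on the shared file system. Below is a minimal sketch of that idea (not from the original thread); it assumes the networked file system implements `os.O_EXCL` atomically (older NFS versions did not), and the `.lock`/`.tmp` suffixes, the `compute` callable, and the retry delay are all placeholders:

import os
import pickle
import time

def compute_or_load(path, compute, retry_delay=0.5):
    '''Memoize compute() to `path`, running it at most once across processes.
    `compute` is whatever solves this sub-subproblem (placeholder).'''
    lock = path + '.lock'
    try:
        # Atomic create: exactly one process, even on another machine,
        # wins the right to compute this result.
        fd = os.open(lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        # Someone else claimed this sub-subproblem; poll until the result appears.
        # (A stale lock left by a crashed process would make this wait forever.)
        while not os.path.exists(path):
            time.sleep(retry_delay)
        with open(path, 'rb') as f:
            return pickle.load(f)
    os.close(fd)
    result = compute()
    tmp = path + '.tmp'
    with open(tmp, 'wb') as f:
        pickle.dump(result, f)
    os.rename(tmp, path)  # publish atomically so readers never see a partial file
    return result

Each worker would then call something like `compute_or_load(result_path(key), lambda: solve(key))` (both helper names are hypothetical) instead of checking for the file and writing it in separate steps.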

2 Answers

159

@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read-only access. This eliminates collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.time()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.time() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
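
The answer says the same pattern works with map; one way to wire that up (a sketch, not part of the original answer) is to run the listener as a standalone Process and bind the shared queue to the worker with `functools.partial`. The file name and the fake workload below are placeholders:

import multiprocessing as mp
from functools import partial

fn = 'temp_map.txt'  # placeholder output path

def worker(arg, q):
    # pretend to solve a sub-subproblem, then hand the result to the writer
    res = 'Process {} done'.format(arg)
    q.put(res)
    return res

def listener(q):
    # sole writer: drain the queue into the file until told to stop
    with open(fn, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                break
            f.write(str(m) + '\n')
            f.flush()

if __name__ == '__main__':
    manager = mp.Manager()
    q = manager.Queue()
    writer = mp.Process(target=listener, args=(q,))
    writer.start()

    with mp.Pool(mp.cpu_count()) as pool:
        # map returns results in submission order, but lines land in the file
        # in completion order, since each worker pushes to the queue as it finishes
        results = pool.map(partial(worker, q=q), range(80))

    q.put('kill')
    writer.join()

If the on-disk order matters, it is simpler to skip the queue and have the parent process write out the ordered list that map returns after the pool finishes.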
Steven C. Howell
MikeHunter
  • Hey Mike, thanks for the answer. I think this would work for the question as I phrased it, but I'm not so sure it will solve the full problem as outlined in the comments to the question, specifically how I have several main programs running across several machines on a networked filesystem, all of which might have processes that will try to write to the same file. (FWIW, I got around my personal problem in a hacky way a while ago but am commenting in case others have similar issues.) – Big Dogg Nov 25 '12 at 05:14
  • I really would like to upvote this many times. This has been helpful so many times for me. Once more today. – Eduardo Feb 19 '14 at 10:44
  • Thanks Mike - I'd been struggling with how to use MP Queues. Your example makes it very clear and straightforward. – Anurag Mar 24 '14 at 14:50
  • I had to add a `pool.join()` below `pool.close()`. Otherwise my workers would finish before the listener and the process would just stop. – herrherr May 20 '14 at 13:49
  • Many thanks for this! Note that I had to include herrherr's suggestion, lest it cause a hard-to-detect flaw in at least my scenario. – Joel Sjöstrand Feb 23 '15 at 13:16
  • What about when the consumer is greatly outnumbered and causes memory issues? How would you implement multiple consumers all writing to the same file? – ccdpowell Mar 01 '16 at 18:56
  • Why `mp.cpu_count() + 2` when setting the number of processes? – JenkinsY Jan 02 '18 at 09:31
  • After adopting this code, my program exits before the listener finishes its work; how could I fix that? – zyxue Jul 28 '18 at 03:32
  • This works great, except that it writes my outputs to disk in a random order instead of the order I push data through. I'm using map rather than async for the worker threads. Unsure how to solve that issue. – Will Aug 08 '18 at 18:05
  • Tested on Linux: I needed to change `f = open(fn, 'wb')` to `f = open(fn, 'w')` to store the results, otherwise the output file was blank even though the code ran like a charm. – Jason Goal Feb 20 '19 at 14:14
  • [Here](https://gist.github.com/StevenCHowell/c40c8879b71ee9979231a40d6fda1cbe) is an expanded version of this example. – Steven C. Howell Aug 16 '19 at 21:06
  • This freezes on `pool.join()`, copied this exactly... any idea why? – Rafael Apr 10 '20 at 17:11
  • @JenkinsY `mp.cpu_count() + 2` is just a common rule of thumb to ensure that the pool will be saturated. – pedrosaurio Oct 12 '20 at 13:49
2

It looks to me like you need to use a Manager to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap to pass both the object you want to process and the managed list. The first step is to build the list of parameters to be passed to starmap, which includes the managed list.

from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd

def worker(row, param):
    # do something with param here and then append the result to the managed list
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = [] # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build the list of argument pairs to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])

        with Pool() as p:
            p.starmap(worker, params)

From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set, feel free to concatenate it using pandas; then you can save the file very easily as a CSV or a pickle.

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')
ggorlen
fizix137
  • Can I get some feedback on why this was down-voted? I see that the accepted answer is way better. I just want to learn. – fizix137 Jun 17 '20 at 20:22
  • What is `params` here? I cannot see it being initialised anywhere. Also, would `mgr.list([])` be an empty list? You are appending the pair `row`, `param` to `params`; `param` contains the object to be processed, but what does `row` contain? – akshit bhatia Apr 11 '21 at 16:23
  • It might be downvoted because in your code all the process outputs are stored in memory, which doesn't solve the issue. The OP asks about writing each process's output to a file while processing. The main problem to solve here is to avoid collisions, e.g. multiple processes trying to access the file at the same time. – Kerem T Apr 15 '21 at 04:45