
I have a function that parses a file and inserts the data into MySQL using SQLAlchemy. I've been running the function sequentially on the result of os.listdir() and everything works perfectly.

Because most of the time is spent reading the file and writing to the DB, I wanted to use multiprocessing to speed things up. Here is my pseudocode, as the actual code is too long:

import os
import multiprocessing as mp

from bs4 import BeautifulSoup
from db_classes import session, MyDBRecord

def parse_file(filename):
    f = open(filename, 'rb')
    data = f.read()
    f.close()

    soup = BeautifulSoup(data, features="lxml", from_encoding='utf-8')

    # parse file here

    db_record = MyDBRecord(parsed_data)

    session.add(db_record)  # module-level session shared by every process
    session.commit()

pool = mp.Pool(processes=8)

pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])

The problem I'm seeing is that the script hangs and never finishes. I usually get 48 of 63 records into the database. Sometimes it's more, sometimes it's less.

I've tried using pool.close(), both on its own and in combination with pool.join(), and neither seems to help.

How do I get this script to complete? What am I doing wrong? I'm using Python 2.7.8 on a Linux box.

wspeirs

  • Where does the `session` variable come from? – Kevin Oct 07 '15 at 15:10
  • Doesn't the main code need to go in a function, otherwise it gets re-imported and run again? – Peter Wood Oct 07 '15 at 15:10
  • @Kevin it comes from an import: `from db_classes import session` which looks like this: `engine = create_engine('mysql://root:root@localhost/my_db')` `Base.metadata.create_all(engine)` `Base.metadata.bind = engine` `DBSession = sessionmaker(bind=engine)` `session = DBSession()` – wspeirs Oct 07 '15 at 15:56
  • Is it safe to share one session across multiple processes? That seems weird. – Kevin Oct 07 '15 at 15:58

2 Answers


You need to put all the code which uses multiprocessing inside its own function. This stops it from recursively launching new pools when multiprocessing re-imports your module in separate processes:

import os
import multiprocessing as mp

def parse_file(filename):
    ...

def main():
    pool = mp.Pool(processes=8)
    pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])

if __name__ == '__main__':
    main()

See the multiprocessing documentation about making sure your module is importable, and also the advice for running on Windows.
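
Since pool.close() and pool.join() came up in the question: they only help once the pool itself is created inside the guarded function. A minimal sketch, with the pool size and directory name taken from the question:

import os
import multiprocessing as mp

def parse_file(filename):
    pass  # parsing and DB work as in the question

def main():
    pool = mp.Pool(processes=8)
    pool.map(parse_file, ['my_dir/' + f for f in os.listdir('my_dir')])
    pool.close()  # no further tasks may be submitted
    pool.join()   # block until every worker process has exited

if __name__ == '__main__':
    main()

Note that pool.map() already blocks until all results are in; close() and join() just make the shutdown explicit.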

Peter Wood
  • 22,682
  • 5
  • 57
  • 94
3

The problem was a combination of 2 things:

  1. my pool code being called multiple times (thanks @Peter Wood)
  2. my DB code opening too many sessions and/or sharing sessions between processes

I made the following changes and everything works now.

Original File

import os
import multiprocessing as mp

from bs4 import BeautifulSoup
from db_classes import get_session, MyDBRecord

def parse_file(filename):
    f = open(filename, 'rb')
    data = f.read()
    f.close()

    soup = BeautifulSoup(data, features="lxml", from_encoding='utf-8')

    # parse file here

    db_record = MyDBRecord(parsed_data)

    session = get_session()  # see below -- a fresh session per file, per process
    session.add(db_record)
    session.commit()

def main():
    pool = mp.Pool(processes=8)
    pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])

if __name__ == '__main__':
    main()

DB File

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def get_session():
    # build a fresh engine in the calling process; engines and sessions
    # should not be shared across process boundaries
    # (Base is the declarative base defined elsewhere in this module)
    engine = create_engine('mysql://root:root@localhost/my_db')

    Base.metadata.create_all(engine)
    Base.metadata.bind = engine

    db_session = sessionmaker(bind=engine)

    return db_session()
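
A possible refinement, not from the original post: get_session() as written opens a new engine (and connection pool) for every file. Pool's initializer argument lets each worker process build one session up front and reuse it; the init_worker helper below is hypothetical:

import os
import multiprocessing as mp

session = None  # per-process global, set once in each worker

def init_worker():
    # runs once in every worker process at pool startup;
    # get_session() is the helper from the DB File above
    global session
    session = get_session()

def parse_file(filename):
    # ... parse as before ...
    db_record = MyDBRecord(parsed_data)
    session.add(db_record)
    session.commit()

def main():
    pool = mp.Pool(processes=8, initializer=init_worker)
    pool.map(parse_file, ['my_dir/' + f for f in os.listdir('my_dir')])
    pool.close()
    pool.join()
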
wspeirs