
I have a bunch of Python scripts that run data science models. They take quite a while, and the only way to speed them up is to use multiprocessing. To achieve this, I used the joblib library, and it works really well. Unfortunately, however, this messes up logging, and the console output is garbled too (expectedly so, since all processes dump their respective outputs simultaneously).

I am new to using the logging library and followed some other SO answers to try to get it to work. I am using 8 cores for processing. Using the answers on SO, I wrote out to log files and expected 8 new files every iteration. However, it created 8 files on the first iteration and only wrote/appended to those same 8 files on every subsequent loop. This was a little inconvenient, so I explored a little more and found loguru and logzero. While they both cover examples using multiprocessing, neither of them shows how to use it with joblib. Here is what I have so far:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id, logger):

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)


if __name__ == "__main__":
    main()

train_model.py

import math
from datetime import datetime
from itertools import product
from math import sqrt

import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error

import helper
import stock_subscriber_data

# bunch of functions here that don't need logging...

# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    #... do stuff here ...
    #... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model


# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    #... do stuff here ...
    #... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model



def read_train_write(data_df, stock_id, series, last_date, logger):
    #... do stuff here ...
    #... and here ...
    logger.info('done')

    #... do stuff here ...
    #... and here ...

    # bunch of logger.info() statements here... 
    #
    #
    #
    #

    #... do stuff here ...
    #... and here ...

    return test_y, prd

This works well when there is only one process at a time. However, when running in multiprocess mode I get a `_pickle.PicklingError: Could not pickle the task to send it to the workers.` error. What am I doing wrong? How can I remedy this? I don't mind switching to something other than loguru or logzero, as long as I can create either one file with coherent logs, or n files, each containing the logs of one iteration of joblib.

CodingInCircles
  • ***`_pickle.PicklingError:`***: I feel this has nothing to do with logging; as it stands, you are only logging strings. [Edit] your question and show the **full traceback**. ***"the console output is garbled"***: Follow this answer: [print-overwriting-itself](https://stackoverflow.com/questions/49130540/python-threading-print-overwriting-itself) – stovfl Dec 21 '19 at 08:38
  • By garbled, I mean that the output of each of the processes writes to the console and it becomes impossible to know "who" is writing what. As such, I don't have the issue in the question you've linked to. – CodingInCircles Dec 21 '19 at 23:05
  • ***"becomes impossible to know "who" is writing what"***: That's what the linked answer solves, by serializing into a `Queue`. This results in **one point printing**, so no garbled output is possible. – stovfl Dec 21 '19 at 23:18

2 Answers


I got it to work by modifying my run_models.py. Now I have one log file per loop. This creates a LOT of log files, but each is relevant to its own loop and not jumbled or anything. One step at a time, I guess. Here's what I did:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id):

    log_file_name = "log_file_{}".format(stock_id)

    logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s) for s in stock_list)


if __name__ == "__main__":
    main()
CodingInCircles

So the proper way to use loguru with joblib is to change the joblib backend to `multiprocessing`.

import sys

from loguru import logger
from joblib import Parallel, delayed
from tqdm.autonotebook import tqdm

logger.remove()
logger.add(sys.stdout, level='INFO', enqueue=True)

logger.info('test')
logger.debug('should not appear')

def do_thing(i):
    logger.info('item %i' %i)
    logger.debug('should not appear')
    return None


Parallel(n_jobs=4, backend='multiprocessing')(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)



Parallel(n_jobs=4)(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)

The first `Parallel` call works. The second hits the old issue you mentioned earlier.
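If switching away from loguru is an option (the question says that's acceptable), the standard library can achieve the same "one point printing" with queue-based handlers. This is a minimal sketch, not the asker's code: the file name, worker function, and process count are illustrative. Workers push records through a shared `multiprocessing.Queue` via `QueueHandler`, and a single `QueueListener` in the parent is the only thing that writes to the file, so lines can't interleave:

```python
# Hypothetical sketch: coherent multiprocess logging with only the standard
# library, as an alternative to loguru.
import logging
import logging.handlers
import multiprocessing


def worker(queue, i):
    # Each worker only enqueues records; it never touches the file itself.
    logger = logging.getLogger("worker{}".format(i))
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(logging.handlers.QueueHandler(queue))
    logger.info("item %d", i)


def run(log_path):
    queue = multiprocessing.Queue()
    handler = logging.FileHandler(log_path)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s: (%(name)s) >> %(message)s")
    )
    # The listener thread in the parent is the single writer.
    listener = logging.handlers.QueueListener(queue, handler)
    listener.start()
    procs = [
        multiprocessing.Process(target=worker, args=(queue, i)) for i in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    listener.stop()
    handler.close()


if __name__ == "__main__":
    run("combined.log")
```

The same idea should work under joblib with `backend='multiprocessing'`, since the queue is inherited by the forked workers; with the default loky backend the queue would have to be made picklable some other way.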

hongy