
I have tested that neither logger nor print can output a message from inside a pandas_udf, in either cluster mode or client mode.

Test code:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
import logging

logger = logging.getLogger('test')

spark = (SparkSession
         .builder
         .appName('test')
         .getOrCreate())


df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id': ['a'] * 10 + ['b'] * 7 + ['q'] * 3,
    'product_id': ['c'] * 5 + ['d'] * 12 + ['e'] * 3,
}))


@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])


df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
df1.show()  # an action is needed: groupby().apply() on its own is lazy and runs nothing

Also note:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

You can't use this inside a pandas_udf, because this logger belongs to the SparkContext object, and you can't refer to the Spark session or context from inside a UDF.

The only way I know is to raise an exception, as in the answer I wrote below, but that is tricky and has drawbacks. Is there any way to simply print a message from inside a pandas_udf?

Mithril

2 Answers


I have tried every approach I could think of in Spark 2.4.

Without logging, a faulty pandas_udf is hard to debug. The only workable way I know to get an error message out of a pandas_udf is to raise an exception. Debugging this way costs a lot of time, but I don't know a better way.

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    raise Exception('@'*100)  # The only way I know to show a message, but it aborts the job
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])  # never reached

The drawback is that the job fails as soon as the message is shown, so you can't keep Spark running after printing it.
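If you do go the exception route, one way to make the single exception count is to pack everything you would have printed (group keys, row count, dtypes, a few sample rows) into its message. This is a minimal sketch assuming the same column names as the question; `describe_group` is a hypothetical helper, and the function is shown without the `@pandas_udf` decorator so it can be run with plain pandas. In Spark you would add the same decorator as above. The drawback remains: the job still stops at the first group that raises.

```python
import pandas as pd


def describe_group(pdf):
    # Hypothetical helper: summarise one group as a single string.
    keys = pdf[['store_id', 'product_id']].iloc[0].to_dict()
    return 'group={} rows={} dtypes={} head={}'.format(
        keys,
        len(pdf),
        pdf.dtypes.astype(str).to_dict(),
        pdf.head(3).to_dict(orient='records'),
    )


def train_predict(pdf):
    # Debug-only body: the summary travels inside the exception message,
    # which is the part that ends up in the error reported back to the driver.
    raise RuntimeError(describe_group(pdf))


# Local check with plain pandas (no Spark needed).
sample = pd.DataFrame({
    'y': [1, 2],
    'ds': [1000, 2000],
    'store_id': ['a', 'a'],
    'product_id': ['c', 'c'],
})
try:
    train_predict(sample)
except RuntimeError as exc:
    message = str(exc)
```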

Mithril

One thing you can do is put the log message into the DataFrame itself, as an extra column. For example:

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # One output row; the inner list is that row, and its last field carries the log message
    return pd.DataFrame([[3, 5, 'store123', 'product123', 'My log message']],
                        columns=['y', 'ds', 'store_id', 'product_id', 'log'])

Afterwards, select the log column (together with the columns that identify each group) into a separate DataFrame and write it to a file, then drop the log column from the original DataFrame.
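A minimal sketch of that round trip, run with plain pandas so it can be checked without a cluster. `train_predict_with_log` and its message format are hypothetical; unlike the constant row above, each group returns its own rows plus one message describing them. In Spark, the split at the end would correspond to something like `df1.select('store_id', 'product_id', 'log').distinct()` for the log table and `df1.drop('log')` for the data, with the function decorated using the `log string` schema shown above.

```python
import pandas as pd

COLUMNS = ['y', 'ds', 'store_id', 'product_id']


def train_predict_with_log(pdf):
    # Per-group output (here just the input rows) plus a 'log' column
    # holding one message for the group, repeated on each of its rows.
    out = pdf[COLUMNS].copy()
    out['log'] = 'store={} product={} rows={} mean_y={:.2f}'.format(
        pdf['store_id'].iloc[0], pdf['product_id'].iloc[0], len(pdf), pdf['y'].mean())
    return out


# Local stand-in for df.groupby(['store_id', 'product_id']).apply(udf).
pdf_all = pd.DataFrame({
    'y': [1, 2, 3, 4],
    'ds': [1001, 1002, 1003, 1004],
    'store_id': ['a', 'a', 'b', 'b'],
    'product_id': ['c', 'c', 'd', 'd'],
})
result = pd.concat(
    [train_predict_with_log(g) for _, g in pdf_all.groupby(['store_id', 'product_id'])],
    ignore_index=True,
)

# Split logs from data: one log line per group, and the data without 'log'.
logs = result[['store_id', 'product_id', 'log']].drop_duplicates()
data = result.drop(columns='log')
```

Repeating the group's message on every row keeps the returned frame rectangular, and `drop_duplicates` collapses it back to one log line per group.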

It's not perfect, but it might be helpful.

niuer