如何在pyspark pandas_udf中记录/打印消息?

9rbhqvlz  于 2023-03-07  发布在  Spark
关注(0)|答案(3)|浏览(310)

我已经测试过loggerprint都不能在pandas_udf中打印消息,无论是在群集模式还是客户端模式下。
试验代码:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging

logger = logging.getLogger('test')

spark = (SparkSession
.builder
.appName('test')
.getOrCreate())

df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id' : ['a'] * 10 + ['b'] *7 + ['q']*3,
    'product_id' : ['c'] * 5 + ['d'] *12 + ['e']*3,
    })
)

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])

df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

另请注意:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

你不能在pandas_udf中使用这个,因为这个日志超出了spark上下文对象,你不能在udf中引用spark会话/上下文。
我知道的唯一方法是使用Excetion作为我在下面写的答案。但是这很棘手并且有缺点。我想知道是否有任何方法可以只打印panda_udf中的消息。

dced5bon

dced5bon1#

目前,我在spark 2. 4中尝试了各种方法。
如果没有日志,很难调试一个有问题的panda_udf。我知道的唯一可行的方法是在panda_udf中打印错误消息。所以用这种方法调试确实很费时间,但我知道没有更好的方法。

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    raise Exception('@'*100)  # The only way I know can print message but would break execution 
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])

缺点是你不能保持Spark运行后,打印消息。

9wbgstp7

9wbgstp72#

您可以做的一件事是将日志消息放入DataFrame本身。

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])

之后,您可以选择日志列相关信息到另一个DataFrame并输出到文件中。将其从原始DataFrame中删除。
不是很完美,但可能有用。

csbfibhn

csbfibhn3#

不能打印并不是真的。当我在UDF中print()时,消息会显示在Spark task 的stderr中。诀窍是确保你看对了地方。下面是我正在开发的一个应用程序的示例。

相关问题