gpt4 book ai didi

pandas - 如何在 pyspark pandas_udf 中记录/打印消息?

转载 作者:行者123 更新时间:2023-12-03 21:01:33 25 4
gpt4 key购买 nike

我已经测试过 loggerprint无法在 pandas_udf 中打印消息,无论是集群模式还是客户端模式。

测试代码:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging

logger = logging.getLogger('test')

spark = (SparkSession
.builder
.appName('test')
.getOrCreate())


df = spark.createDataFrame(pd.DataFrame({
'y': np.random.randint(1, 10, (20,)),
'ds': np.random.randint(1000, 9999, (20,)),
'store_id' : ['a'] * 10 + ['b'] *7 + ['q']*3,
'product_id' : ['c'] * 5 + ['d'] *12 + ['e']*3,
})
)


@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
print('#'*100)
logger.info('$'*100)
logger.error('&'*100)
return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])


df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

另请注意:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

您不能在 pandas_udf 中使用它,因为这个日志超出了 Spark 上下文对象,你不能在 udf 中引用 Spark session /上下文。

我知道的唯一方法是使用 Excetion正如我在下面写的答案。
但它很棘手并且有缺点。
我想知道是否有任何方法可以在 pandas_udf 中打印消息。

最佳答案

您可以做的一件事是将日志消息放入 DataFrame 本身。
例如

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])


之后,您可以将包含相关信息的日志列选择到另一个 DataFrame 中并输出到文件。从原始 DataFrame 中删除它。

它并不完美,但可能会有所帮助。

关于pandas - 如何在 pyspark pandas_udf 中记录/打印消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57175767/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com