gpt4 book ai didi

python - PySpark 中 pandas_udf 的隐式模式?

转载 作者:行者123 更新时间:2023-11-28 17:00:41 27 4
gpt4 key购买 nike

This answer很好地解释了如何使用 pyspark 的 groupby 和 pandas_udf 进行自定义聚合。但是,我不可能像示例的这一部分所示那样手动声明我的架构

from pyspark.sql.types import *

schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])

因为我将返回 100 多个具有自动生成的名称的列。有什么方法可以告诉 PySpark 隐含地使用我的函数返回的架构并假设所有工作节点都相同?该架构也会在运行期间发生变化,因为我将不得不尝试使用我想要使用的预测变量,因此架构生成的自动化过程可能是一个选项...

最佳答案

基于 Sanxofons comment,我对如何自己实现这个有了一个想法:

from pyspark.sql.types import *

mapping = {"float64": DoubleType,
"object":StringType,
"int64":IntegerType} # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
schema = StructType(column_types)
return schema

我所做的是获取样本 pandas df,将其传递给函数,然后查看返回的内容:

dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)

这似乎对我有用。问题是它有点递归(需要定义函数来获取模式,将模式定义为 udf)。我通过创建一个只传递数据帧的“包装器”UDF 解决了这个问题。

关于python - PySpark 中 pandas_udf 的隐式模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54770485/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com