gpt4 book ai didi

python - PySpark 逐行函数组合

转载 作者:IT老高 更新时间:2023-10-28 22:25:24 30 4
gpt4 key购买 nike

作为一个简化的示例,我有一个数据框“df”,其中包含“col1,col2”列,我想在对每一列应用函数后计算逐行最大值:

def f(x):
return (x+1)

max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))

所以如果 df:

col1   col2
1 2
3 0

然后

df2:

col1   col2  result
1 2 3
3 0 4

以上似乎不起作用并产生“无法评估表达式:PythonUDF#f...”

我非常肯定“f_udf”在我的 table 上工作得很好,主要问题在于 max_udf。

在不创建额外列或使用基本 map/reduce 的情况下,有没有办法完全使用数据帧和 udf 来完成上述工作?我应该如何修改“max_udf”?

我也试过了:

max_udf=udf(max, IntegerType())

这会产生相同的错误。

我还确认了以下工作:

df2=(df.withColumn("temp1", f_udf(df.col1))
.withColumn("temp2", f_udf(df.col2))

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))

为什么我不能一次完成这些?

我希望看到一个可以推广到任何函数“f_udf”和“max_udf”的答案。

最佳答案

我遇到了类似的问题,在 this stackoverflow question 的答案中找到了解决方案

要将多列或整行传递给 UDF,请使用 struct :

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))

count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType())

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))

new_df.show()

返回:

+----+----+----------+
| a| b|null_count|
+----+----+----------+
|null|null| 2|
| 1|null| 1|
|null| 2| 1|
+----+----+----------+

关于python - PySpark 逐行函数组合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36584812/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com