gpt4 book ai didi

python - withColumn 中的用户定义函数只调用一次而不是每个 DF 行

转载 作者:太空宇宙 更新时间:2023-11-04 01:53:25 25 4
gpt4 key购买 nike

我有一个用户定义函数的问题,该函数是为连接来自一个数据帧的值而构建的,该数据帧与来自另一个数据帧的索引值相匹配。

以下是我尝试匹配的简化数据框:

a_df:
+-------+------+
| index | name |
+-------+------+
| 1 | aaa |
| 2 | bbb |
| 3 | ccc |
| 4 | ddd |
| 5 | eee |
+-------+------+

b_df:
+-------+------+
| index | code |
+-------+------+
| 1 | 101 |
| 2 | 102 |
| 3 | 101 |
| 3 | 102 |
| 4 | 103 |
| 4 | 104 |
| 5 | 101 |
+-------+------+

udf函数&调用:

> def concatcodes(index, dataframe):
> res = dataframe.where(dataframe.index == index).collect()
> reslist = "|".join([value.code for value in res])
> return reslist
>
> spark.udf.register("concatcodes", concatcodes, StringType())
>
> resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))

我希望为 a_DF 数据帧的每一行调用该函数,从而产生以下输出:

+-------+------+-------+
| index | name |codes |
+-------+------+-------+
| 1 | aaa |101 |
| 2 | bbb |102 |
| 3 | ccc |101|102|
| 4 | ddd |103|104|
| 5 | eee |101 |
+-------+------+-------+

但是,该函数似乎只被调用一次,整个列作为其参数传递,导致以下输出:

+-------+------+---------------------------+
| index | name |codes |
+-------+------+---------------------------+
| 1 | aaa |101|102|101|102|103|104|101| |
| 2 | bbb |101|102|101|102|103|104|101|
| 3 | ccc |101|102|101|102|103|104|101|
| 4 | ddd |101|102|101|102|103|104|101|
| 5 | eee |101|102|101|102|103|104|101|
+-------+------+---------------------------+

我想我在 .withColum 方法中调用 UDF 时犯了根本性的错误,但我不知道是什么 - 如果有人指出我的逻辑有什么问题,我将不胜感激。

最佳答案

首先,你don't need a udf为了这。您问题的核心本质上是 Concatenating string by rows in pyspark和一个 join。以下将产生所需的输出:

from pyspark.sql.functions import collect_list, concat_ws

resultDF = a_df.join(
b_df.groupBy("index").agg(concat_ws("|", collect_list("code")).alias("code")),
on="index"
)

resultDF .show()
#+-----+----+-------+
#|index|name| code|
#+-----+----+-------+
#| 3| ccc|101|102|
#| 5| eee| 101|
#| 1| aaa| 101|
#| 4| ddd|103|104|
#| 2| bbb| 102|
#+-----+----+-------+

请记住,spark 数据帧本质上是无序的,除非您使用 sortorderBy 显式引入顺序。


要解决的问题:

I suppose I am doing something fundamentally wrong when it comes to calling UDF in the .withColum method but I could not figure out what

如果您查看代码的执行计划,您会发现 where(dataframe.index == index) 部分基本上被忽略了。

resultDF = a_DF.withColumn("codes", lit(concatcodes(a_DF.index, b_df)))
resultDF.explain()
#== Physical Plan ==
#*(1) Project [index#0, name#1, 101|102|101|102|103|104|101 AS codes#64]
#+- Scan ExistingRDD[index#0,name#1]

我怀疑这是因为 the python udf being applied in batch mode, rather than on a Row basis .你不能use a Dataframe inside a udf ,所以必须发生的事情是优化器运行一次 collect 并将其用于所有行。

这里更大的问题是在 udf 中调用 collect 的方法违背了 spark 的目的(这是您的根本误解)。使用 spark 的全部意义在于将您的计算并行分布到多个执行程序。当您使用 collect 操作时,这会将所有数据带入驱动程序的本地内存中。 (在你的情况下,它似乎会被广播回执行者)。

相反,当您需要引用来自多个 spark DataFrame 的数据时,请使用 join。对于 udf,您可以将它们视为本质上仅用于对单个 spark DataFrame 的单个 Row 进行操作。

关于python - withColumn 中的用户定义函数只调用一次而不是每个 DF 行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57480594/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com