gpt4 book ai didi

python - 在 pyspark 中应用用户定义聚合函数的替代方法

转载 作者:太空宇宙 更新时间:2023-11-03 14:01:46 25 4
gpt4 key购买 nike

我正在尝试将用户定义的聚合函数应用于 spark 数据帧,以应用加法平滑,请参见下面的代码:

import findspark
findspark.init()
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, col, collect_list, concat_ws, udf

try:
sc
except NameError:
sc = ps.SparkContext()
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([['A', 1],
['A',1],
['A',0],
['B',0],
['B',0],
['B',1]], schema=['name', 'val'])


def smooth_mean(x):
return (sum(x)+5)/(len(x)+5)

smooth_mean_udf = udf(smooth_mean)

df.groupBy('name').agg(collect_list('val').alias('val'))\
.withColumn('val', smooth_mean_udf('val')).show()

这样做有意义吗?据我所知,这并不能很好地扩展,因为我使用的是 udf。我也找不到 collect_list 的确切工作方式,名称中的 collect 部分似乎表明数据已“收集”到边缘节点,但我假设数据被“收集”到各个节点?

提前感谢您的任何反馈。

最佳答案

To my understanding this does not scale

你的理解是正确的,这里最大的问题是collect_list which is just good old groupByKey . Python udf 的影响要小得多,但对于简单的算术运算,使用它没有意义。

只需使用标准聚合

from pyspark.sql.functions import sum as sum_, count

(df
.groupBy("name")
.agg(((sum_("val") + 5) / (count("val") + 5)).alias("val"))
.show())

# +----+-----+
# |name| val|
# +----+-----+
# | B| 0.75|
# | A|0.875|
# +----+-----+

关于python - 在 pyspark 中应用用户定义聚合函数的替代方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48500850/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com