gpt4 book ai didi

python - PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构

转载 作者:太空宇宙 更新时间:2023-11-04 04:28:52 25 4
gpt4 key购买 nike

我是 PySpark 的新手,正在为简单的数据帧操作而苦苦挣扎。我有一个类似于以下的数据框:

product    period     rating   product_Desc1   product_Desc2 ..... more columns 
a 1 60 foo xx
a 2 70 foo xx
a 3 59 foo xx
b 1 50 bar yy
b 2 55 bar yy
c 1 90 foo bar xy
c 2 100 foo bar xy

我想对产品进行分组,添加列以计算评分的算术、几何和谐波平均值同时还保留数据框中的其余列,这些列在每个产品中都是一致的。

我尝试结合使用内置函数和 UDF。例如:

a_means = df.groupBy("product").agg(mean("rating").alias("a_mean")
g_means = df.groupBy("product").agg(udf_gmean("rating").alias("g_mean")

哪里:

def g_mean(x):
gm = reduce(mul,x)**(1/len(x))
return gm

udf_gmean = udf(g_mean, FloatType())

然后我会将 a_means 和 g_means 输出与产品上的原始数据框结合起来并删除重复项。但是,此方法返回错误,对于 g_means,指出“rating”不参与 groupBy,也不是用户定义的聚合函数....

我也尝试过使用 SciPy 的 gmean 模块,但我收到的错误消息指出 ufunc 'log' 不适合输入类型,尽管据我所知所有评级列都是整数类型。

网站上也有类似的问题,但我找不到似乎可以解决我遇到的这个问题的任何问题。我真的很感激你的帮助,因为它让我发疯!

提前致谢,如果我提供的信息不够,我今天应该能够很快提供任何进一步的信息。

值得注意的是,为了提高效率,我无法像使用 Pandas 数据帧那样简单地转换为 Pandas 和转换...而且我使用的是 Spark 2.2,无法更新!

最佳答案

这样的事情怎么样

from pyspark.sql.functions import avg
df1 = df.select("product","rating").rdd.map(lambda x: (x[0],(1.0,x[1]*1.0))).reduceByKey(lambda x,y: (x[0]+y[0], x[1]*y[1])).toDF(['product', 'g_mean'])
gdf = df1.select(df1['product'],pow(df1['g_mean._2'],1.0/df1['g_mean._1']).alias("rating_g_mean"))
display(gdf)

+-------+-----------------+
|product| rating_g_mean|
+-------+-----------------+
| a|62.81071936240795|
| b|52.44044240850758|
| c|94.86832980505137|
+-------+-----------------+


df1 = df.withColumn("h_mean", 1.0/df["rating"])
hdf = df1.groupBy("product").agg(avg(df1["rating"]).alias("rating_mean"), (1.0/avg(df1["h_mean"])).alias("rating_h_mean"))
sdf = hdf.join(gdf, ['product'])
display(sdf)

+-------+-----------+-----------------+-----------------+
|product|rating_mean| rating_h_mean| rating_g_mean|
+-------+-----------+-----------------+-----------------+
| a| 63.0|62.62847514743051|62.81071936240795|
| b| 52.5|52.38095238095239|52.44044240850758|
| c| 95.0|94.73684210526315|94.86832980505137|
+-------+-----------+-----------------+-----------------+


fdf = df.join(sdf, ['product'])
display(fdf.sort("product"))


+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+
|product|period|rating|product_Desc1|product_Desc2|rating_mean| rating_h_mean| rating_g_mean|
+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+
| a| 3| 59| foo| xx| 63.0|62.62847514743051|62.81071936240795|
| a| 2| 70| foo| xx| 63.0|62.62847514743051|62.81071936240795|
| a| 1| 60| foo| xx| 63.0|62.62847514743051|62.81071936240795|
| b| 2| 55| bar| yy| 52.5|52.38095238095239|52.44044240850758|
| b| 1| 50| bar| yy| 52.5|52.38095238095239|52.44044240850758|
| c| 2| 100| foo bar| xy| 95.0|94.73684210526315|94.86832980505137|
| c| 1| 90| foo bar| xy| 95.0|94.73684210526315|94.86832980505137|
+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+

关于python - PySpark 中的 Groupby 和 UDF/UDAF,同时保持 DataFrame 结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53031808/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com