gpt4 book ai didi

python - PySpark DataFrame 上分组数据的 Pandas 样式转换

转载 作者:太空狗 更新时间:2023-10-29 18:30:24 25 4
gpt4 key购买 nike

如果我们有一个由一列类别和一列值组成的 Pandas 数据框,我们可以通过执行以下操作删除每个类别中的均值:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))

据我所知,Spark 数据帧不直接提供这种分组/转换操作(我在 Spark 1.5.0 上使用 PySpark)。那么,实现这种计算的最佳方式是什么?

我试过使用 group-by/join 如下:

df2 = df.groupBy("Category").mean("Values")
df3 = df2.join(df)

但它非常慢,因为据我了解,每个类别都需要对 DataFrame 进行全面扫描。

我认为(但尚未证实)如果我将 group-by/mean 的结果收集到字典中,然后在 UDF 中使用该字典,我可以大大加快速度,如下所示:

nameToMean = {...}
f = lambda category, value: value - nameToMean[category]
categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())
df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))

是否有一种惯用的方式来表达这种类型的操作而不牺牲性能?

最佳答案

I understand, each category requires a full scan of the DataFrame.

不,它没有。 DataFrame 聚合是使用类似于 aggregateByKey 的逻辑执行的。参见 DataFrame groupBy behaviour/optimization较慢的部分是 join,它需要排序/洗牌。但它仍然不需要按组扫描。

如果这是一个精确的代码,你使用它会很慢,因为你没有提供连接表达式。因此,它只是执行笛卡尔积。所以它不仅低效而且不正确。你想要这样的东西:

from pyspark.sql.functions import col

means = df.groupBy("Category").mean("Values").alias("means")
df.alias("df").join(means, col("df.Category") == col("means.Category"))

I think (but have not verified) that I can speed this up a great deal if I collect the result of the group-by/mean into a dictionary, and then use that dictionary in a UDF

虽然性能会因具体情况而异,但这是可能的。使用 Python UDF 的一个问题是它必须将数据移入和移出 Python。不过,这绝对值得一试。不过,您应该考虑为 nameToMean 使用广播变量。

Is there an idiomatic way to express this type of operation without sacrificing performance?

在 PySpark 1.6 中,您可以使用 broadcast 函数:

df.alias("df").join(
broadcast(means), col("df.Category") == col("means.Category"))

但它在 <= 1.5 中不可用。

关于python - PySpark DataFrame 上分组数据的 Pandas 样式转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34464577/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com