gpt4 book ai didi

apache-spark - 查找 Spark DataFrame 中每组的最大行数

转载 作者:行者123 更新时间:2023-12-03 07:03:43 24 4
gpt4 key购买 nike

我尝试使用 Spark 数据帧而不是 RDD,因为它们看起来比 RDD 更高级,并且往往会生成更可读的代码。

在 14 个节点的 Google Dataproc 集群中,我有大约 600 万个名称,这些名称由两个不同的系统转换为 id:sasb。每个Row包含nameid_said_sb。我的目标是生成从 id_said_sb 的映射,这样对于每个 id_sa,对应的 id_sb 是附加到 id_sa 的所有名称中最常见的 ID。

让我们尝试用一个例子来阐明。如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]

我的目标是生成从 a1b2 的映射。事实上,与 a1 关联的名称是 n1n2n3,它们分别映射到 b1 b2b2,因此 b2 是与 a1 关联的名称中最常见的映射。同样,a2 将映射到 b2。可以假设总会有赢家:无需打破平局。

我希望可以在我的数据帧上使用groupBy(df.id_sa),但我不知道下一步该怎么做。我希望聚合能够最终生成以下行:

[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]

但也许我尝试使用错误的工具,我应该重新使用 RDD。

最佳答案

使用join(如果出现平局,则会导致组中出现多行):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs,
(col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

使用窗口函数(将放弃联系):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
.withColumn("rn", row_number().over(w))
.where(col("rn") == 1)
.select("id_sa", "id_sb"))

使用struct排序:

from pyspark.sql.functions import struct

(cnts
.groupBy("id_sa")
.agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
.select(col("id_sa"), col("max.id_sb")))

另请参阅How to select the first row of each group?

关于apache-spark - 查找 Spark DataFrame 中每组的最大行数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35218882/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com