
python - Getting the row with the maximum value from a groupby with multiple columns in PySpark


I have a dataframe like the following:
from pyspark.sql.functions import avg, first

rdd = sc.parallelize(
    [
        (0, "A", 223, "201603", "PORT"),
        (0, "A", 22, "201602", "PORT"),
        (0, "A", 22, "201603", "PORT"),
        (0, "C", 22, "201605", "PORT"),
        (0, "D", 422, "201601", "DOCK"),
        (0, "D", 422, "201602", "DOCK"),
        (0, "C", 422, "201602", "DOCK"),
        (1, "B", 3213, "201602", "DOCK"),
        (1, "A", 3213, "201602", "DOCK"),
        (1, "C", 3213, "201602", "PORT"),
        (1, "B", 3213, "201601", "PORT"),
        (1, "B", 3213, "201611", "PORT"),
        (1, "B", 3213, "201604", "PORT"),
        (3, "D", 3999, "201601", "PORT"),
        (3, "C", 323, "201602", "PORT"),
        (3, "C", 323, "201602", "PORT"),
        (3, "C", 323, "201605", "DOCK"),
        (3, "A", 323, "201602", "DOCK"),
        (2, "C", 2321, "201601", "DOCK"),
        (2, "A", 2321, "201602", "PORT")
    ]
)
df_data = sqlContext.createDataFrame(rdd, ["id", "type", "cost", "date", "ship"])

I need to aggregate by id and type and get the most frequent ship for each group. For example,

grouped = df_data.groupby('id','type', 'ship').count()

which has a column with the number of occurrences for each group:

+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
| 3| A|DOCK| 1|
| 0| D|DOCK| 2|
| 3| C|PORT| 2|
| 0| A|PORT| 3|
| 1| A|DOCK| 1|
| 1| B|PORT| 3|
| 3| C|DOCK| 1|
| 3| D|PORT| 1|
| 1| B|DOCK| 1|
| 1| C|PORT| 1|
| 2| C|DOCK| 1|
| 0| C|PORT| 1|
| 0| C|DOCK| 1|
| 2| A|PORT| 1|
+---+----+----+-----+

I need to get:

+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
| 0| D|DOCK| 2|
| 0| A|PORT| 3|
| 1| A|DOCK| 1|
| 1| B|PORT| 3|
| 2| C|DOCK| 1|
| 2| A|PORT| 1|
| 3| C|PORT| 2|
| 3| A|DOCK| 1|
+---+----+----+-----+

I tried a combination of:

grouped.groupby('id', 'type', 'ship')\
    .agg({'count': 'max'}).orderBy('max(count)', ascending=False)\
    .groupby('id', 'type', 'ship').agg({'ship': 'first'})

but it fails. Is there a way to get the row with the maximum count out of the grouped counts?

In pandas, this one-liner does the job:

df_pd = grouped.toPandas()  # the 'count' column only exists on the counted frame
df_pd_t = df_pd[df_pd['count'] == df_pd.groupby(['id', 'type'])['count'].transform(max)]
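
The same filter can also be expressed directly in PySpark by joining the per-group maximum back onto the counts. This is a minimal sketch of mine, not from the original question, using the same id/type keys as the pandas version; max_per_group and df_t are hypothetical names:

from pyspark.sql import functions as F

# Compute the maximum count per (id, type) group, then keep only
# the rows whose count equals that group maximum.
max_per_group = (grouped
                 .groupBy('id', 'type')
                 .agg(F.max('count').alias('max_count')))
df_t = (grouped
        .join(max_per_group, on=['id', 'type'])
        .where(F.col('count') == F.col('max_count'))
        .drop('max_count'))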

Best Answer

Based on your expected output, it seems you are only grouping by id and ship - since you already have distinct values in grouped - and consequently dropping duplicate elements based on the columns id, ship and count, sorted by type.

To do this, we can use a Window function:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window = (Window
          .partitionBy(grouped['id'],
                       grouped['ship'])
          .orderBy(grouped['count'].desc(), grouped['type']))


(grouped
 .select('*', rank()
         .over(window)
         .alias('rank'))
 .filter(col('rank') == 1)
 .orderBy(col('id'))
 .dropDuplicates(['id', 'ship', 'count'])
 .drop('rank')
 .show())
+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
| 0| D|DOCK| 2|
| 0| A|PORT| 3|
| 1| A|DOCK| 1|
| 1| B|PORT| 3|
| 2| C|DOCK| 1|
| 2| A|PORT| 1|
| 3| A|DOCK| 1|
| 3| C|PORT| 2|
+---+----+----+-----+
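
If you would rather avoid a window function, the same result can be obtained with an ordinary aggregation over a struct. This is a minimal sketch of mine, not part of the original answer; it relies on Spark comparing structs field by field, so the minimum of a (-count, type) struct picks the highest count and breaks ties on the smallest type, matching the orderBy above ('key' and 'best' are hypothetical names):

from pyspark.sql import functions as F

# Build a per-row sort key, take the per-(id, ship) minimum,
# then unpack the winning key back into columns.
best = (grouped
        .withColumn('key', F.struct((-F.col('count')).alias('neg_count'),
                                    F.col('type')))
        .groupBy('id', 'ship')
        .agg(F.min('key').alias('key'))
        .select('id',
                F.col('key.type').alias('type'),
                'ship',
                (-F.col('key.neg_count')).alias('count'))
        .orderBy('id'))
best.show()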

Regarding python - getting the row with the maximum value from a groupby with multiple columns in PySpark, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40116117/
