gpt4 book ai didi

pyspark - 如何在保留最新数据的同时从 Spark 数据框中删除重复项?

转载 作者:行者123 更新时间:2023-12-01 01:41:50 25 4
gpt4 key购买 nike

我正在使用 spark 从 Amazon S3 加载 json 文件。我想根据保留最新的数据框的两列删除重复项(我有时间戳列)。最好的方法是什么?请注意,重复项可能分布在多个分区中。我可以在不改组的情况下删除保留最后一条记录的重复项吗?我正在处理 1 TB 的数据。

我正在考虑按这两个列对数据框进行分区,这样所有重复记录都将“一致地散列”到同一分区中,因此分区级别排序后删除重复项将消除所有重复项,只保留一个。我不知道是否有可能。任何信息表示赞赏。

最佳答案

使用 row_number() 窗口函数可能更容易完成您的任务,如下 c1是时间戳列,c2 , c3是用于分区数据的列:

from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# set rn with F.row_number() and filter the result by rn == 1
df_new = df.withColumn('rn', F.row_number().over(win)).where('rn = 1').drop('rn')
df_new.show()

编辑:

如果您只需要重复项并删除唯一行,请添加另一个字段:
from pyspark.sql import Window, functions as F

# create a win spec which is partitioned by c2, c3 and ordered by c1 in descending order
win = Window.partitionBy('c2', 'c3').orderBy(F.col('c1').desc())

# window to cover all rows in the same partition
win2 = Window.partitionBy('c2', 'c3') \
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# set new columns: rn, cnt and filter the result by rn == 1 and cnt > 1
df_new = df.withColumn('rn', F.row_number().over(win)) \
.withColumn('cnt', F.count('c1').over(win2)) \
.where('rn = 1 and cnt > 1') \
.drop('rn', 'cnt')
df_new.show()

关于pyspark - 如何在保留最新数据的同时从 Spark 数据框中删除重复项?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55660085/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com