gpt4 book ai didi

python - 如何使用pyspark收集两个连续日期之间的新ID列表

转载 作者:行者123 更新时间:2023-12-04 07:47:57 25 4
gpt4 key购买 nike

我正在 PYSPARK 数据框中的一周列进行分组,并在两个连续的周数之间收集新的不同 ID。
我尝试在周列上进行分组,然后在数据帧上聚合 collect_set 方法以获取所有可用 ID,然后我逐个比较列表以按周数获取两个连续列表之间的不同 ID,如下所示:
输入 :


星期
ID


1
ID_1

1
ID_2

1
ID_3

2
ID_1

2
ID_4

2
ID_5

3
ID_6


输出 :


星期
ID_List
diff_list
new_different_ID_count


1
ID_1、ID_2、ID_3
ID_1、ID_2、ID_3
——

2
ID_1、ID_4、ID_5
ID_4, ID_5
2

3
ID_2, ID_6
ID_6
1


这里的问题是,由于在我的情况下有大量的 ID(超过 900 万个 ID),我认为由于内存不足错误(错误 500),spark session 被终止了!
有没有其他解决方案可以使用 PYSPARK 在连续两周之间获取新的不同 ID 列表?

最佳答案

为了扩展,您需要按 ID 进行聚合,而不收集任何结果。尝试以下方法:

import pyspark.sql.functions as F
from pyspark.sql import Window

data = spark.createDataFrame([(1, "ID_1"), (1, "ID_2"), (1, "ID_3"),
(2, "ID_1"), (2, "ID_4"), (2, "ID_5"),
(3, "ID_6")], ["Week", "ID"])

win = Window.partitionBy('ID').orderBy('Week')

agg_data = (
data
.withColumn("prevWeek", F.lag("Week", offset=1).over(win))
.withColumn("isInPrevWeek",
F.col("prevWeek").isNotNull() & ((F.col("Week") - F.col("prevWeek")) == 1))
.filter(~F.col("isInPrevWeek"))
.groupBy("Week")
.agg(F.count("*").alias("newIDs"),
F.array_sort(F.collect_list("ID")).alias("showNewIDs")) # Remove in production
.orderBy("Week")
)

agg_data.show()
一、函数 lag使用 Window 函数创建一个包含前一周的新列,该列允许单独考虑每个 ID(分区)并按时间顺序对周进行排序。这可以很好地扩展,因为 Spark 任务由一组 ID 组成。
然后, isInPrevWeek检查 ID 是否确实在前一周。如果是,则过滤掉该记录。现在,您只需要按周计算remining ID。
+----+------+------------------+
|Week|newIDs| showNewIDs|
+----+------+------------------+
| 1| 3|[ID_1, ID_2, ID_3]|
| 2| 2| [ID_4, ID_5]|
| 3| 1| [ID_6]|
+----+------+------------------+
请注意,该代码段收集 ID 仅用于说明目的,但这不是计算所必需的。

关于python - 如何使用pyspark收集两个连续日期之间的新ID列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67121704/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com