
apache-spark - Spark: Find the value with the highest occurrence per group over rolling time window

Starting from the following Spark dataframe:

from io import StringIO
import pandas as pd
from pyspark.sql.functions import col


pd_df = pd.read_csv(StringIO("""device_id,read_date,id,count
device_A,2017-08-05,4041,3
device_A,2017-08-06,4041,3
device_A,2017-08-07,4041,4
device_A,2017-08-08,4041,3
device_A,2017-08-09,4041,3
device_A,2017-08-10,4041,1
device_A,2017-08-10,4045,2
device_A,2017-08-11,4045,3
device_A,2017-08-12,4045,3
device_A,2017-08-13,4045,3"""),infer_datetime_format=True, parse_dates=['read_date'])

df = spark.createDataFrame(pd_df).withColumn('read_date', col('read_date').cast('date'))
df.show()

Output:

+--------------+----------+----+-----+
|     device_id| read_date|  id|count|
+--------------+----------+----+-----+
|      device_A|2017-08-05|4041|    3|
|      device_A|2017-08-06|4041|    3|
|      device_A|2017-08-07|4041|    4|
|      device_A|2017-08-08|4041|    3|
|      device_A|2017-08-09|4041|    3|
|      device_A|2017-08-10|4041|    1|
|      device_A|2017-08-10|4045|    2|
|      device_A|2017-08-11|4045|    3|
|      device_A|2017-08-12|4045|    3|
|      device_A|2017-08-13|4045|    3|
+--------------+----------+----+-----+

I would like to find the most frequent id for every (device_id, read_date) combination over a 3-day rolling window. For each group of rows selected by the time window, I need to find the most frequent id by summing the counts per id, then return the top-ranked id.

Expected output:

+--------------+----------+----+
|     device_id| read_date|  id|
+--------------+----------+----+
|      device_A|2017-08-05|4041|
|      device_A|2017-08-06|4041|
|      device_A|2017-08-07|4041|
|      device_A|2017-08-08|4041|
|      device_A|2017-08-09|4041|
|      device_A|2017-08-10|4041|
|      device_A|2017-08-11|4045|
|      device_A|2017-08-12|4045|
|      device_A|2017-08-13|4045|
+--------------+----------+----+

I am starting to think this is only possible with a custom aggregation function. Since Spark 2.3 is not out yet, I would have to write it in Scala or use collect_list. Am I missing something?
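
For reference, a minimal sketch of the collect_list route mentioned above, using a plain Python UDF over a 3-day trailing range window; pick_top_id and the window bounds are illustrative assumptions, not part of the original question, and ties are broken arbitrarily:

from pyspark.sql import functions as F
from pyspark.sql.types import LongType
from pyspark.sql.window import Window

# Hypothetical helper: sum the counts per id inside the collected window and return the top id
def pick_top_id(pairs):
    totals = {}
    for row in pairs:
        totals[row["id"]] = totals.get(row["id"], 0) + row["count"]
    return max(totals, key=totals.get)

pick_top_id_udf = F.udf(pick_top_id, LongType())

# 3-day trailing window per device, expressed in seconds on the epoch timestamp
w = (Window.partitionBy("device_id")
     .orderBy(F.col("read_date").cast("timestamp").cast("long"))
     .rangeBetween(-2 * 86400, 0))

result = (df
          .withColumn("pairs", F.collect_list(F.struct("id", "count")).over(w))
          .withColumn("top_id", pick_top_id_udf("pairs"))
          .select("device_id", "read_date", F.col("top_id").alias("id"))
          .distinct())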

Best Answer

Add the windows:

from pyspark.sql.functions import window, sum as sum_, date_add

df_w = df.withColumn(
    "read_date", window("read_date", "3 days", "1 day")["start"].cast("date")
)
# Then handle the counts
df_w = df_w.groupBy('device_id', 'read_date', 'id').agg(sum_('count').alias('count'))
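
For context, window("read_date", "3 days", "1 day") assigns every input row to three overlapping 3-day windows (one per sliding start date), so the groupBy above sums the counts per id within each of those windows. Purely as an illustration (not part of the original answer), the window assignment can be inspected like this:

from pyspark.sql.functions import col, window  # both already imported above

# Sketch: show the overlapping windows each input row falls into
(df
 .select("device_id", "read_date", "id", "count",
         window("read_date", "3 days", "1 day").alias("w"))
 .select("device_id", "read_date", "id", "count",
         col("w.start").cast("date").alias("window_start"),
         col("w.end").cast("date").alias("window_end"))
 .orderBy("read_date", "window_start")
 .show(truncate=False))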

Then use one of the solutions from Find maximum row per group in Spark DataFrame, for example:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

rolling_window = 3

top_df = (
    df_w
    .withColumn(
        "rn",
        row_number().over(
            Window.partitionBy("device_id", "read_date")
            .orderBy(col("count").desc())
        )
    )
    .where(col("rn") == 1)
    .orderBy("read_date")
    .drop("rn")
)
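
Another common way to take the top row per group (not from the original answer, and with possibly different tie-breaking than row_number) is to aggregate with max over a (count, id) struct, which avoids the window function:

from pyspark.sql.functions import struct, max as max_

# Sketch: struct ordering is lexicographic, so max picks the highest count first, then the highest id
top_df_alt = (
    df_w
    .groupBy("device_id", "read_date")
    .agg(max_(struct(col("count"), col("id"))).alias("best"))
    .select("device_id", "read_date",
            col("best.id").alias("id"),
            col("best.count").alias("count"))
)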

# results are calculated on the start of the time window - adjust read_date as needed

final_df = top_df.withColumn('read_date', date_add('read_date', rolling_window - 1))

final_df.show()

# +---------+----------+----+-----+
# |device_id| read_date|  id|count|
# +---------+----------+----+-----+
# | device_A|2017-08-05|4041|    3|
# | device_A|2017-08-06|4041|    6|
# | device_A|2017-08-07|4041|   10|
# | device_A|2017-08-08|4041|   10|
# | device_A|2017-08-09|4041|   10|
# | device_A|2017-08-10|4041|    7|
# | device_A|2017-08-11|4045|    5|
# | device_A|2017-08-12|4045|    8|
# | device_A|2017-08-13|4045|    9|
# | device_A|2017-08-14|4045|    6|
# | device_A|2017-08-15|4045|    3|
# +---------+----------+----+-----+
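
Note that the shifted result also contains dates (2017-08-14, 2017-08-15) that never appear in the source data, while the expected output above stops at 2017-08-13. If only the (device_id, read_date) pairs present in the input should be kept, one option, sketched here rather than taken from the original answer, is a left-semi join back to df:

# Keep only (device_id, read_date) pairs that exist in the original data
result_df = (
    final_df
    .join(df.select("device_id", "read_date").distinct(),
          on=["device_id", "read_date"], how="left_semi")
    .select("device_id", "read_date", "id")
    .orderBy("read_date")
)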

Regarding apache-spark - Spark: Find the value with the highest occurrence per group over rolling time window, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48689383/
