gpt4 book ai didi

python - Pyspark - 如何将 '4 hours' 多个窗口分组聚合

转载 作者:行者123 更新时间:2023-12-01 23:40:21 26 4
gpt4 key购买 nike

我有一个数据集如下:

id  email   Date_of_purchase    time_of_purchase
1 abc@gmail.com 11/10/18 12:10 PM
2 abc@gmail.com 11/10/18 02:11 PM
3 abc@gmail.com 11/10/18 03:14 PM
4 abc@gmail.com 11/11/18 06:16 AM
5 abc@gmail.com 11/11/18 09:10 AM
6 def@gmail.com 11/10/18 12:17 PM
7 def@gmail.com 11/10/18 03:24 PM
8 def@gmail.com 11/10/18 08:16 PM
9 def@gmail.com 11/10/18 09:13 PM
10 def@gmail.com 11/11/18 12:01 AM

我想计算每个电子邮件 ID 在 4 小时内进行的交易数量。例如,电子邮件 ID:abc@gmail.com 从 11/10/18 12.10 PM 到 11/10/18 4.10 PM 进行了 3 笔交易,从 11/11/18 6.16 AM 到 11/11/18 进行了 2 笔交易上午 10.16。电子邮件 ID:def@gmail.com 从 11/10/18 12.17 PM 到 11/10/18 4.17 PM 进行了 2 笔交易,从 11/10/18 8.16 PM 到 11/11/18 12.16 AM 进行了 3 笔交易。

我想要的输出是:

 email          hour_interval                           purchase_in_4_hours
abc@gmail.com [11/10/18 12.10 PM to 11/10/18 4.10 PM] 3
abc@gmail.com [11/11/18 6.16 AM to 11/11/18 10.16 AM] 2
def@gmail.com [11/10/18 12.17 PM to 11/10/18 4.17 PM] 2
def@gmail.com [11/10/18 8.16 PM to 11/11/18 12.16 AM] 3

我的数据集有 1000k 行。我对 Spark 很陌生。任何帮助将不胜感激。附:时间间隔可以从 4 小时更改为 1 小时、6 小时、1 天等。

TIA。

最佳答案

其想法是通过电子邮件对数据进行分区,按日期和时间在每个分区内进行排序,然后将每个分区映射到所需的输出。如果每个分区的数据(=一个电子邮件地址的数据)适合一个 Spark 执行器的内存,则此方法将起作用。

实际的 Spark 逻辑遵循以下步骤

  1. 创建一个包含时间戳的新列
  2. 按电子邮件对数据进行分区,以便具有相同电子邮件的所有行都属于同一分区。请注意,一个分区中可能存在来自多封电子邮件的数据。
  3. 按电子邮件和时间戳对每个分区进行排序。
  4. 处理每个分区。如有必要,根据需要为每个分区生成多个输出
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("test").getOrCreate()
df = spark.read.option("header", "true").csv(<path>) #or any other data source
df = df.withColumn("date_time", to_timestamp(concat(col("Date_of_purchase"), lit(" "), col("time_of_purchase")), "MM/dd/yy hh:mm aa")) \
.drop("Date_of_purchase", "time_of_purchase") \
.repartition(col("email")) \
.sortWithinPartitions(col("email"), col("date_time"))

def process_partition(df_chunk):
row_list = list(df_chunk)
if len(row_list) == 0:
return
email = row_list[0]['email']
start = row_list[0]['date_time']
end = start + timedelta(hours=4)
count = 0
for row in row_list:
if email == row['email'] and end > row['date_time']:
count = count +1
else:
yield Row(email, start, end, count)
email = row['email']
start = row['date_time']
end = start + timedelta(hours=4)
count = 1
yield Row(email, start, end, count)

result = df.rdd.mapPartitions(process_partition).toDF(["email", "from", "to", "count"])
result.show()

输出:

+-------------+-------------------+-------------------+-----+
| email| from| to|count|
+-------------+-------------------+-------------------+-----+
|def@gmail.com|2018-11-10 12:17:00|2018-11-10 16:17:00| 2|
|def@gmail.com|2018-11-10 20:16:00|2018-11-11 00:16:00| 3|
|abc@gmail.com|2018-11-10 12:10:00|2018-11-10 16:10:00| 3|
|abc@gmail.com|2018-11-11 06:16:00|2018-11-11 10:16:00| 2|
+-------------+-------------------+-------------------+-----+

要更改周期长度,可以将 timedelta 设置为任何值。

关于python - Pyspark - 如何将 '4 hours' 多个窗口分组聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58221350/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com