gpt4 book ai didi

python - Spark SQL 分区依据、窗口、排序依据、计数

转载 作者:行者123 更新时间:2023-11-29 16:33:59 25 4
gpt4 key购买 nike

假设我有一个包含杂志订阅信息的数据框:

subscription_id    user_id       created_at       expiration_date
12384 1 2018-08-10 2018-12-10
83294 1 2018-06-03 2018-10-03
98234 1 2018-04-08 2018-08-08
24903 2 2018-05-08 2018-07-08
32843 2 2018-03-25 2018-05-25
09283 2 2018-04-07 2018-06-07

现在我想添加一列,用于说明在当前订阅开始之前用户有多少个先前订阅已过期。换句话说,与给定用户关联的到期日期在此订阅的开始日期之前有多少个。这是所需的完整输出:

subscription_id    user_id       created_at       expiration_date   previous_expired
12384 1 2018-08-10 2018-12-10 1
83294 1 2018-06-03 2018-10-03 0
98234 1 2018-04-08 2018-08-08 0
24903 2 2018-05-08 2018-07-08 2
32843 2 2018-03-25 2018-05-03 1
09283 2 2018-01-25 2018-02-25 0

尝试:

编辑:使用Python尝试了各种滞后/超前/等,我现在认为这是一个SQL问题

df = df.withColumn('shiftlag', func.lag(df.expires_at).over(Window.partitionBy('user_id').orderBy('created_at')))

<---编辑,编辑:没关系,这不起作用

我想我用尽了滞后/超前/平移方法,发现它不起作用。我现在认为最好使用 Spark SQL 来完成此操作,也许使用 case when 来生成新列,并结合 having count,按 ID 分组?

最佳答案

使用 PySpark 解决了这个问题:

我首先创建了另一列,其中包含每个用户的所有到期日期的数组:

joined_array = df.groupBy('user_id').agg(collect_set('expiration_date'))

然后将该数组连接回原始数据帧:

joined_array = joined_array.toDF('user_idDROP', 'expiration_date_array')
df = df.join(joined_array, df.user_id == joined_array.user_idDROP, how = 'left').drop('user_idDROP')

然后创建一个函数来迭代数组,如果创建日期大于到期日期,则将计数加 1:

def check_expiration_count(created_at, expiration_array):
if not expiration_array:
return 0
else:
count = 0
for i in expiration_array:
if created_at > i:
count += 1
return count

check_expiration_count = udf(check_expiration_count, IntegerType())

然后应用该函数创建一个具有正确计数的新列:

df = df.withColumn('count_of_subs_ending_before_creation', check_expiration_count(df.created_at, df.expiration_array))

瓦拉。完毕。谢谢大家(没有人帮忙,但还是谢谢)。希望 2022 年有人发现这有用

关于python - Spark SQL 分区依据、窗口、排序依据、计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53730092/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com