
python - Dynamically incrementing values in a PySpark dataframe


I have the following dataframe:

+--------------------+----------------+----------------+-----------+---+
| patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|
+--------------------+----------------+----------------+-----------+---+
|00000000000072128380| prod1| 2009-02-23| 2009-05-22| 1|
|00000000000072128380| prod1| 2010-04-05| 2009-05-22| 2|
|00000000000072128380| prod1| 2009-03-23| 2009-05-22| 3|
|00000000000072128380| prod1| 2009-04-20| 2009-05-22| 4|
|00000000000072128380| prod1| 2009-05-16| 2009-05-22| 5|
|00000000000072128380| prod1| 2009-06-17| 2009-05-22| 6|
|00000000000072128380| prod1| 2009-07-15| 2009-05-22| 7|
|00000000000072128380| prod1| 2009-08-12| 2009-05-22| 8|
|00000000000072128380| prod1| 2009-09-05| 2009-05-22| 9|
|00000000000072128380| prod1| 2009-10-06| 2009-05-22| 10|
|00000000000072128380| prod2| 2009-10-28| 2009-05-22| 1|
|00000000000072128380| prod2| 2009-12-02| 2009-05-22| 2|
|00000000000072128380| prod2| 2010-05-10| 2009-05-22| 3|
|00000000000072128380| prod2| 2008-05-22| 2009-05-22| 4|
|00000000000072128380| prod2| 2010-07-06| 2009-05-22| 5|
|00000000000072128380| prod2| 2010-08-03| 2009-05-22| 6|
|00000000000072128380| prod2| 2010-09-23| 2009-05-22| 7|
|00000000000072128380| prod2| 2010-10-20| 2009-05-22| 8|
|00000000000072128380| prod2| 2010-01-29| 2009-05-22| 9|
|00000000000072128380| prod2| 2008-05-22| 2009-05-22| 10|
+--------------------+----------------+----------------+-----------+---+

Use case: I want to add a new column, episode, with the following logic: if rnk = 1, then episode = 1; if rnk > 1, the product (interaction_desc) is the same, and interaction_date > rx_end_date, then episode = previous episode + 1; otherwise episode = previous episode.
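
To make the rule concrete before turning to Spark, here is the same logic as a plain sequential Python sketch (a hypothetical helper over one product's rows sorted by rnk; not part of the question itself):

# Sequential sketch of the episode rule for the rows of one product,
# assumed sorted by rnk. ISO date strings compare chronologically.
def assign_episodes(rows):
    episode = 0
    for row in rows:
        if row["rnk"] == 1:
            episode = 1      # first row of the group starts episode 1
        elif row["interaction_date"] > row["rx_end_date"]:
            episode += 1     # interaction after the rx end opens a new episode
        # otherwise: keep the same episode as the previous row
        row["episode"] = episode
    return rows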

The expected result is:

+--------------------+----------------+----------------+-----------+---+-------+
| patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|episode|
+--------------------+----------------+----------------+-----------+---+-------+
|00000000000072128380| prod1| 2009-02-23| 2009-05-22| 1| 1|
|00000000000072128380| prod1| 2010-04-05| 2009-05-22| 2| 2|
|00000000000072128380| prod1| 2009-03-23| 2009-05-22| 3| 2|
|00000000000072128380| prod1| 2009-04-20| 2009-05-22| 4| 2|
|00000000000072128380| prod1| 2009-05-16| 2009-05-22| 5| 2|
|00000000000072128380| prod1| 2009-06-17| 2009-05-22| 6| 3|
|00000000000072128380| prod1| 2009-07-15| 2009-05-22| 7| 4|
|00000000000072128380| prod1| 2009-08-12| 2009-05-22| 8| 5|
|00000000000072128380| prod1| 2009-09-05| 2009-05-22| 9| 6|
|00000000000072128380| prod1| 2009-10-06| 2009-05-22| 10| 7|
|00000000000072128380| prod2| 2009-10-28| 2009-05-22| 1| 1|
|00000000000072128380| prod2| 2009-12-02| 2009-05-22| 2| 2|
|00000000000072128380| prod2| 2010-05-10| 2009-05-22| 3| 3|
|00000000000072128380| prod2| 2008-05-22| 2009-05-22| 4| 3|
|00000000000072128380| prod2| 2010-07-06| 2009-05-22| 5| 4|
|00000000000072128380| prod2| 2010-08-03| 2009-05-22| 6| 5|
|00000000000072128380| prod2| 2010-09-23| 2009-05-22| 7| 6|
|00000000000072128380| prod2| 2010-10-20| 2009-05-22| 8| 7|
|00000000000072128380| prod2| 2010-01-29| 2009-05-22| 9| 8|
|00000000000072128380| prod2| 2008-05-22| 2009-05-22| 10| 8|
+--------------------+----------------+----------------+-----------+---+-------+

How can I implement the above logic using Spark window functions, or any other Spark dataframe function?

Best Answer

Hope this helps!

from pyspark.sql.functions import col, when, lag, last
from pyspark.sql.window import Window
import sys

# Build the sample dataframe (sc is the SparkContext of the pyspark shell)
df = sc.parallelize([
    ['00000000000072128380', 'prod1', '2009-02-23', '2009-05-22', 1],
    ['00000000000072128380', 'prod1', '2010-04-05', '2009-05-22', 2],
    ['00000000000072128380', 'prod1', '2009-03-23', '2009-05-22', 3],
    ['00000000000072128380', 'prod1', '2009-04-20', '2009-05-22', 4],
    ['00000000000072128380', 'prod1', '2009-05-16', '2009-05-22', 5],
    ['00000000000072128380', 'prod1', '2009-06-17', '2009-05-22', 6],
    ['00000000000072128380', 'prod1', '2009-07-15', '2009-05-22', 7],
    ['00000000000072128380', 'prod1', '2009-08-12', '2009-05-22', 8],
    ['00000000000072128380', 'prod1', '2009-09-05', '2009-05-22', 9],
    ['00000000000072128380', 'prod1', '2009-10-06', '2009-05-22', 10],
    ['00000000000072128380', 'prod2', '2009-10-28', '2009-05-22', 1],
    ['00000000000072128380', 'prod2', '2009-12-02', '2009-05-22', 2],
    ['00000000000072128380', 'prod2', '2010-05-10', '2009-05-22', 3],
    ['00000000000072128380', 'prod2', '2008-05-22', '2009-05-22', 4],
    ['00000000000072128380', 'prod2', '2010-07-06', '2009-05-22', 5],
    ['00000000000072128380', 'prod2', '2010-08-03', '2009-05-22', 6],
    ['00000000000072128380', 'prod2', '2010-09-23', '2009-05-22', 7],
    ['00000000000072128380', 'prod2', '2010-10-20', '2009-05-22', 8],
    ['00000000000072128380', 'prod2', '2010-01-29', '2009-05-22', 9],
    ['00000000000072128380', 'prod2', '2008-05-22', '2009-05-22', 10]
]).toDF(('patient_gid', 'interaction_desc', 'interaction_date', 'rx_end_date', 'rnk'))

w = Window.partitionBy(col("interaction_desc")).orderBy(col("rnk"))
df1 = df.withColumn("episode_temp",
when(col('rnk')==1, 1).
when((col('rnk')>1) &
(col('interaction_desc') == lag("interaction_desc").over(w)) &
(col('interaction_date') > col('rx_end_date')), col('rnk')).
otherwise(None))
df1 = df1.withColumn("episode", last('episode_temp', True).over(w.rowsBetween(-sys.maxsize, 0))).drop('episode_temp')
df1.show()
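
As a side note: on Spark 2.1 and later, the unbounded frame is normally written with the built-in boundary constants rather than -sys.maxsize. An equivalent window spec would be:

from pyspark.sql.window import Window

# Same frame as w.rowsBetween(-sys.maxsize, 0), without needing the sys import.
w_fill = (Window.partitionBy("interaction_desc")
                .orderBy("rnk")
                .rowsBetween(Window.unboundedPreceding, Window.currentRow))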

The output is:

+--------------------+----------------+----------------+-----------+---+-------+
| patient_gid|interaction_desc|interaction_date|rx_end_date|rnk|episode|
+--------------------+----------------+----------------+-----------+---+-------+
|00000000000072128380| prod1| 2009-02-23| 2009-05-22| 1| 1|
|00000000000072128380| prod1| 2010-04-05| 2009-05-22| 2| 2|
|00000000000072128380| prod1| 2009-03-23| 2009-05-22| 3| 2|
|00000000000072128380| prod1| 2009-04-20| 2009-05-22| 4| 2|
|00000000000072128380| prod1| 2009-05-16| 2009-05-22| 5| 2|
|00000000000072128380| prod1| 2009-06-17| 2009-05-22| 6| 6|
|00000000000072128380| prod1| 2009-07-15| 2009-05-22| 7| 7|
|00000000000072128380| prod1| 2009-08-12| 2009-05-22| 8| 8|
|00000000000072128380| prod1| 2009-09-05| 2009-05-22| 9| 9|
|00000000000072128380| prod1| 2009-10-06| 2009-05-22| 10| 10|
|00000000000072128380| prod2| 2009-10-28| 2009-05-22| 1| 1|
|00000000000072128380| prod2| 2009-12-02| 2009-05-22| 2| 2|
|00000000000072128380| prod2| 2010-05-10| 2009-05-22| 3| 3|
|00000000000072128380| prod2| 2008-05-22| 2009-05-22| 4| 3|
|00000000000072128380| prod2| 2010-07-06| 2009-05-22| 5| 5|
|00000000000072128380| prod2| 2010-08-03| 2009-05-22| 6| 6|
|00000000000072128380| prod2| 2010-09-23| 2009-05-22| 7| 7|
|00000000000072128380| prod2| 2010-10-20| 2009-05-22| 8| 8|
|00000000000072128380| prod2| 2010-01-29| 2009-05-22| 9| 9|
|00000000000072128380| prod2| 2008-05-22| 2009-05-22| 10| 9|
+--------------------+----------------+----------------+-----------+---+-------+

The output is not exactly what you wanted, but it is similar, and more importantly, the episode value increases monotonically within each product.
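
If the exact numbering from the expected table is needed, one option (a sketch under the assumption that the sample columns and ordering above hold) is to flag the rows that open a new episode and take a running sum of the flag:

from pyspark.sql.functions import col, when, sum as sum_
from pyspark.sql.window import Window

w = Window.partitionBy("interaction_desc").orderBy("rnk")

# 1 on rows that open a new episode, 0 elsewhere; the running sum of this
# flag then increments by exactly one per new episode, which matches the
# expected table on the sample data. With several patients, patient_gid
# would also have to go into the partition key.
df2 = (df.withColumn("new_episode",
                     when(col("rnk") == 1, 1)
                     .when(col("interaction_date") > col("rx_end_date"), 1)
                     .otherwise(0))
         .withColumn("episode",
                     sum_("new_episode").over(
                         w.rowsBetween(Window.unboundedPreceding, 0)))
         .drop("new_episode"))
df2.show()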

Regarding python - dynamically incrementing values in a PySpark dataframe, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46448818/
