gpt4 book ai didi

pyspark 滞后函数(基于列)

转载 作者:行者123 更新时间:2023-12-05 01:01:06 25 4
gpt4 key购买 nike

我想实现以下目标

lag(column1,datediff(column2,column3)).over(window)

偏移量是动态的。我也尝试过使用UDF,但它没有用。

对于如何实现上述任何想法?

最佳答案

lag 函数的参数 count 采用整数而不是列对象:

psf.lag(col, count=1, default=None)

因此它不能是“动态”值。相反,您可以在列中构建滞后,然后将表与自身连接。

首先让我们创建我们的数据框:

df = spark.createDataFrame(
sc.parallelize(
[[1, "2011-01-01"], [1, "2012-01-01"], [2, "2013-01-01"], [1, "2014-01-01"]]
),
["int", "date"]
)

我们要枚举行:

from pyspark.sql import Window
import pyspark.sql.functions as psf
df = df.withColumn(
"id",
psf.monotonically_increasing_id()
)
w = Window.orderBy("id")
df = df.withColumn("rn", psf.row_number().over(w))
+---+----------+-----------+---+
|int| date| id| rn|
+---+----------+-----------+---+
| 1|2011-01-01|17179869184| 1|
| 1|2012-01-01|42949672960| 2|
| 2|2013-01-01|68719476736| 3|
| 1|2014-01-01|94489280512| 4|
+---+----------+-----------+---+

现在构建滞后:

df1 = df.select(
"int",
df.date.alias("date1"),
(df.rn - df.int).alias("rn")
)
df2 = df.select(
df.date.alias("date2"),
'rn'
)

最后我们可以加入它们并计算日期差:

df1.join(df2, "rn", "inner").withColumn(
"date_diff",
psf.datediff("date1", "date2")
).drop("rn")

+---+----------+----------+---------+
|int| date1| date2|date_diff|
+---+----------+----------+---------+
| 1|2012-01-01|2011-01-01| 365|
| 2|2013-01-01|2011-01-01| 731|
| 1|2014-01-01|2013-01-01| 365|
+---+----------+----------+---------+

关于pyspark 滞后函数(基于列),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45961164/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com