gpt4 book ai didi

scala - 一对行上的 Spark Dataframe 滑动窗口

转载 作者:行者123 更新时间:2023-12-01 09:18:56 27 4
gpt4 key购买 nike

我在 csv 中有一个事件日志,由三列 timestampeventIduserId 组成。

我想要做的是将一个新列 nextEventId 附加到数据帧。

一个示例事件日志:

eventlog = sqlContext.createDataFrame(Array((20160101, 1, 0),(20160102,3,1),(20160201,4,1),(20160202, 2,0))).toDF("timestamp", "eventId", "userId")
eventlog.show(4)

|timestamp|eventId|userId|
+---------+-------+------+
| 20160101| 1| 0|
| 20160102| 3| 1|
| 20160201| 4| 1|
| 20160202| 2| 0|
+---------+-------+------+

想要的最终结果是:
|timestamp|eventId|userId|nextEventId|
+---------+-------+------+-----------+
| 20160101| 1| 0| 2|
| 20160102| 3| 1| 4|
| 20160201| 4| 1| Nil|
| 20160202| 2| 0| Nil|
+---------+-------+------+-----------+

到目前为止,我一直在使用滑动窗口,但不知道如何比较 2 行...
val w = Window.partitionBy("userId").orderBy(asc("timestamp")) //should be a sliding window over 2 rows...
val nextNodes = second($"eventId").over(w) //should work if there are only 2 rows

最佳答案

您正在寻找的是 lead (或 lag )。使用您已经定义的窗口:

import org.apache.spark.sql.functions.lead

eventlog.withColumn("nextEventId", lead("eventId", 1).over(w))

对于真正的滑动窗口(如滑动平均),您可以使用窗口定义的 rowsBetweenrangeBetween 子句,但这里并不是真正需要的。尽管如此,示例用法可能是这样的:
val w2 =  Window.partitionBy("userId")
.orderBy(asc("timestamp"))
.rowsBetween(-1, 0)

avg($"foo").over(w2)

关于scala - 一对行上的 Spark Dataframe 滑动窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37703063/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com