gpt4 book ai didi

apache-spark - Spark 窗口函数最后一个非空值

转载 作者:行者123 更新时间:2023-12-02 00:17:18 25 4
gpt4 key购买 nike

我们有一个用户事件的时间序列数据库,如下所示:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:33:31 user_a choose_ticket ticke_b NULL
2019-06-06 14:34:31 user_b choose_ticket ticke_f NULL
2019-06-06 14:36:31 user_a booing_error NULL error_c
2019-06-06 14:37:31 user_a choose_ticket ticke_h NULL
2019-06-06 14:38:31 user_a booing_error NULL error_d
2019-06-06 14:39:31 user_a booing_error NULL error_e

这是我们需要的一个用例:

为了调查导致预订错误的机票类型,我们将不得不查看机票类型,仅在较早的事件 choose_ticket 中可用。

在这种情况下,我们要查找的是针对每个 booking_error 事件,找到之前的 choose_ticket 事件对于同一用户,并将那里的机票类型合并到 booking_error 事件。

理想情况下,我们想要的输出是:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:36:31 user_a booing_error ticke_b error_c
2019-06-06 14:38:31 user_a booing_error ticke_h error_d
2019-06-06 14:39:31 user_a booing_error ticke_h error_e

我能找到的最接近的是 Spark add new column to dataframe with value from previous row ,这使我们能够从之前的事件中获取属性并立即将其应用到之后的事件中。

这几乎可以工作,除了当有多个事件(本例中为 booing_error)时,在这种情况下只有第一个事件可以获得所需的属性。例如,这是我们将从上面的 SO 链接获得的解决方案:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:36:31 user_a booing_error ticke_b error_c
2019-06-06 14:38:31 user_a booing_error ticke_h error_d
2019-06-06 14:39:31 user_a booing_error NULL error_e

总而言之,对于给定的行,如何找到符合特定条件的前一行并“挑选”其属性?

执行此操作的最佳方法是什么?

最佳答案

org.apache.spark.sql.functions.last 就是您要找的。您可以重命名“最近”列以替换最后的 ticke_type。

scala> df.show
+-------------------+-------+-------------+----------+----------+
| timestamp|user_id| event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket| ticke_b| null|
|2019-06-06 14:34:31| user_b|choose_ticket| ticke_f| null|
|2019-06-06 14:36:31| user_a|booking_error| null| error_c|
|2019-06-06 14:37:31| user_a|choose_ticket| ticke_h| null|
|2019-06-06 14:38:31| user_a|booking_error| null| error_d|
|2019-06-06 14:39:31| user_a|booking_error| null| error_e|
+-------------------+-------+-------------+----------+----------+

scala> val overColumns = Window.partitionBy("user_id").orderBy("timestamp")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@70dc8c9a

scala> df.withColumn("closest",
org.apache.spark.sql.functions.last("ticke_type", true).over(overColumns)).filter($"event" === "booking_error").show
+-------------------+-------+-------------+----------+----------+-------+
| timestamp|user_id| event|ticke_type|error_type|closest|
+-------------------+-------+-------------+----------+----------+-------+
|2019-06-06 14:36:31| user_a|booking_error| null| error_c|ticke_b|
|2019-06-06 14:38:31| user_a|booking_error| null| error_d|ticke_h|
|2019-06-06 14:39:31| user_a|booking_error| null| error_e|ticke_h|
+-------------------+-------+-------------+----------+----------+-------+

关于apache-spark - Spark 窗口函数最后一个非空值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56637690/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com