gpt4 book ai didi

scala - Spark SQL 窗口函数前瞻和复函数

转载 作者:行者123 更新时间:2023-12-02 01:10:18 28 4
gpt4 key购买 nike

我有以下数据:

+-----+----+-----+
|event|t |type |
+-----+----+-----+
| A |20 | 1 |
| A |40 | 1 |
| B |10 | 1 |
| B |20 | 1 |
| B |120 | 1 |
| B |140 | 1 |
| B |320 | 1 |
| B |340 | 1 |
| B |360 | 7 |
| B |380 | 1 |
+-----+-----+----+

我想要的是这样的:

+-----+----+----+
|event|t |grp |
+-----+----+----+
| A |20 |1 |
| A |40 |1 |
| B |10 |2 |
| B |20 |2 |
| B |120 |3 |
| B |140 |3 |
| B |320 |4 |
| B |340 |4 |
| B |380 |5 |
+-----+----+----+

规则:

  1. 将彼此相距至少 50 毫秒的所有值组合在一起。 (t 列)属于同一事件。
  2. 当出现类型 7 的一行时,也进行切割并删除这一行。 (见最后一行)

我可以通过 this thread 的答案实现的第一条规则:

代码:

val windowSpec= Window.partitionBy("event").orderBy("t")

val newSession = (coalesce(
($"t" - lag($"t", 1).over(windowSpec)),
lit(0)
) > 50).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

我不得不说我无法弄清楚它是如何工作的,也不知道如何修改它以便规则 2 也能工作...希望有人能给我一些有用的提示。

我尝试过的:

val newSession =  (coalesce(
($"t" - lag($"t", 1).over(windowSpec)),
lit(0)
) > 50 || lead($"type",1).over(windowSpec) =!= 7 ).cast("bigint")

但只发生错误:“必须遵循方法;不能遵循 org.apache.spark.sql.Column val grp = (coalesce(

最佳答案

这应该可以解决问题:

val newSession =  (coalesce(
($"t" - lag($"t", 1).over(win)),
lit(0)
) > 50
or $"type"===7) // also start new group in this case
.cast("bigint")

df.withColumn("session", sum(newSession).over(win))
.where($"type"=!=7) // remove these rows
.orderBy($"event",$"t")
.show

给出:

+-----+---+----+-------+
|event| t|type|session|
+-----+---+----+-------+
| A| 20| 1| 0|
| A| 40| 1| 0|
| B| 10| 1| 0|
| B| 20| 1| 0|
| B|120| 1| 1|
| B|140| 1| 1|
| B|320| 1| 2|
| B|340| 1| 2|
| B|380| 1| 3|
+-----+---+----+-------+

关于scala - Spark SQL 窗口函数前瞻和复函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45406971/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com