gpt4 book ai didi

sql - Spark SQL中如何按时间间隔分组

转载 作者:行者123 更新时间:2023-12-01 21:12:19 24 4
gpt4 key购买 nike

我的数据集如下所示:

KEY |Event_Type | metric | Time 
001 |event1 | 10 | 2016-05-01 10:50:51
002 |event2 | 100 | 2016-05-01 10:50:53
001 |event3 | 20 | 2016-05-01 10:50:55
001 |event1 | 15 | 2016-05-01 10:51:50
003 |event1 | 13 | 2016-05-01 10:55:30
001 |event2 | 12 | 2016-05-01 10:57:00
001 |event3 | 11 | 2016-05-01 11:00:01

我想获取所有验证这一点的 key :

“特定事件的指标总和”> 5 分钟期间的阈值

在我看来,这是使用滑动窗口函数的完美候选者。

如何使用 Spark SQL 执行此操作?

谢谢。

最佳答案

Spark >= 2.0

您可以使用window (不要与窗口函数混淆)。根据一种变体,它会将时间戳分配给另一个可能重叠的存储桶:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+

Spark <2.0

让我们从示例数据开始:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
("001", "event1", 10, "2016-05-01 10:50:51"),
("002", "event2", 100, "2016-05-01 10:50:53"),
("001", "event3", 20, "2016-05-01 10:50:55"),
("001", "event1", 15, "2016-05-01 10:51:50"),
("003", "event1", 13, "2016-05-01 10:55:30"),
("001", "event2", 12, "2016-05-01 10:57:00"),
("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

我假设该事件由KEY标识。如果不是这种情况,您可以根据您的要求调整 GROUP BY/PARTITION BY 子句。

如果您对独立于数据的静态窗口的聚合感兴趣,请将时间戳转换为数字数据类型并进行舍入

import org.apache.spark.sql.functions.{round, sum}

// cast string to timestamp_seconds
val ts = $"Time".cast("timestamp").cast("long")

// Round to 300 seconds interval
// In Spark >= 3.1 replace cast("timestamp") with
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+

如果您对相对于当前行的窗口感兴趣,请使用窗口函数:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+

出于性能原因,仅当数据可以划分为多个单独的组时,此方法才有用。在 Spark < 2.0.0 中,您还需要 HiveContext 才能使其正常工作。

关于sql - Spark SQL中如何按时间间隔分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37632238/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com