gpt4 book ai didi

sql - 如何使用 Spark Scala 或 sql 对特定时间间隔内的记录进行分组?

转载 作者:行者123 更新时间:2023-12-05 06:27:13 24 4
gpt4 key购买 nike

我只想在 scala 中对具有相同 ID 且时间相差不超过 1 分钟的记录进行分组。

我在概念上是这样想的?但我不是很确定

HAVING a.ID = b.ID AND a.time + 30 sec > b.time AND a.time - 30 sec < b.time




| ID | volume | Time |
|:-----------|------------:|:--------------------------:|
| 1 | 10 | 2019-02-17T12:00:34Z |
| 2 | 20 | 2019-02-17T11:10:46Z |
| 3 | 30 | 2019-02-17T13:23:34Z |
| 1 | 40 | 2019-02-17T12:01:02Z |
| 2 | 50 | 2019-02-17T11:10:30Z |
| 1 | 60 | 2019-02-17T12:01:57Z |

为此:

| ID         |     volume  | 
|:-----------|------------:|
| 1 | 50 | // (10+40)
| 2 | 70 | // (20+50)
| 3 | 30 |


df.groupBy($"ID", window($"Time", "1 minutes")).sum("volume")

上面的代码是 1 个解决方案,但它总是四舍五入。

例如 2019-02-17T12:00:45Z 的范围是

2019-02-17T12:00:00Z TO 2019-02-17T12:01:00Z.

我正在寻找这个:2019-02-17T11:45:00Z 到 2019-02-17T12:01:45Z。

有办法吗?

最佳答案

org.apache.spark.sql.functions 提供如下重载窗口函数。

<强>1。 window(timeColumn: Column, windowDuration: String) : 在给定时间戳指定列的情况下生成翻转时间窗口。窗口开始是包容性的,但窗口结束是排他性的,例如12:05 将在 [12:05,12:10) 窗口中,但不在 [12:00,12:05) 中。

窗口看起来像:

  {{{
09:00:00-09:01:00
09:01:00-09:02:00
09:02:00-09:03:00 ...
}}}

<强>2。窗口((timeColumn:列,windowDuration:字符串,slideDuration:字符串): 给定时间戳指定列,将行分桶到一个或多个时间窗口中。窗口开始是包容性的,但窗口结束是排他性的,例如12:05 将在窗口 [12:05,12:10) 中,但不在 [12:00,12:05) 中。 slideDuration 指定窗口滑动间隔的参数,例如1 分钟。每 slideDuration 将生成一个新窗口。必须小于或等于 windowDuration

窗口看起来像:

{{{
09:00:00-09:01:00
09:00:10-09:01:10
09:00:20-09:01:20 ...
}}}

<强>3。 window((timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): 给定指定列的时间戳,将行分桶到一个或多个时间窗口中。窗口开始是包含的,但窗口结束是排他的,例如12:05 将在 [12:05,12:10) 窗口中,但不在 [12:00,12:05) 中。

窗口看起来像:

{{{
09:00:05-09:01:05
09:00:15-09:01:15
09:00:25-09:01:25 ...
}}}

例如,为了让每小时滚动的窗口在整点后 15 分钟开始,例如12:15-13:15、13:15-14:15... 将 startTime 提供为 15 分钟这是满足您要求的完美重载窗口函数。

请找到如下工作代码。

import org.apache.spark.sql.SparkSession

object SparkWindowTest extends App {

val spark = SparkSession
.builder()
.master("local")
.appName("File_Streaming")
.getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._

//Prepare Test Data
val df = Seq((1, 10, "2019-02-17 12:00:49"), (2, 20, "2019-02-17 11:10:46"),
(3, 30, "2019-02-17 13:23:34"),(2, 50, "2019-02-17 11:10:30"),
(1, 40, "2019-02-17 12:01:02"), (1, 60, "2019-02-17 12:01:57"))
.toDF("ID", "Volume", "TimeString")

df.show()
df.printSchema()

+---+------+-------------------+
| ID|Volume| TimeString|
+---+------+-------------------+
| 1| 10|2019-02-17 12:00:49|
| 2| 20|2019-02-17 11:10:46|
| 3| 30|2019-02-17 13:23:34|
| 2| 50|2019-02-17 11:10:30|
| 1| 40|2019-02-17 12:01:02|
| 1| 60|2019-02-17 12:01:57|
+---+------+-------------------+

root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)

//Converted String Timestamp into Timestamp
val modifiedDF = df.withColumn("Time", to_timestamp($"TimeString"))

//Dropped String Timestamp from DF
val modifiedDF1 = modifiedDF.drop("TimeString")

modifiedDF.show(false)
modifiedDF.printSchema()

+---+------+-------------------+-------------------+
|ID |Volume|TimeString |Time |
+---+------+-------------------+-------------------+
|1 |10 |2019-02-17 12:00:49|2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|2019-02-17 12:01:57|
+---+------+-------------------+-------------------+

root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)
|-- Time: timestamp (nullable = true)

modifiedDF1.show(false)
modifiedDF1.printSchema()

+---+------+-------------------+
|ID |Volume|Time |
+---+------+-------------------+
|1 |10 |2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|
+---+------+-------------------+

root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- Time: timestamp (nullable = true)

//Main logic
val modifiedDF2 = modifiedDF1.groupBy($"ID", window($"Time", "1 minutes","1 minutes","45 seconds")).sum("Volume")

//Renamed all columns of DF.
val newNames = Seq("ID", "WINDOW", "VOLUME")
val finalDF = modifiedDF2.toDF(newNames: _*)

finalDF.show(false)

+---+---------------------------------------------+------+
|ID |WINDOW |VOLUME|
+---+---------------------------------------------+------+
|2 |[2019-02-17 11:09:45.0,2019-02-17 11:10:45.0]|50 |
|1 |[2019-02-17 12:01:45.0,2019-02-17 12:02:45.0]|60 |
|1 |[2019-02-17 12:00:45.0,2019-02-17 12:01:45.0]|50 |
|3 |[2019-02-17 13:22:45.0,2019-02-17 13:23:45.0]|30 |
|2 |[2019-02-17 11:10:45.0,2019-02-17 11:11:45.0]|20 |
+---+---------------------------------------------+------+

}

关于sql - 如何使用 Spark Scala 或 sql 对特定时间间隔内的记录进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55403088/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com