gpt4 book ai didi

scala - 非时间戳列上的 Spark 结构化流窗口

转载 作者:行者123 更新时间:2023-12-04 12:39:05 30 4
gpt4 key购买 nike

我正在获取以下形式的数据流:

+--+---------+---+----+
|id|timestamp|val|xxx |
+--+---------+---+----+
|1 |12:15:25 | 50| 1 |
|2 |12:15:25 | 30| 1 |
|3 |12:15:26 | 30| 2 |
|4 |12:15:27 | 50| 2 |
|5 |12:15:27 | 30| 3 |
|6 |12:15:27 | 60| 4 |
|7 |12:15:28 | 50| 5 |
|8 |12:15:30 | 60| 5 |
|9 |12:15:31 | 30| 6 |
|. |... |...|... |

我有兴趣将窗口操作应用于 xxx列就像时间戳上的窗口操作在 Spark Streaming 中可用,具有一些窗口大小和滑动步长。

让我们在 groupBy带窗函数, lines表示窗口大小为 2 且滑动步长为 1 的流数据帧。
val c_windowed_count = lines.groupBy(
window($"xxx", "2", "1"), $"val").count().orderBy("xxx")

因此,输出应如下所示:
+------+---+-----+
|window|val|count|
+------+---+-----+
|[1, 3]|50 | 2 |
|[1, 3]|30 | 2 |
|[2, 4]|30 | 2 |
|[2, 4]|50 | 1 |
|[3, 5]|30 | 1 |
|[3, 5]|60 | 1 |
|[4, 6]|60 | 2 |
|[4, 6]|50 | 1 |
|... |.. | .. |

我尝试使用 partitionBy但它在 Spark Structured Streaming 中不受支持。

我正在使用 Spark Structured Streaming 2.3.1。

谢谢!

最佳答案

目前无法使用 Spark Structured Streaming 以这种方式在非时间戳列上使用窗口。但是,您可以做的是转换 xxx列到时间戳列 ,做 groupBycount ,然后变回来。
from_unixtime可用于将自 1970-01-01 以来的秒数转换为时间戳。使用 xxx列作为秒,可以创建一个假时间戳以在窗口中使用:

lines.groupBy(window(from_unixtime($"xxx"), "2 seconds", "1 seconds"), $"val").count()
.withColumn("window", struct(unix_timestamp($"window.start"), unix_timestamp($"window.end")).as("window"))
.filter($"window.col1" =!= 0)
.orderBy($"window.col1")

上面,分组是在转换后的时间戳上完成的,下一行将把它转换回原来的数字。过滤器已完成,因为前两行将是一个窗口 [0,2] (即仅在 xxx 等于 1 的行上)但可以跳过。

上述输入的结果输出:
+------+---+-----+
|window|val|count|
+------+---+-----+
| [1,3]| 50| 2|
| [1,3]| 30| 2|
| [2,4]| 30| 2|
| [2,4]| 50| 1|
| [3,5]| 30| 1|
| [3,5]| 60| 1|
| [4,6]| 60| 2|
| [4,6]| 50| 1|
| [5,7]| 30| 1|
| [5,7]| 60| 1|
| [5,7]| 50| 1|
| [6,8]| 30| 1|
+------+---+-----+

关于scala - 非时间戳列上的 Spark 结构化流窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52290069/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com