gpt4 book ai didi

apache-spark - 使用 Spark structured streaming 时,如何像 Spark Streaming 一样只获取当前批处理的聚合结果?

转载 作者:行者123 更新时间:2023-12-05 04:07:42 25 4
gpt4 key购买 nike

Spark Structure Streaming (SSS) 和 Spark Streaming (SS) 之间的一大不同是 SSS 可以利用 statestore。它可以存储以前批处理的聚合结果,并将当前结果与以前的结果一起应用。因此它可以从输入流的最开始就得到真正的聚合结果。

但对于一种情况,我们不想获得与 statestore 的先前值合并的最终结果。我们只想获取(输出)当前批处理的聚合结果。由于平台和框架的原因,我们无法回滚到 SS。

所以我的问题是,在 SSS 中是否仍然可以像 SS 一样获取当前批处理的聚合结果?

以spark structure streaming guide中给出的字数统计应用为例: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

当一批出现“cat cat”时,我的预期输出是cat|2

当下一批出现“cat”时,我的预期输出是cat|1

最佳答案

is it still doable in SSS to get the aggretation result of current batch, like SS?

实现您想要的方法的一种方法是使用 mapGroupsWithState 自己控制状态存储,并将其用作一种实际上不执行任何操作的退化存储。例如:

val spark =
SparkSession.builder().appName("bla").master("local[*]").getOrCreate()

import spark.implicits._

val socketDF = spark.readStream
.format("socket")
.option("host", "127.0.0.1")
.option("port", 9999)
.load()

socketDF
.as[String]
.map { str =>
val Array(key, value) = str.split(';')
(key, value)
}
.groupByKey { case (key, _) => key }
.mapGroupsWithState((str: String,
tuples: Iterator[(String, String)],
value: GroupState[Int]) => {
(str, tuples.size)
})
.writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
.awaitTermination()

假设我有一个格式为 key;value 的值流,这将只使用 mapGroupsWithState 作为传递存储,实际上不会累积任何结果。这样,对于每个批处理,您都会获得一个没有以前聚合数据的干净状态。

关于apache-spark - 使用 Spark structured streaming 时,如何像 Spark Streaming 一样只获取当前批处理的聚合结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48404960/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com