scala - Akka Streams batching

Reposted. Author: 行者123. Updated: 2023-12-04 01:38:41

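I am learning Akka Streams. I have a stream of records, many per time unit, already sorted by time (they come from Slick). I want to batch them into per-time groups for processing by detecting when the time step changes.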

Example

case class Record(time: Int, payload: String)

If the incoming stream is

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

I would like to turn it into

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...
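
To pin down the intended semantics, here is a minimal sketch of the same transformation on an in-memory List rather than a stream. The Batch case class is an assumption inferred from the desired output above, and batchSorted is a hypothetical helper name. It only illustrates the grouping rule (consecutive records with the same time go into one batch) and relies on the input already being sorted by time.

```scala
final case class Record(time: Int, payload: String)
// Assumed shape: the desired output shows Batch(time, payloads) but no definition.
final case class Batch(time: Int, payloads: Seq[String])

object BatchReference {
  // Hypothetical helper: groups consecutive records that share the same time.
  // Assumes the input is already sorted by time.
  def batchSorted(records: List[Record]): List[Batch] =
    records.foldLeft(List.empty[Batch]) {
      // Same time as the batch currently being built: append the payload.
      case (Batch(t, ps) :: done, r) if t == r.time =>
        Batch(t, ps :+ r.payload) :: done
      // First record, or the time step changed: start a new batch.
      case (acc, r) =>
        Batch(r.time, Seq(r.payload)) :: acc
    }.reverse // batches were prepended, so restore input order
}
```

The streaming version has to do the same thing incrementally, which is why some state (the batch currently being built) must be carried between elements.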

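So far I have only found ways to group by a fixed number of records, or to split into many substreams, but as far as I can tell I do not need multiple substreams.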

Update: I found batch, but it seems to be more about handling backpressure than about batching unconditionally.

Best answer

statefulMapConcat is the multitool of the Akka Streams library.

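// Assumes akka.stream.scaladsl.Source is imported and an implicit ActorSystem is in scope.
val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // sentinel: needed to flush and print the last group

records
  .statefulMapConcat { () =>
    // Mutable state, created once per materialization of the stream.
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        // Same time step: accumulate and emit nothing yet.
        payloads = payloads :+ record.payload
        Nil
      } else {
        // Time step changed: emit the finished group and start a new one.
        // The very first emission is the empty initial state (0, Nil).
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)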

Running the above prints the following:

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

You can adapt the example to print Batch objects.
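
As a sketch of that adjustment, the variant below emits Batch values directly. It is one possible approach, not the answer's own code: the Batch case class, the BatchByTime object, and the run method are hypothetical names, and it assumes Akka 2.6 or later, where an implicit ActorSystem also supplies the materializer. Instead of a dummy Record(0, "notused"), each record is wrapped in Option and a trailing None marks the end of the stream, which also avoids the empty (0,List()) element at the start.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Record(time: Int, payload: String)
// Assumed shape: the question shows Batch(time, payloads) but never defines it.
final case class Batch(time: Int, payloads: Seq[String])

object BatchByTime {
  def run(): Seq[Batch] = {
    // Assumes Akka 2.6+: the implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("batch-by-time")
    try {
      val records = Source(List(
        Record(1, "a"), Record(1, "k"), Record(1, "k"), Record(1, "a"),
        Record(2, "r"), Record(2, "o"), Record(2, "c"), Record(2, "k"), Record(2, "s"),
        Record(3, "!")
      ))

      val batches = records
        .map(Option(_))                                // Some(record) for real elements
        .concat(Source.single(Option.empty[Record]))   // None marks end of stream
        .statefulMapConcat { () =>
          // The batch currently being built; None until the first record arrives.
          var open: Option[Batch] = None

          (elem: Option[Record]) => elem match {
            // Same time step: extend the open batch, emit nothing.
            case Some(r) if open.exists(_.time == r.time) =>
              open = open.map(b => b.copy(payloads = b.payloads :+ r.payload))
              Nil
            // New time step: emit the finished batch (if any) and open a new one.
            case Some(r) =>
              val finished = open.toList
              open = Some(Batch(r.time, Vector(r.payload)))
              finished
            // End marker: flush whatever is still open.
            case None =>
              val finished = open.toList
              open = None
              finished
          }
        }

      Await.result(batches.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

With the sample input this should yield three Batch values, one per time step. Like the original, it relies on the records arriving sorted by time; unsorted input would simply produce more, smaller batches.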

Regarding scala - Akka Streams batching, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58471183/
