gpt4 book ai didi

scala - 如何从 Source[A] 创建 Akka Stream Source[Seq[A]]

转载 作者:行者123 更新时间:2023-12-02 04:40:07 27 4
gpt4 key购买 nike

在以前版本的 Akka Streams 中,groupBy 返回一个 SourceSource 可以具体化为一个 Source[Seq [A]].

在 Akka Streams 2.4 中,我看到 groupBy 返回一个 SubFlow - 我不清楚如何使用它。我需要应用到流程的转换必须有整个 Seq 可用,所以我不能只 map SubFlow(我想想)。

我编写了一个扩展 GraphStage 的类,它通过 GraphStageLogic 中的可变集合进行聚合,但是是否有内置功能?我错过了 SubFlow 的要点吗?

最佳答案

我最终写了一个 GraphStage:

class FlowAggregation[A, B](f: A => B) extends GraphStage[FlowShape[A, Seq[A]]] {
val in: Inlet[A] = Inlet("in")
val out: Outlet[Seq[A]] = Outlet("out")
override val shape = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {

private var counter: Option[B] = None
private var aggregate = scala.collection.mutable.ArrayBuffer.empty[A]

setHandler(in, new InHandler {
override def onPush(): Unit = {
val element = grab(in)

counter.fold({
counter = Some(f(element))
aggregate += element
pull(in)
}) { p =>
if (f(element) == p) {
aggregate += element
pull(in)
} else {
push(out, aggregate)
counter = Some(f(element))
aggregate = scala.collection.mutable.ArrayBuffer(element)
}
}
}
override def onUpstreamFinish(): Unit = {
emit(out, aggregate)
complete(out)
}
})

setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}

关于scala - 如何从 Source[A] 创建 Akka Stream Source[Seq[A]],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38246812/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com