gpt4 book ai didi

scala - Akka 流 : Why does Sink. head 终止流 alsoTo 广播?

转载 作者:行者123 更新时间:2023-12-04 15:36:55 26 4
gpt4 key购买 nike

让我们来看一个非常简单的案例:

Source(1 to 10)
.alsoTo(Sink.foreach(v => println(s"each: $v")))
.toMat(Sink.head)(Keep.right)
.run()

根据 alsoTo 文档,我希望 Sink.foreach 打印所有元素,但是,它只打印第一个。如果我切换 Sink.foreachSink.head 位置,也会发生同样的情况。

但是,如果广播是通过 GraphDSL 实现的,即使其中一个接收器是 Sink.head,也会消耗整个源。

编辑:alsoTo 的文档说明如下:

Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.

对我来说这听起来像是广播,但也许这就是我犯错误的地方。我还可以解释为 toMat 控制流程。因此,我希望以下内容能够从源中获取所有元素:

Source(1 to 10)
.alsoTo(Sink.head)
.toMat(Sink.seq)(Keep.right)
.run()

GraphDSL 版本如我所料:

val s1 = Sink.foreach[Int](v => println(s"each: $v"))
val s2 = Sink.head[Int]
val source = Source(1 to 10)
RunnableGraph.fromGraph(GraphDSL.create(s1, s2)((_, _)) { implicit builder => (s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}).run()

最佳答案

原因是 Sink.head 消耗单个元素并完成自身。这将以 cancel 的形式向上游传播,此后不会从 Source 发送任何元素。

akka.stream.impl.HeadOptionStage.onPush 中的代码展示了它

      def onPush(): Unit = {
p.trySuccess(Option(grab(in)))
completeStage()
}

哪里 completeStage

Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, then marks the operator as stopped.

更新

alsoTo 是使用以下参数配置的广播:

      val bcast = b.add(Broadcast[Out](2, eagerCancel = true))

您的 GraphDSL 版本工作方式不同,因为默认广播是 eagerCancel = false

在哪里 eagerCancel

if true, broadcast cancels upstream if any of its downstreams cancel.

关于scala - Akka 流 : Why does Sink. head 终止流 alsoTo 广播?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59423459/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com