gpt4 book ai didi

scala - Akka 流选项输出

转载 作者:行者123 更新时间:2023-12-04 10:06:55 33 4
gpt4 key购买 nike

我创建了一个 Akka Stream,它有一个简单的 Source , FlowSink .有了这个,我可以轻松地通过它发送元素。现在我想改变这个流,以便 Flow返回 Option .取决于 Option 的结果我想改变 Flow 的输出.

enter image description here

是否有可能创建这样的结构?

最佳答案

此时给出的两个答案都涉及Broadcast .请注意,它可能适用于这个特定示例,但适用于更复杂的图形 Broadcast可能不是一个明智的选择。
原因是Broadcast如果至少有一个下游背压,则总是背压。
最好的背压感知解决方案是 Partition ,它能够有选择地从 Partitioner 函数选择的分支传播背压。

下面的示例(详细说明 T-Fowl 的答案之一)

  def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
(sink1, sink2) ⇒ {
import GraphDSL.Implicits._

def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
val partition = builder.add(Partition[Option[T]](2, partitioner))
partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in

val mapper = builder.add(Flow.fromFunction(f))
mapper.out ~> partition.in

SinkShape(mapper.in)
}
}
Sink.fromGraph(graph)
}

关于scala - Akka 流选项输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40991994/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com