gpt4 book ai didi

scala - 如何将 Akka Streams Merge 的输出通过管道传输到另一个 Flow?

转载 作者:行者123 更新时间:2023-12-04 22:45:43 25 4
gpt4 key购买 nike

我正在使用 Akka Streams,并且已经弄清楚了大部分基础知识,但是我不清楚如何获取 Merge 的结果。并对其进行进一步的操作(映射、过滤、折叠等)。

我想修改以下代码,以便我可以进一步操作数据,而不是将合并传递到接收器。

implicit val materializer = FlowMaterializer()

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)

val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val merge = Merge[Int]("m1")
items_a ~> merge
items_b ~> merge ~> sink
}.run()

我想我的主要问题是我无法弄清楚如何制作没有源的流组件,而且我无法弄清楚如何在不使用特殊的 Merge 对象和 ~> 的情况下进行合并。句法。

编辑: 这个问题和答案适用于 Akka Streams 0.11

最佳答案

如果你不关心 Merge 的语义元素随机流向下游,然后你可以试试 concatSource而是像这样:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)

这里的区别在于 a 中的所有项目将在 b 的任何元素之前先向下游流动.如果你真的需要 Merge 的行为,那么您可以考虑以下内容(请记住,您最终将需要一个接收器,但您当然可以在合并后进行其他转换):
val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))

val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink)


val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val merge = Merge[Int]("m1")
items_a ~> merge
items_b ~> merge ~> transform
}.run

在这个例子中,你可以看到我使用了来自 Flow 的助手。伴侣打造 Flow没有特定的输入 Source .从那里我可以将它附加到合并点以获得我的额外处理。

关于scala - 如何将 Akka Streams Merge 的输出通过管道传输到另一个 Flow?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27233052/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com