gpt4 book ai didi

scala - 你如何处理 Akka Flow 中的 future ?

转载 作者:行者123 更新时间:2023-12-04 02:50:42 25 4
gpt4 key购买 nike

我构建了一个定义流的 akka 图。我的目标是重新格式化我 future 的响应并将其保存到文件中。流程可以概括如下:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
val merger = builder.add(Merge[Future[Map[String, String]]](6))
val fileSink = FileIO.toPath(outputPath, options)
val ignoreSink = Sink.ignore
val in = Source(seeds)
in ~> balancer.in
for (i <- Range(0,6)) {
balancer.out(i) ~>
wikiFlow.async ~>
// This maps to a Future[Map[String, String]]
Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
merger
}

merger.out ~>
// When we merge we need to map our Map to a file
Flow[Future[Map[String, String]]].map((d) => {
// What is the proper way of serializing future map
// so I can work with it like a normal stream into fileSink?

// I could manually do ->
// d.foreach(someWriteToFileProcess(_))
// with ignoreSink, but this defeats the nice
// akka flow
}) ~>
fileSink

ClosedShape
})

我可以破解这个工作流程,通过 foreach 将我 future 的映射写入文件,但我担心这可能会导致 FileIO 的并发问题,而且感觉不对。用我们的 akka 流程处理 future 的正确方法是什么?

最佳答案

创建 Flow 的最简单方法涉及异步计算的方法是使用 mapAsync .

所以...假设你想创建一个 Flow消耗 Int并生产 String使用异步计算 mapper: Int => Future[String]并行度为 5。

val mapper: Int => Future[String] = (i: Int) => Future(i.toString)

val yourFlow = Flow[Int].mapAsync[String](5)(mapper)

现在,您可以根据需要在图表中使用此流程。

一个示例用法是,
val graph = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._

val intSource = Source(1 to 10)

val printSink = Sink.foreach[String](s => println(s))

val yourMapper: Int => Future[String] = (i: Int) => Future(i.toString)

val yourFlow = Flow[Int].mapAsync[String](2)(yourMapper)

intSource ~> yourFlow ~> printSink

ClosedShape
}

关于scala - 你如何处理 Akka Flow 中的 future ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42059929/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com