gpt4 book ai didi

scala - 你如何处理 Akka Flow 中的 futures 和 mapAsync?

转载 作者:行者123 更新时间:2023-12-01 09:26:16 24 4
gpt4 key购买 nike

我构建了一个定义简单流程的 akka 图 DSL。但是流 f4 需要 3 秒来发送一个元素,而 f2 需要 10 秒。

结果,我得到了:3, 2, 3, 2。但是,这不是我想要的。由于 f2 花费太多时间,我想得到:3, 3, 2, 2。这是代码...

implicit val actorSystem = ActorSystem("NumberSystem")
implicit val materializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(List(1, 1))
val out = Sink.foreach(println)

val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))



val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

val f1, f3 = Flow[Int]
val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper)
val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper2)

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})
g.run()

那么我哪里出错了?用 future 还是 mapAsync ?要不然 ...谢谢

最佳答案

抱歉,我是 akka 的新手,所以我还在学习。要获得预期的结果,一种方法是使用异步:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(List(1, 1))
val out = Sink.foreach(println)

val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))



val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

val f1, f3 = Flow[Int]
val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).map(_+1)
//.mapAsyncUnordered[Int](2)(yourMapper)
val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).map(_+2)
//.mapAsync[Int](2)(yourMapper2)

in ~> f1 ~> bcast ~> f2.async ~> merge ~> f3 ~> out
bcast ~> f4.async ~> merge
ClosedShape
})
g.run()

关于scala - 你如何处理 Akka Flow 中的 futures 和 mapAsync?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53769294/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com