gpt4 book ai didi

scala - Akka 流 - 在失败后使用广播和 zip 恢复图形

转载 作者:行者123 更新时间:2023-12-04 19:29:05 25 4
gpt4 key购买 nike

我有一个带有广播和 zipper 的流程图。如果此流程中的某些内容(无论是什么)失败,我想删除传递给它的有问题的元素并继续。我想出了以下解决方案:

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._

val dangerousFlow = Flow[Int].map {
case 5 => throw new RuntimeException("BOOM!")
case x => x
}
val safeFlow = Flow[Int]
val bcast = builder.add(Broadcast[Int](2))
val zip = builder.add(Zip[Int, Int])

bcast ~> dangerousFlow ~> zip.in0
bcast ~> safeFlow ~> zip.in1

FlowShape(bcast.in, zip.out)
})

Source(1 to 9)
.via(flow)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.foreach(println))

我希望它打印:
(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)

但是,它会死锁,仅打印:
(1,1)
(2,2)
(3,3)
(4,4)

我们做了一些调试,结果发现它对它的 child 应用了“恢复”策略,这导致了 dangerousFlow在失败后恢复,从而从 bcast 请求一个元素. bcast直到 safeFlow 才会发出元素需要另一个元素,这实际上从未发生过(因为它正在等待来自 zip 的需求)。

有没有办法恢复图表,而不管其中一个阶段出了什么问题?

最佳答案

我想你很好地理解了这个问题。你看到了,当你的元素 5崩溃 dangerousFlow ,您还应该停止元素 5正在通过 safeFlow因为如果它到达 zip阶段,您遇到了您描述的问题。我不知道如何解决您在 broadcast 之间的问题和 zip阶段,但如何进一步插入问题,更容易处理?

考虑使用以下 dangerousFlow :

import scala.util._
val dangerousFlow = Flow[Int].map {
case 5 => Failure(new RuntimeException("BOOM!"))
case x => Success(x)
}

即使出现问题, dangerousFlow仍然会发出数据。然后您可以 zip正如您目前所做的那样,只需要添加一个 collect stage 作为图表的最后一步。在流程中,这看起来像:
Flow[(Try[Int], Int)].collect {
case (Success(s), i) => s -> i
}

现在,如果,如您所写,您真的希望它输出 (5, 5)元组,使用以下内容:
Flow[(Try[Int], Int)].collect {
case (Success(s), i) => s -> i
case (_, i) => i -> i
}

关于scala - Akka 流 - 在失败后使用广播和 zip 恢复图形,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47032339/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com