gpt4 book ai didi

scala - Akka Streams 中的 RestartFlow 未按预期工作

转载 作者:行者123 更新时间:2023-12-01 08:53:56 25 4
gpt4 key购买 nike

我正在使用 Delayed restarts with backoff stage Akka Streams 的功能似乎对我不起作用。

我的测试代码是:

object Test {
import akka.stream.scaladsl.{ Flow, RestartFlow, Sink, Source }
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()

def main(args: Array[String]): Unit = {
val source = Source(1 to 10)
val flow = Flow[Int].map { x =>
println(s"Processing: $x")
if (x != 6) {
x * 2
} else throw new RuntimeException("Baam!!")
}
val restartFlow = RestartFlow.onFailuresWithBackoff(10.milliseconds, 20.milliseconds, 0.2, 3)(() => flow)
source.via(restartFlow).to(Sink.ignore).run()
}
}

我预计异常会发生三次,因为最大重启次数等于 3。但是,相反,我只看到一次异常。我在这里缺少什么?

运行结果:

Processing: 1
Processing: 2
Processing: 3
Processing: 4
Processing: 5
Processing: 6
[ERROR] [03/20/2018 16:13:59.167] [default-akka.actor.default-dispatcher-2] [RestartWithBackoffFlow(akka://default)] Restarting graph due to failure
java.lang.RuntimeException: Baam!!
at Test$.$anonfun$main$1(Test.scala:18)
at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:12)
at akka.stream.impl.fusing.Map$$anon$9.onPush(Ops.scala:53)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:585)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:469)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:560)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:742)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:732)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:726)
at akka.actor.Actor.aroundPreStart(Actor.scala:528)
at akka.actor.Actor.aroundPreStart$(Actor.scala:528)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:667)
at akka.actor.ActorCell.create(ActorCell.scala:654)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:525)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

最佳答案

您遇到了一个已报告的问题:https://github.com/akka/akka/issues/24726

更新:此问题已在 Akka 中修复 2.5.13补丁发布。

关于scala - Akka Streams 中的 RestartFlow 未按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49387958/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com