gpt4 book ai didi

scala - 如何对 akka 流中的异常(上游故障)使用react

转载 作者:行者123 更新时间:2023-12-04 08:47:37 25 4
gpt4 key购买 nike

我有一个 akka Flow[I, O]这是我无法控制的,因为它来自某些第三方代码。每当输入元素不产生输出元素时,我都需要使用react(例如,因为在流程的某些部分引发了异常)。为此,我需要产生失败的输入元素。我在流程或类似方面没有找到任何允许我注册处理程序或以任何方式对其作出 react 的 API。我怎样才能做到这一点?

最佳答案

您要Resume而不是 Stop当 akka 流引发异常时。收集所有成功元素后,您可以Seq#diff告诉哪些元素由于异常抛出而被丢弃。

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Exception {

case class MyException(n: Int) extends RuntimeException

def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("Exception")
implicit val ec: ExecutionContext = system.dispatcher

val decider: Supervision.Decider = {
case _: MyException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.map(n =>
if (n % 2 == 1) throw MyException(n)
else n
)
val in = 1 to 10
val outFuture = Source(in)
.via(flow)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
outFuture.onComplete {
case Success(out) =>
println("dropped elements are " + (in.diff(out)))
case Failure(_) =>
println("unknown failure")
}
}
}
控制台输出是:
dropped elements are Vector(1, 3, 5, 7, 9)
引用: How to get object that caused failure in Akka Streams?

关于scala - 如何对 akka 流中的异常(上游故障)使用react,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64229169/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com