gpt4 book ai didi

akka - Akka Decider 是否可以访问完整的故障场景?

转载 作者:行者123 更新时间:2023-12-05 01:29:32 26 4
gpt4 key购买 nike

Akka 新手。创建一个扩展 SupervisorStrategy 的新 Scala 类为我提供了以下模板:

class MySupervisorStrategy extends SupervisorStrategy {
override def decider: Decider = ???

override def handleChildTerminated(context: ActorContext, child: ActorRef,
children: Iterable[ActorRef]): Unit = ???

override def processFailure(context: ActorContext, restart: Boolean,
child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = ???
}

我正在寻找一种访问方式:

  1. 子 Actor 抛出的Throwable/Exception
  2. 抛出异常的子actor ActorRef
  3. 传递给子actor提示抛出异常的消息

认为Decider(实际上是一个PartialFunction[Throwable,Directive])通过了Throwable 每当 child 抛出异常时,但我看不到从哪里可以访问上面列表中的 #2 和 #3。 有什么想法吗?


更新

从发布的 fiddle 来看,它看起来像一个有效的 Decider 是:

{
case ActorException(ref,t,"stop") =>
println(s"Received 'stop' from ${ref}")
Stop
case ActorException(ref,t,"restart") =>
println(s"Received 'restart' from ${ref}")
Restart
case ActorException(ref,t,"resume") =>
println(s"Received 'resume' from ${ref}")
Resume
}

在上面,我看到了所有三个:

  1. child 抛出的异常
  2. 抛出异常的 child (ref)
  3. 最初发送给 child 的消息(导致抛出异常)

看起来 Decider 中没有任何需要Supervisor 类中定义的内容。我想将 Decider 逻辑提取到例如 MyDecider.scala 中,并找到一种重构 Supervisor 的方法,使其 supervisorStrategy 使用 MyDecider 的一个实例,所以可能类似于:

class Supervisor extends Actor {
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

var child: ActorRef = _

override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute, decider = myDecider)

...
}

最佳答案

对于 #2,您可以访问发送者“如果策略是在监督参与者内部声明的

If the strategy is declared inside the supervising actor (as opposed to within a companion object) its decider has access to all internal state of the actor in a thread-safe fashion, including obtaining a reference to the currently failed child (available as the sender of the failure message).

该消息不可用,因此唯一的选择是捕获您的异常并使用收到的消息抛出一个自定义异常。

这是一个quick fiddle

class ActorSO extends Actor {

def _receive: Receive = {
case e =>
println(e)
throw new RuntimeException(e.toString)
}

final def receive = {
case any => try {
_receive(any)
}
catch {
case t:Throwable => throw new ActorException(self,t,any)
}

}

}

更新

Decider 只是一个 PartialFunction,因此您可以将其传递到构造函数中。

object SupervisorActor {
def props(decider: Decider) = Props(new SupervisorActor(decider))
}

class SupervisorActor(decider: Decider) extends Actor {

override val supervisorStrategy = OneForOneStrategy()(decider)

override def receive: Receive = ???
}

class MyDecider extends Decider {
override def isDefinedAt(x: Throwable): Boolean = true

override def apply(v1: Throwable): SupervisorStrategy.Directive = {
case t:ActorException => Restart
case notmatched => SupervisorStrategy.defaultDecider.apply(notmatched)
}
}

object Test {
val myDecider: Decider = {
case t:ActorException => Restart
case notmatched => SupervisorStrategy.defaultDecider.apply(notmatched)
}
val myDecider2 = new MyDecider()
val system = ActorSystem("stackoverflow")
val supervisor = system.actorOf(SupervisorActor.props(myDecider))
val supervisor2 = system.actorOf(SupervisorActor.props(myDecider2))
}

通过这样做,您将无法访问主管状态,例如通过 sender() 抛出异常的 child 的 ActorRef(尽管我们包括这在 ActorException)

关于您从主管访问导致异常的子消息的原始问题,您可以看到here (来自 akka 2.5.3)akka 开发人员选择不将其用于决策。

  final protected def handleFailure(f: Failed): Unit = {
// ¡¡¡ currentMessage.message is the one that cause the exception !!!
currentMessage = Envelope(f, f.child, system)
getChildByRef(f.child) match {
/*
* only act upon the failure, if it comes from a currently known child;
* the UID protects against reception of a Failed from a child which was
* killed in preRestart and re-created in postRestart
*/
case Some(stats) if stats.uid == f.uid ⇒
// ¡¡¡ currentMessage.message is not passed to the handleFailure !!!
if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
case Some(stats) ⇒
publish(Debug(self.path.toString, clazz(actor),
"dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
case None ⇒
publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
}
}

关于akka - Akka Decider 是否可以访问完整的故障场景?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50353738/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com