gpt4 book ai didi

scala - 从 Supervisor 重新启动后向 actor 发送消息

转载 作者:行者123 更新时间:2023-12-02 17:29:04 25 4
gpt4 key购买 nike

我正在使用 BackoffSupervisor 策略来创建一个必须处理某些消息的子 Actor。我想实现一个非常简单的重启策略,在出现异常的情况下:

  1. 子级将失败消息传播给主管
  2. Supervisor 重新启动子级并再次发送失败消息。

  3. 主管重试 3 次后放弃

  4. Akka 持久化不是一种选择

到目前为止我所拥有的是:

主管定义:

val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
childProps,
childName = cmd.hashCode.toString,
minBackoff = 1.seconds,
maxBackoff = 2.seconds,
randomFactor = 0.2
)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException => {
println("caught specific message!")
SupervisorStrategy.Restart
}
case _: Exception => SupervisorStrategy.Restart
case _ ⇒ SupervisorStrategy.Escalate
})
)

val sup = context.actorOf(supervisor)


sup ! cmd

应该发送电子邮件的子 Actor ,但失败了(抛出一些异常)并将异常传播回主管:

class SenderActor() extends Actor {

def fakeSendMail():Unit = {
Thread.sleep(1000)
throw new Exception("surprising exception")
}

override def receive: Receive = {
case cmd: NewMail =>

println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}

}
}

在上面的代码中,我将任何异常包装到自定义类 MessageException 中,该异常会传播到 SupervisorStrategy,但是如何将其进一步传播到新子级以强制重新处理?这是正确的方法吗?

编辑。我尝试在 preRestart 钩子(Hook)上向 Actor 重新发送消息,但不知何故钩子(Hook)没有被触发:

class SenderActor() extends Actor {

def fakeSendMail():Unit = {
Thread.sleep(1000)
// println("mail sent!")
throw new Exception("surprising exception")
}

override def preStart(): Unit = {
println("child starting")
}


override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
reason match {
case m: MessageException => {
println("aaaaa")
message.foreach(self ! _)
}
case _ => println("bbbb")
}
}

override def postStop(): Unit = {
println("child stopping")
}

override def receive: Receive = {
case cmd: NewMail =>

println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}

}
}

这给了我类似于以下输出的内容:

new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting

但是没有来自 preRestart Hook 的日志

最佳答案

子进程的 preRestart 钩子(Hook)未被调用的原因是 Backoff.onFailure 使用 BackoffOnRestartSupervisor在幕后,它将默认的重新启动行为替换为与退避策略一致的停止和延迟启动行为。换句话说,当使用 Backoff.onFailure 时,当子进程重新启动时,子进程的 preRestart 方法不会被调用,因为底层主管实际上停止了子进程,然后再次启动它之后。 (使用 Backoff.onStop 可以触发子进程的 preRestart 钩子(Hook),但这与当前的讨论无关。)

BackoffSupervisor API 不支持在主管的子进程重新启动时自动重新发送消息:您必须自己实现此行为。重试消息的一个想法是让 BackoffSupervisor 的主管来处理它。例如:

val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withReplyWhileStopped(ChildIsStopped)
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd) // replace cmd with whatever the property name is
SupervisorStrategy.Restart
case ...
})
)

val sup = context.actorOf(supervisor)

def receive = {
case cmd: NewMail =>
sup ! cmd
case Error(cmd) =>
timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
// We assume that NewMail has an id field. Also, adjust the time as needed.
case Replay(cmd) =>
sup ! cmd
case ChildIsStopped =>
println("child is stopped")
}

在上面的代码中,嵌入 MessageException 中的 NewMail 消息被包装在自定义案例类中(以便轻松将其与“正常”/新消息区分开来) NewMail 消息)并发送给 self。在此上下文中,self 是创建 BackoffSupervisor 的参与者。然后,这个封闭的 actor 使用 single timer在某个时刻重播原始消息。这个时间点应该在未来足够远的地方,这样 BackoffSupervisor 可能会耗尽 SenderActor 的重新启动尝试,以便子进程有足够的机会进入在收到重新发送的消息之前,状态为“良好”。显然,无论子进程重启了多少次,这个例子都只涉及一条消息的重发。

<小时/>

另一个想法是为每条 NewMail 消息创建一个 BackoffSupervisor-SenderActor 对,并拥有 SenderActorpreStart Hook 中将 NewMail 消息发送给自身。这种方法的一个问题是资源的清理。即,当处理成功或子进程重新启动耗尽时,关闭 BackoffSupervisors(这将依次关闭其各自的 SenderActor 子进程)。 NewMail id 到 (ActorRef, Int) 元组的映射(其中 ActorRef 是对 BackoffSupervisor 的引用> actor,Int 是重新启动尝试的次数)在这种情况下会很有帮助:

class Overlord extends Actor {

var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long

def receive = {
case cmd: NewMail =>
val childProps = Props(new SenderActor(cmd, self))
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd)
SupervisorStrategy.Restart
case ...
})
)
val sup = context.actorOf(supervisor)
state += (cmd.id -> (sup, 0))

case ProcessingDone(cmdId) =>
state.get(cmdId) match {
case Some((backoffSup, _)) =>
context.stop(backoffSup)
state -= cmdId
case None =>
println(s"${cmdId} not found")
}

case Error(cmd) =>
val cmdId = cmd.id
state.get(cmdId) match {
case Some((backoffSup, numRetries)) =>
if (numRetries == 3) {
println(s"${cmdId} has already been retried 3 times. Giving up.")
context.stop(backoffSup)
state -= cmdId
} else
state += (cmdId -> (backoffSup, numRetries + 1))
case None =>
println(s"${cmdId} not found")
}

case ...
}
}

请注意,上面示例中的 SenderActor 采用 NewMailActorRef 作为构造函数参数。后一个参数允许 SenderActor 向封闭的 Actor 发送自定义 ProcessingDone 消息:

class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
override def preStart(): Unit = {
println(s"child starting, sending ${cmd} to self")
self ! cmd
}

def fakeSendMail(): Unit = ...

def receive = {
case cmd: NewMail => ...
}
}

显然,SenderActor 被设置为每次使用 fakeSendMail 的当前实现都会失败。我将保留 SenderActor 中所需的其他更改来实现顺利路径,其中 SenderActortarget< 发送一条 ProcessingDone 消息,给你。

关于scala - 从 Supervisor 重新启动后向 actor 发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48446194/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com