- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 BackoffSupervisor 策略来创建一个必须处理某些消息的子 Actor。我想实现一个非常简单的重启策略,在出现异常的情况下:
Supervisor 重新启动子级并再次发送失败消息。
主管重试 3 次后放弃
到目前为止我所拥有的是:
主管定义:
val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
childProps,
childName = cmd.hashCode.toString,
minBackoff = 1.seconds,
maxBackoff = 2.seconds,
randomFactor = 0.2
)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException => {
println("caught specific message!")
SupervisorStrategy.Restart
}
case _: Exception => SupervisorStrategy.Restart
case _ ⇒ SupervisorStrategy.Escalate
})
)
val sup = context.actorOf(supervisor)
sup ! cmd
应该发送电子邮件的子 Actor ,但失败了(抛出一些异常)并将异常传播回主管:
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
throw new Exception("surprising exception")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
在上面的代码中,我将任何异常包装到自定义类 MessageException 中,该异常会传播到 SupervisorStrategy,但是如何将其进一步传播到新子级以强制重新处理?这是正确的方法吗?
编辑。我尝试在 preRestart
钩子(Hook)上向 Actor 重新发送消息,但不知何故钩子(Hook)没有被触发:
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
// println("mail sent!")
throw new Exception("surprising exception")
}
override def preStart(): Unit = {
println("child starting")
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
reason match {
case m: MessageException => {
println("aaaaa")
message.foreach(self ! _)
}
case _ => println("bbbb")
}
}
override def postStop(): Unit = {
println("child stopping")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
这给了我类似于以下输出的内容:
new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting
但是没有来自 preRestart
Hook 的日志
最佳答案
子进程的 preRestart
钩子(Hook)未被调用的原因是 Backoff.onFailure
使用 BackoffOnRestartSupervisor
在幕后,它将默认的重新启动行为替换为与退避策略一致的停止和延迟启动行为。换句话说,当使用 Backoff.onFailure 时,当子进程重新启动时,子进程的 preRestart 方法不会被调用,因为底层主管实际上停止了子进程,然后再次启动它之后。 (使用 Backoff.onStop
可以触发子进程的 preRestart
钩子(Hook),但这与当前的讨论无关。)
BackoffSupervisor
API 不支持在主管的子进程重新启动时自动重新发送消息:您必须自己实现此行为。重试消息的一个想法是让 BackoffSupervisor 的主管来处理它。例如:
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withReplyWhileStopped(ChildIsStopped)
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd) // replace cmd with whatever the property name is
SupervisorStrategy.Restart
case ...
})
)
val sup = context.actorOf(supervisor)
def receive = {
case cmd: NewMail =>
sup ! cmd
case Error(cmd) =>
timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
// We assume that NewMail has an id field. Also, adjust the time as needed.
case Replay(cmd) =>
sup ! cmd
case ChildIsStopped =>
println("child is stopped")
}
在上面的代码中,嵌入 MessageException
中的 NewMail
消息被包装在自定义案例类中(以便轻松将其与“正常”/新消息区分开来) NewMail
消息)并发送给 self
。在此上下文中,self
是创建 BackoffSupervisor
的参与者。然后,这个封闭的 actor 使用 single timer在某个时刻重播原始消息。这个时间点应该在未来足够远的地方,这样 BackoffSupervisor
可能会耗尽 SenderActor
的重新启动尝试,以便子进程有足够的机会进入在收到重新发送的消息之前,状态为“良好”。显然,无论子进程重启了多少次,这个例子都只涉及一条消息的重发。
另一个想法是为每条 NewMail
消息创建一个 BackoffSupervisor
-SenderActor
对,并拥有 SenderActor
在 preStart
Hook 中将 NewMail
消息发送给自身。这种方法的一个问题是资源的清理。即,当处理成功或子进程重新启动耗尽时,关闭 BackoffSupervisors
(这将依次关闭其各自的 SenderActor
子进程)。 NewMail
id 到 (ActorRef, Int)
元组的映射(其中 ActorRef
是对 BackoffSupervisor
的引用> actor,Int
是重新启动尝试的次数)在这种情况下会很有帮助:
class Overlord extends Actor {
var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long
def receive = {
case cmd: NewMail =>
val childProps = Props(new SenderActor(cmd, self))
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd)
SupervisorStrategy.Restart
case ...
})
)
val sup = context.actorOf(supervisor)
state += (cmd.id -> (sup, 0))
case ProcessingDone(cmdId) =>
state.get(cmdId) match {
case Some((backoffSup, _)) =>
context.stop(backoffSup)
state -= cmdId
case None =>
println(s"${cmdId} not found")
}
case Error(cmd) =>
val cmdId = cmd.id
state.get(cmdId) match {
case Some((backoffSup, numRetries)) =>
if (numRetries == 3) {
println(s"${cmdId} has already been retried 3 times. Giving up.")
context.stop(backoffSup)
state -= cmdId
} else
state += (cmdId -> (backoffSup, numRetries + 1))
case None =>
println(s"${cmdId} not found")
}
case ...
}
}
请注意,上面示例中的 SenderActor
采用 NewMail
和 ActorRef
作为构造函数参数。后一个参数允许 SenderActor
向封闭的 Actor 发送自定义 ProcessingDone
消息:
class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
override def preStart(): Unit = {
println(s"child starting, sending ${cmd} to self")
self ! cmd
}
def fakeSendMail(): Unit = ...
def receive = {
case cmd: NewMail => ...
}
}
显然,SenderActor
被设置为每次使用 fakeSendMail
的当前实现都会失败。我将保留 SenderActor
中所需的其他更改来实现顺利路径,其中 SenderActor
向 target< 发送一条
,给你。ProcessingDone
消息
关于scala - 从 Supervisor 重新启动后向 actor 发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48446194/
我有一些 Scala 代码,它用两个不同版本的类型参数化函数做了一些漂亮的事情。我已经从我的应用程序中简化了很多,但最后我的代码充满了形式 w(f[Int],f[Double]) 的调用。哪里w()是
如果我在同一目录中有两个单独的未编译的 scala 文件: // hello.scala object hello { def world() = println("hello world") }
val schema = df.schema val x = df.flatMap(r => (0 until schema.length).map { idx => ((idx, r.g
环境: Play 2.3.0/Scala 2.11.1/IntelliJ 13.1 我使用 Typesafe Activator 1.2.1 用 Scala 2.11.1 创建一个新项目。项目创建好后
我只是想知道如何使用我自己的类扩展 Scala 控制台和“脚本”运行程序,以便我可以通过使用实际的 Scala 语言与其通信来实际使用我的代码?我应将 jar 放在哪里,以便无需临时配置即可从每个 S
我已经根据 README.md 文件安装了 ensime,但是,我在低级 ensime-server 缓冲区中出现以下错误: 信息: fatal error :scala.tools.nsc.Miss
我正在阅读《Scala 编程》一书。在书中,它说“一个函数文字被编译成一个类,当在运行时实例化时它是一个函数值”。并且它提到“函数值是对象,因此您可以根据需要将它们存储在变量中”。 所以我尝试检查函数
我有 hello world scala native 应用程序,想对此应用程序运行小型 scala 测试我使用通常的测试命令,但它抛出异常: NativeMain.scala object Nati
有few resources在网络上,在编写与代码模式匹配的 Scala 编译器插件方面很有指导意义,但这些对生成代码(构建符号树)没有帮助。我应该从哪里开始弄清楚如何做到这一点? (如果有比手动构建
我是 Scala 的新手。但是,我用 创建了一个中等大小的程序。斯卡拉 2.9.0 .现在我想使用一个仅适用于 的开源库斯卡拉 2.7.7 . 是吗可能 在我的 Scala 2.9.0 程序中使用这个
有没有办法在 Scala 2.11 中使用 scala-pickling? 我在 sonatype 存储库中尝试了唯一的 scala-pickling_2.11 工件,但它似乎不起作用。我收到消息:
这与命令行编译器选项无关。如何以编程方式获取代码内的 Scala 版本? 或者,Eclipse Scala 插件 v2 在哪里存储 scalac 的路径? 最佳答案 这无需访问 scala-compi
我正在阅读《Scala 编程》一书,并在第 6 章中的类 Rational 实现中遇到了一些问题。 这是我的 Rational 类的初始版本(基于本书) class Rational(numerato
我是 Scala 新手,我正在尝试开发一个使用自定义库的小项目。我在库内创建了一个mysql连接池。这是我的库的build.sbt organization := "com.learn" name :
我正在尝试运行一些 Scala 代码,只是暂时打印出“Hello”,但我希望在 SBT 项目中编译 Scala 代码之前运行 Scala 代码。我发现在 build.sbt 中有以下工作。 compi
Here链接到 maven Scala 插件使用。但没有提到它使用的究竟是什么 Scala 版本。我创建了具有以下配置的 Maven Scala 项目: org.scala-tools
我对 Scala 还很陌生,请多多包涵。我有一堆包裹在一个大数组中的 future 。 future 已经完成了查看几 TB 数据的辛勤工作,在我的应用程序结束时,我想总结上述 future 的所有结
我有一个 scala 宏,它依赖于通过包含其位置的静态字符串指定的任意 xml 文件。 def myMacro(path: String) = macro myMacroImpl def myMacr
这是我的功能: def sumOfSquaresOfOdd(in: Seq[Int]): Int = { in.filter(_%2==1).map(_*_).reduce(_+_) } 为什么我
这个问题在这里已经有了答案: Calculating the difference between two Java date instances (45 个答案) 关闭 5 年前。 所以我有一个这
我是一名优秀的程序员,十分优秀!