gpt4 book ai didi

scala - 如何设置 akka Actor 容错?

转载 作者:行者123 更新时间:2023-12-04 08:14:49 29 4
gpt4 key购买 nike

我试图在 akka Actors 中获得容错行为。我正在编写一些代码,这些代码依赖于系统中的 Actor 可用于长期处理。我发现我的处理在几个小时后停止(大约需要 10 小时)并且没有发生太多事情。我相信我的 Actor 没有从异常中恢复过来。

我需要做什么才能永久地一对一地重新启动 Actor?我希望这可以从这个文档 http://akka.io/docs/akka/1.1.3/scala/fault-tolerance 中完成

我正在使用 akka 1.1.3 和 Scala 2.9

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.LoadBalancer
import akka.config.Supervision._


object TestActor {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
.setCorePoolSize(100)
.setMaxPoolSize(100)
.build
}

class TestActor(val name: Integer) extends Actor {
self.lifeCycle = Permanent
self.dispatcher = TestActor.dispatcher
def receive = {
case num: Integer => {
if( num % 2 == 0 )
throw new Exception("This is a simulated failure")
println("Actor: " + name + " Received: " + num)
//Thread.sleep(100)
}
}

override def postStop(){
println("TestActor post Stop ")
}

//callback method for restart handling
override def preRestart(reason: Throwable){
println("TestActor "+ name + " restaring after shutdown because of " + reason)
}

//callback method for restart handling
override def postRestart(reason: Throwable){
println("Restaring TestActor "+name+"after shutdown because of " + reason)
}
}

trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
val testActors: List[ActorRef]
val seq = new CyclicIterator[ActorRef](testActors)
}

trait TestActorManager extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 1000, 5000)
val testActors: List[ActorRef]
override def preStart = testActors foreach { self.startLink(_) }
override def postStop = { System.out.println("postStop") }
}


object FaultTest {
def main(args : Array[String]) : Unit = {
println("starting FaultTest.main()")
val numOfActors = 5
val supervisor = actorOf(
new TestActorManager with CyclicLoadBalancing {
val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
}
)

supervisor.start();

println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length)

val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

(1 until 200 toList) foreach { testActor ! _ }

}
}

这段代码在 LoadBalancer 后面设置了 5 个 Actor,它们只打印出发送给它们的整数,除了它们在偶数上抛出异常以模拟故障。整数 0 到 200 被发送到这些 Actor。我希望奇数会得到输出,但在偶数出现几次错误后,一切似乎都关闭了。使用 sbt 运行此代码会产生以下输出:
[info] Running FaultTest 
starting FaultTest.main()
Loading config [akka.conf] from the application classpath.
Number of Actors: 5
Actor: 2 Received: 1
Actor: 2 Received: 9
Actor: 1 Received: 3
Actor: 3 Received: 7
[info] == run ==
[success] Successful.
[info]
[info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

我认为这里发生的是 5 个 Actor 开始,而前 5 个偶数让他们停业,他们没有重新开始。

如何更改此代码以便 Actor 从异常中恢复?

我希望这实际上会打印出从 1 到 200 的所有奇数。我认为每个参与者都会在偶数上失败,但在异常情况下会以完整的邮箱重新启动。我希望看到来自 preRestart 和 postRestart 的 println。需要在此代码示例中配置什么才能使这些事情发生?

这里有一些关于 akka 和 Actors 的额外假设,可能会导致我的误解。我假设可以使用 Supervisor 或 faultHandler 配置一个 Actor,以便在接收期间抛出异常时它会重新启动并继续可用。我假设发送给 Actor 的消息如果在接收期间抛出异常就会丢失。我假设将调用引发异常的 actor 上的 preRestart() 和 postRestart()。

代码示例代表我正在尝试做的事情并且基于 Why is my Dispatching on Actors scaled down in Akka?

** 另一个代码示例 **

这是另一个更简单的代码示例。我正在开始一位在偶数上引发异常的 Actor 。没有负载平衡器或其他东西。我试图打印出有关 Actor 的信息。在将消息发送到 Actor 并监视正在发生的事情后,我正在等待退出程序一分钟。

我希望这会打印出奇数,但看起来 Actor 坐在它的邮箱中,并带着消息。

我的 OneForOneStrategy 设置错了吗?我需要将 Actor 链接到某些东西吗?这种配置是否从根本上误导了我? Dispatcher 是否需要以某种方式设置容错?我可能会弄乱 Dispatcher 中的线程吗?
import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.ActorRef
import akka.actor.ActorRegistry
import akka.config.Supervision._

class SingleActor(val name: Integer) extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 30, 1000)
def receive = {
case num: Integer => {
if( num % 2 == 0 )
throw new Exception("This is a simulated failure, where does this get logged?")
println("Actor: " + name + " Received: " + num)
}
}

override def postStop(){
println("TestActor post Stop ")
}

override def preRestart(reason: Throwable){
println("TestActor "+ name + " restaring after shutdown because of " + reason)
}

override def postRestart(reason: Throwable){
println("Restaring TestActor "+name+"after shutdown because of " + reason)
}
}

object TestSingleActor{

def main(args : Array[String]) : Unit = {
println("starting TestSingleActor.main()")

val testActor = Actor.actorOf( new SingleActor(1) ).start()

println("number of actors: " + registry.actors.size)
printAllActorsInfo

(1 until 20 toList) foreach { testActor ! _ }

for( i <- 1 until 120 ){
Thread.sleep(500)
printAllActorsInfo
}
}

def printAllActorsInfo() ={
registry.actors.foreach( (a) =>
println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
.format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
}
}

我得到的输出如下:
[info] Running TestSingleActor 
starting TestSingleActor.main()
Loading config [akka.conf] from the application classpath.
number of actors: 1
Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false
Actor: 1 Received: 1
Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false

... 117 more of these lines repeted ...

Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
[info] == run ==
[success] Successful.
[info]
[info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM

最佳答案

问题是我在用我的 akka.conf 文件。除了配置事件处理程序的行之外,我使用了引用 1.1.3 akka.conf 文件。

我的(坏掉的):

    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] 

引用 1.1.3(有效的那个):
    event-handlers = ["akka.event.EventHandler$DefaultListener"]

使用我的事件处理程序配置行,Actor 不会重新启动。引用 1.1.3 行重新启动非常好。

我根据这些说明进行了此更改 http://akka.io/docs/akka/1.1.3/general/slf4j.html

因此,通过删除该页面中的建议并返回 1.1.3 引用 akka.conf,我能够获得容错 Actor。

关于scala - 如何设置 akka Actor 容错?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7080591/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com