gpt4 book ai didi

scala - 如何处理 Akka 子角色的长时间初始化?

转载 作者:行者123 更新时间:2023-12-03 20:15:02 28 4
gpt4 key购买 nike

我有一个 Actor ,它创建一个子 Actor 来执行一些冗长的计算。

问题是子actor的初始化需要几秒钟,并且父actor发送给子actor的所有消息都被创建并被完全初始化被丢弃。

这是我正在使用的代码的逻辑:

class ChildActor extends Actor {
val tagger = IntializeTagger(...) // this takes a few seconds to complete

def receive = {
case Tag(text) => sender ! tagger.tag(text)
case "hello" => println("Hello")
case _ => println("Unknown message")
}
}

class ParentActor extends Actor {
val child = context.ActorOf(Props[ChildActor], name = "childactor")

// the below two messages seem to get lost
child ! "hello"
child ! Tag("This is my sample text")

def receive = {
...
}
}

我怎么能解决这个问题?是否可以让父 Actor 等到 child 完全初始化?我将使用带有路由的子actor,并且可能在远程actor系统上使用。

编辑

按照 drexin 的建议,我将代码更改为:
class ChildActor extends Actor {
var tagger: Tagger = _

override def preStart() = {
tagger = IntializeTagger(...) // this takes a few seconds to complete
}

def receive = {
case Tag(text) => sender ! tagger.tag(text)
case "hello" => println("Hello")
case _ => println("Unknown message")
}
}

class ParentActor extends Actor {
var child: ActorRef = _

override def preStart() = {
child = context.ActorOf(Props[ChildActor], name = "childactor")

// When I add
// Thread.sleep(5000)
// here messages are processed without problems

// wihout hardcoding the 5 seconds waiting
// the below two messages seem to get lost
child ! "hello"
child ! Tag("This is my sample text")
}

def receive = {
...
}
}

但问题仍然存在。我错过了什么?

最佳答案

我认为您可能正在寻找的是 Stash 的组合和 become .这个想法是子actor将它的初始状态设置为未初始化,并且在这种状态下,它将隐藏所有传入的消息,直到它完全初始化。完全初始化后,您可以在将行为切换到初始化状态之前取消存储所有消息。一个简单的例子如下:

class ChildActor2 extends Actor with Stash{
import context._
var dep:SlowDependency = _

override def preStart = {
val me = context.self
Future{
dep = new SlowDependency
me ! "done"
}
}

def uninitialized:Receive = {
case "done" =>
unstashAll
become(initialized)
case other => stash()
}

def initialized:Receive = {
case "a" => println("received the 'a' message")
case "b" => println("received the 'b' message")
}

def receive = uninitialized
}

通知 preStart我正在异步进行初始化,以免停止 Actor 的启动。现在这有点难看,关闭了可变的 dep变种。您当然可以通过将消息发送给另一个处理慢依赖项的实例化并将其发送回该actor的消息来处理它。收到依赖后,它将调用 become对于 initialized状态。

现在有一个警告 Stash我会直接从 Akka 文档中粘贴它:
Please note that the Stash can only be used together with actors that 
have a deque-based mailbox. For this, configure the mailbox-type of the
dispatcher to be a deque-based mailbox, such as
akka.dispatch.UnboundedDequeBasedMailbox (see Dispatchers (Scala)).

现在,如果这不适合您,您可以尝试更多 DI type 方法并让慢依赖通过它的构造函数注入(inject)到子actor中。所以你可以像这样定义子actor:
class ChildActor(dep:SlowDependency) extends Actor{
...
}

然后在启动这个actor时,你会这样做:
context.actorOf(new Props().withCreator(new ChildActor(slowDep)), name = "child-actor")

关于scala - 如何处理 Akka 子角色的长时间初始化?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17061740/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com