gpt4 book ai didi

java - 如何确定 Akka actor/supervisor 层次结构?

转载 作者:行者123 更新时间:2023-11-30 08:13:55 26 4
gpt4 key购买 nike

我是 Akka(Java 库,v2.3.9)的新手。我正在尝试关注 supervisor hierarchy best practices ,但由于这是我的第一个 Akka 应用程序,我在某处遇到了心理障碍。

在我的第一个 Akka 应用程序(实际上是一个旨在跨多个应用程序重用的库)中,来自外部世界的输入表现为 Process传递给 Actor 的消息。使用我的应用程序的开发人员将提供一个基于文本的配置文件,最终配置发送哪些参与者 Process实例,哪些没有。换句话说,假设这些是我的 Actor 类:

// Groovy pseudo-code
class Process {
private final Input input

Process(Input input) {
super()
this.input = deepClone(input)
}

Input getInput() {
deepClone(this.input)
}
}

class StormTrooper extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like a Storm Trooper would.
}
}
}

class DarthVader extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like Darth Vader would.
}
}
}

class Emperor extends UntypedActor {
@Override
void onReceive(Object message) {
if(message instanceof Process) {
// Process the message like the Emperor would.
}
}
}

// myapp-config.json -> where the actors are configured, along with other
// app-specific configs
{
"fizzbuzz": "true",
"isYosemite": "false",
"borderColor": "red",
"processors": [
"StormTrooper",
"Emperor"
]
}

正如你在配置文件中看到的,只有StormTrooperEmperor被选中接收Process消息。这最终导致零 (0) DarthVader正在创建的 Actor 。这也是我的意图,这将导致 Set<ActorRef>可供填充有 StormTrooper 的应用程序使用和 Emperor像这样:

class SomeApp {
SomeAppConfig config

static void main(String[] args) {
String configFileUrl = args[0] // Nevermind this horrible code

// Pretend here that configFileUrl is a valid path to
// myapp-config.json.

SomeApp app = new SomeApp(configFileUrl)
app.run()
}

SomeApp(String url) {
super()

config = new SomeAppConfig(url)
}

void run() {
// Since the config file only specifies StormTrooper and
// Emperor as viable processors, the set only contains instances of
// these ActorRef types.
Set<ActorRef> processors = config.loadProcessors()
ActorSystem actorSystem = config.getActorSystem()

while(true) {
Input input = scanForInput()
Process process = new Process(input)

// Notify each config-driven processor about the
// new input we've received that they need to process.
processors.each {
it.tell(process, Props.self()) // This isn't correct btw
}
}
}
}

因此,正如您(希望)看到的那样,我们拥有处理 UntypedActor 的所有这些参与者(实际上,有数十个 Process impl)消息(反过来,从某些来源捕获 Input)。至于哪些 Actor 还活着/在线来处理这些 Process消息完全由配置驱动。最后,每次应用程序收到 Input , 它被注入(inject)一个 Process消息,然后Process消息被发送到所有已配置/Activity 的 Actor 。

有了这个作为给定的背景故事/设置,我无法确定“ Actor /主管层次结构”需要是什么。在我的用例中,似乎所有参与者都是真正平等的,他们之间没有监督结构。 StormTrooper只收到 Process如果该类型的参与者被配置为存在,则消息。其他 actor 子类也一样。

我是不是完全遗漏了什么?如果所有参与者都是平等的并且层次结构本质上是“扁平”/水平的,我该如何定义监督层次结构(用于容错目的)?

最佳答案

如果您只想为每个 Actor 实例化一个实例 - 您可能需要 SenatorPalpatine 来监督这三个实例。如果您可能有多个 StormTrooper - 您可能希望让 JangoFett actor 负责创建(并可能杀死)他们,一些 router也是不错的选择(它会自动监督他们)。这也将使您能够在失败时重新启动所有士兵(OneForAllStrategy),能够广播,保存一些常见的统计信息等。

带有路由器的示例(伪 Scala):

//application.conf
akka.actor.deployment {
/palpatine/vader {
router = broadcast-pool
nr-of-instances = 1
}
/palpatine/troopers {
router = broadcast-pool
nr-of-instances = 10
}
}

class Palpatine extends Actor {
import context._

val troopers = actorOf(FromConfig.props(Props[Trooper],
"troopers").withSupervisorStrategy(strategy) //`strategy` is strategy for troopers

val vader = actorOf(FromConfig.props(Props[Vader]), "vader")

override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1) //stategy for Palpatine's children (routers itself)

val strategy = OneForOneStrategy(maxNrOfRetries = 100, withinTimeRange = 1) //stategy for troopers

def receive = {
case p@Process => troopers ! p; vader ! p
case t@Terminted => println(t)
}
}

根据标准 akka-config 创建广播池.我还展示了您可以分别为它们定制监督策略。

如果您希望某些 actor 出于某种原因忽略消息 - 只需在 actor 内部实现此逻辑,例如:

class Vader extends Actor {
def receive {
case p@Process => ...
case Ignore => context.become(ignore) //changes message handler to `ignore`
}


def ignore = {
case x => println("Ignored message " + x)
case UnIgnore => context.become(process)//changes message handler back
}

}

这将动态配置忽略/取消忽略(否则它只是一个简单的if)。您可以根据某些配置向参与者发送 Ignore 消息:

val listOfIgnorantPathes = readFromSomeConfig()
context.actorSelection(listOfIgnoredPathes) ! Ignore

如果你想从配置中控制异构广播,你也可以像 trooper 的路由器一样为 palpatine 创建广播器(只需使用组而不是池):

akka.actor.deployment {
... //vader, troopers configuration

/palpatine/broadcaster {
router = broadcast-group
routees.paths = ["/palpatine/vader", "/palpatine/troopers"]
}
}

class Palpatine extends Actor {
... //vader, troopers definitions

val broadcaster = actorOf(FromConfig.props(), "broadcaster")

def receive = {
case p@Process => broadcaster ! p
}
}

只需将 vader 从 routees.paths 中排除,使他不会收到 Process 消息。

附言Actor 永远不会孤单 - 总是有 Guardian Actor(参见 The Top-Level Supervisors ),它会在出现异常时关闭整个系统。因此,无论哪种方式,SenatorPalpatine 都可能成为您的救星。

附言2 context.actorSelection("palpatine/*")实际上允许您向所有 child 发送消息(作为广播池和组的替代方法),因此您不需要在其中设置一组。

关于java - 如何确定 Akka actor/supervisor 层次结构?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29774075/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com