gpt4 book ai didi

java - 如何检测 akka actor 终止是由于系统关闭造成的并避免重新启动它

转载 作者:行者123 更新时间:2023-12-02 01:22:17 26 4
gpt4 key购买 nike

我有一个使用小型 Akka actor 系统(使用 Java)的 Spring 应用程序,其中有一个 MasterActor 扩展了 Akka 的 AbstractActor 并初始化了一个 Router 并设置一些工作参与者。它还监视 worker 的生命周期。如果 Worker Actor 由于某些异常而死亡,我想重新启动它。

 public MasterActor(ActorPropsFactory actorPropsFactory) {
this.actorPropsFactory = actorPropsFactory;

int workers = Runtime.getRuntime().availableProcessors() - 1;

List<Routee> routees = Stream.generate(this::createActorRefRoutee).limit(workers).collect(Collectors.toList());

this.router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}

private ActorRefRoutee createActorRefRoutee() {
ActorRef worker = getContext().actorOf(actorPropsFactory.create(getWorkerActorClass()));
getContext().watch(worker);
return new ActorRefRoutee(worker);
}

private void route(Object message, Supplier<String> routingKeySupplier) {
String routingKey = routingKeySupplier.get();
RouterEnvelope envelope = new ConsistentHashingRouter.ConsistentHashableEnvelope(message, routingKey);
router.route(envelope, getSender());
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(
EventMessage.class,
message -> this.route(message, () -> message.getEvent().getId().toString()))
.match(
Terminated.class,
message -> {
logger.info("WorkerActor {} terminated, restarting", message.getActor());
// todo: detect whether the system is shutting down before restarting the actor
router = router.removeRoutee(message.actor())
.addRoutee(createActorRefRoutee());
})
.build();
}

我遇到的问题是,如果 Spring 应用程序无法启动。 (例如,它无法连接到数据库,或者某些凭据不正确或其他),我收到来自所有工作人员的 Termated 消息,并且 Master actor 尝试启动新的工作人员,这也会得到立即终止,进入无限循环。

检测这种情况的正确方法是什么?有没有办法让主actor检测到actor系统正在关闭,以便worker不会再次重新启动?

最佳答案

难道你不能为你的路由器设置一个监督策略,这样你就可以检查导致失败的异常类型吗?这样您也不需要手动重新启动工作人员。

编辑:

您可以像这样设置SupervisorStrategy:

private static SupervisorStrategy strategy=
new OneForOneStrategy(
10,
Duration.ofMinutes(1),
DeciderBuilder.match(ArithmeticException.class,e->SupervisorStrategy.resume())
.match(NullPointerException.class,e->SupervisorStrategy.restart())
.match(IllegalArgumentException.class,e->SupervisorStrategy.stop())
.matchAny(o->SupervisorStrategy.escalate())
.build());
final ActorRef router=
system.actorOf(
new RoundRobinPool(5).withSupervisorStrategy(strategy).props(Props.create(Echo.class)));

您可以在这里阅读更多相关信息:

Router Actor supervision

Fault tolerance in Akka

关于java - 如何检测 akka actor 终止是由于系统关闭造成的并避免重新启动它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57476420/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com