gpt4 book ai didi

java - Akka:询问时丢失了 `child.path.name` 的引用

转载 作者:行者123 更新时间:2023-12-02 10:11:25 25 4
gpt4 key购买 nike

我正在尝试实现 "Up and Running" example 的 Java 版本摘自曼宁的《Akka in Action》一书。它是一个基于 Actor 模型的简单 Http 服务器,用于保存(仅在内存中)和检索一些事件。我保存事件没有问题。但在查询我的 Actor 系统中的事件(所有事件)时,我确实遇到了问题。

这是 BoxOffice 的相关代码(我用三个点代替了我认为与我的问题无关的代码) - 所有 TicketSeller 的父角色>s(稍后负责管理每个事件的状态)。

public class BoxOffice extends AbstractActor {

...
private Timeout timeout;
final static String NAME = "boxOffice";

//create child actors
private ActorRef createTicketSeller(String name) {
return getContext().actorOf(TicketSeller.props(name));
}

public BoxOffice(Timeout timeout) {
this.timeout = timeout;
}

//the only method of an actor
@Override
public Receive createReceive() {
return receiveBuilder()
...
...
.match(GetEvent.class, this::receiveMsgGetEvent)
.match(GetEvents.class, this::receiveMsgGetEvents)
...
.build();
}

...

private void receiveMsgGetEvent(GetEvent getEvent) {
Optional<ActorRef> maybeChild = getChildByName(getEvent.getName());
log.info(String.format("Asking for event %s. Child is present: %s", getEvent.getName(), maybeChild.isPresent()));
OptionalConsumer.of(maybeChild)
.ifPresent(child -> child.forward(new TicketSeller.GetEvent(), getContext()))
.ifNotPresent(() -> getSender().tell(Optional.empty(), getSelf()));
}

private void receiveMsgGetEvents(GetEvents getEvents) {
//ask self() for each of the passed-in event
List<CompletableFuture<Optional<Event>>> listFutureMaybeEvent =
allChildrenStream()
.map(child ->
ask(getSelf(), new GetEvent(child.path().name()), timeout)
.thenApply(obj -> (Optional<Event>) obj)
.toCompletableFuture())
.collect(toList());

CompletableFuture<Events> eventsFuture = toFutureEvents(listFutureMaybeEvent);
pipe(eventsFuture, getContext().dispatcher()).to(sender());
}

private Stream<ActorRef> allChildrenStream() {
return StreamSupport.stream(getContext().getChildren().spliterator(), false);
}

...

private CompletableFuture<Events> toFutureEvents(List<CompletableFuture<Optional<Event>>> futurePossibleEvents) {
List<Event> events = futurePossibleEvents.stream()
.map(CompletableFuture::join)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
return CompletableFuture.supplyAsync(() -> new Events(events));
}

...

private Optional<ActorRef> getChildByName(String name) {
return getContext().findChild(name);
}

static Props props(Timeout timeout) {
return Props.create(BoxOffice.class, () -> new BoxOffice(timeout));
}

基本上发生的情况是,在 receiveMsgGetEvents 中,我向 self 发送一条包含子名称 child.path.name 的消息。但是,当我收到该消息(分别在 receiveMsgGetEvent 中)时,无法按该名称找到子 actor:

INFO  [BoxOffice]: Asking for event $a. Child is present: false

此外,值得注意的是,GetEvent 的发送和由同一个 actor 接收之间需要很长的时间(比如几秒,但我的感觉不到 20)。

问题可能是由于我的 CompletableFutures 操作造成的,但我尝试重现 Scala 等效代码。

上面的信息日志以及此消息:

INFO  [DeadLetterActorRef]: Message [java.util.Optional] from Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585] to Actor[akka://mycompanyAkkaDemo/deadLetters] was not delivered. [1] dead letters encountered. This logging...

在堆栈跟踪之后打印,堆栈跟踪在配置的超时(20 秒)后打印:

ERROR [ActorSystemImpl]: Error during processing of request: 'Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:595)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:605)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:140)
...
at java.lang.Thread.run(Thread.java:748)
ERROR [OneForOneStrategy]: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
...
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:595)
... 11 common frames omitted

最佳答案

这里的问题是调度程序发生阻塞。

在 JVM 上,线程由操作系统线程支持,这在内存和进程调度程序开销方面都很昂贵。 Akka 的优点之一是它允许您在较少数量的线程上运行多个 Actor,从而更有效地使用线程。

这很棒,但确实意味着您永远不应该在 actor 内执行阻塞调用。此处的 CompletableFuture::join 调用处于阻塞状态,这可能是导致您出现问题的原因。

通过避免阻塞调用并使用异步 API(例如 CompletableFuture.allOf),您的问题应该会消失。

关于java - Akka:询问时丢失了 `child.path.name` 的引用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54976829/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com