gpt4 book ai didi

java - Akka调度程序: Run next only when current run is complete

转载 作者:行者123 更新时间:2023-12-01 19:51:59 24 4
gpt4 key购买 nike

使用 Akka Scheduler 安排作业是这样的(至少从文档来看):

system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());

但是我不明白如何确保下一次运行仅在当前运行完成时发生。我一直在四处寻找但没有成功:(

最佳答案

调度程序不适合您的用例。

替代方案是 Akka Stream 的 Sink.actorRefWithAck (下面的代码改编自链接文档中的示例,并借用了其中定义的实用程序类)。您需要调整工作参与者来处理一些与流状态相关的消息并回复确认消息。确认消息充当反压信号,并指示 Actor 已准备好处理下一条 MessageToTheActor 消息。 worker Actor 看起来像下面这样:

enum Ack {
INSTANCE;
}

static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
private final Throwable cause;
public StreamFailure(Throwable cause) { this.cause = cause; }

public Throwable getCause() { return cause; }
}

public class MyWorker extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(StreamInitialized.class, init -> {
log().info("Stream initialized");
sender().tell(Ack.INSTANCE, self());
})
.match(MessageToTheActor.class, msg -> {
log().info("Received message: {}", msg);
// do something with the message...
sender().tell(Ack.INSTANCE, self());
})
.match(StreamCompleted.class, completed -> {
log().info("Stream completed");
})
.match(StreamFailure.class, failed -> {
log().error(failed.getCause(),"Stream failed!");
})
.build();
}
}

要将 Sink.actorRefWithAck 与上述 actor 一起使用:

final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);

ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));

Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());

Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
workerActor,
new StreamInitialized(),
Ack.INSTANCE,
new StreamCompleted(),
ex -> new StreamFailure(ex)
);

messages.runWith(sink, materializer);

注意 Source.repeat 的使用,在本例中不断发出 MessageToTheActor 消息。使用 Sink.actorRefWithAck 可确保 Actor 在处理完当前消息之前不会收到另一条消息。

需要以下导入(显然,Akka Streams 依赖项也是如此):

import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;

关于java - Akka调度程序: Run next only when current run is complete,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51091636/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com