gpt4 book ai didi

java - 用于计算任务的 Akka 模型

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:22:12 25 4
gpt4 key购买 nike

我有以下需求

  • 使用用户名和密码连接到网络服务器并获取身份验证 token
  • 读取文件获取不同的参数
  • 使用第 1 步中的身份验证 token 和第 2 步中的参数向 Web 服务器发送 http 请求

现在我有一个执行以下所有上述任务的 Actor

package akka.first.java;

import akka.actor.UntypedActor;

public class MySingleActor extends UntypedActor {

public void onReceive(Object msg) {

if( msg instanceof sendRequest ) {

//Connect to a webserver with a username and password and get an authetication token
String token = getToken();
// Read file to get different parameters
Param param = readFile();
// Use the auth token fro step 1 and parameters from step 2 to send an http request to the web server
Response response = sendRequest (server, token, param);


}

}

private Param readFile() {
// reads file
}

private String getToken() {
//gets token
}
}

readFile操作包含各种子任务,我觉得应该是一个单独的actor。但是由于 actor 需要从 readFile() 操作返回才能执行其发送请求的主要任务,这可能会阻塞,根据文档不建议这样做,最好的方法是什么? future ?

最佳答案

Official documentation提供以下解决方案:

  • Do the blocking call within an actor (or a set of actors managed by a router [Java, Scala]), making sure to configure a thread pool which is either dedicated for this purpose or sufficiently sized.
  • Do the blocking call within a Future, ensuring an upper bound on the number of such calls at any point in time (submitting an unbounded number of tasks of this nature will exhaust your memory or thread limits).
  • Do the blocking call within a Future, providing a thread pool with an upper limit on the number of threads which is appropriate for the hardware on which the application runs.
  • Dedicate a single thread to manage a set of blocking resources (e.g. a NIO selector driving multiple channels) and dispatch events as they occur as actor messages.

使用 future 是官方建议的方法之一,但要格外小心。

让我们考虑第一种方法,因为 IMO 它更一致。

首先,将所有阻塞 IO 操作提取到仅执行一个阻塞 IO 操作的新参与者中。为简洁起见,假设只有一个这样的操作:

public class MyBlockingIOActor extends UntypedActor {
public void onReceive(Object msg) {
// do blocking IO call here and send the result back to sender
}
}

添加configuration for dispatcher ,这将在 actor system configuration file 中处理阻塞 Actor (通常是 application.conf):

#Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}

请确保在创建 actor 系统时使用配置文件(特别是如果您决定使用非标准文件名进行配置):

ActorSystem actorSystem = ActorSystem.create("my-actor-system", ConfigFactory.load("application.conf"));

之后,您想要将执行阻塞 IO 的 actor 分配给专用调度程序。您可以按照描述在配置中执行此操作 here或者在创建 Actor 时:

ActorRef blockingActor = context().actorOf(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher"));

为了获得更多吞吐量,考虑将阻塞 actor 包装到池中:

SupervisorStrategy strategy = new OneForOneStrategy(
5,
Duration.create(1, TimeUnit.MINUTES),
Collections.singletonList(Exception.class)
);
ActorRef blockingActor = context().actorOf(new SmallestMailboxPool(5).withSupervisorStrategy(strategy).props(Props.create(MyBlockingIOActor.class).withDispatcher("blocking-io-dispatcher")));

您可以通过以下方式确保 Actor 使用正确的调度程序:

public class MyBlockingIOActor extends UntypedActor {
public void preStart() {
LOGGER.debug("using dispatcher: {}", ((Dispatcher)context().dispatcher()).id());
}
}

关于java - 用于计算任务的 Akka 模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39477447/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com