
java - How to call Akka Http POST in a loop (1000-10000 times)?

Reposted. Author: 行者123. Updated: 2023-11-30 10:07:45

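I am learning Akka with Java. I wrote a simple program with two actors.

My first actor, ActorA, is called with a list of 1000 strings. ActorA iterates over the list and sends one message to ActorB per element.

ActorB makes an Http POST call to an external service, using the string it received from ActorA as the parameter.

I expected ActorB to make 1000 Http POST calls successfully and receive the same number of responses. Instead, ActorB makes somewhere between 80 and 120 POST requests (the number varies from run to run) and then stops making POST calls.

I tried supplying a custom dispatcher, since the HTTP POST call is a blocking operation, but that did not help either.

The code and configuration are given below.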

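import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import java.util.List;

public class ActorA extends AbstractActor {

    static public Props props() {
        return Props.create(ActorA.class);
    }

    // Message carrying the list of ids to process.
    static public class IdWrapper {
        List<String> ids;

        public IdWrapper(List<String> ids) {
            this.ids = ids;
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(IdWrapper.class, this::process)
                .build();
    }

    // Sends one message to ActorB per id, all at once and without waiting for replies.
    private void process(IdWrapper msg) {
        msg.ids.forEach(id ->
                context().actorSelection("actorB")
                        .tell(new ActorB.MessageForB(id), ActorRef.noSender()));
    }
}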

public class ActorB extends AbstractActor {

    final Http http = Http.get(getContext().system());
    final Materializer materializer = ActorMaterializer.create(context());

    public static Props props() {
        return Props.create(ActorB.class);
    }

    static public class MessageForB implements Serializable {
        String id;

        public MessageForB(String id) {
            this.id = id;
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(MessageForB.class, this::process)
                .build();
    }

    private void process(MessageForB messageForB) {

        ExecutionContext ec = getContext().getSystem().dispatchers().lookup("my-blocking-dispatcher");

        // Get id from request
        String reqId = messageForB.id;

        // Prepare request
        XmlRequest requestEntity = getRequest(Stream.of(reqId).collect(Collectors.toList()));
        String requestAsString = null;

        try {
            // Create and configure JAXBMarshaller.
            JAXBContext jaxbContext = JAXBContext.newInstance(XmlRequest.class);
            Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
            jaxbMarshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);

            // Convert request entity to string before making POST request.
            StringWriter sw = new StringWriter();
            jaxbMarshaller.marshal(requestEntity, sw);
            requestAsString = sw.toString();
        } catch (JAXBException e) {
            e.printStackTrace();
        }

        // Create RequestEntity from request string.
        RequestEntity entity = HttpEntities.create(
                MediaTypes.APPLICATION_XML.toContentType(HttpCharsets.ISO_8859_1),
                requestAsString);

        // Create Http POST with necessary headers and call
        final CompletionStage<HttpResponse> responseFuture =
                http.singleRequest(HttpRequest.POST("http://{hostname}:{port}/path")
                        .withEntity(entity));

        responseFuture
                .thenCompose(httpResponse -> {
                    // Convert response into String
                    final CompletionStage<String> res = Unmarshaller.entityToString()
                            .unmarshal(httpResponse.entity(), ec, materializer);
                    // Consume response bytes
                    httpResponse.entity().getDataBytes().runWith(Sink.ignore(), materializer);
                    return res;
                })
                .thenAccept(s -> {
                    try {
                        // Deserialize string to DTO.
                        MyResponse myResponse = getMyResponse(s);

                        // further processing..

                    } catch (JAXBException e) {
                        e.printStackTrace();
                    }
                });
    }

    private XmlRequest getRequest(List<String> identifiers) {
        XmlRequest request = new XmlRequest();
        // Business logic to create req entity
        return request;
    }

    private MyResponse getMyResponse(String s) throws JAXBException {
        JAXBContext jaxbContext = JAXBContext.newInstance(MyResponse.class);
        javax.xml.bind.Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
        StringReader reader = new StringReader(s);
        return (MyResponse) jaxbUnmarshaller.unmarshal(reader);
    }
}

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 5
    core-pool-size-max = 20
  }
  throughput = 1
}

Where can I improve or correct my code so that ActorB successfully makes an Http POST call for every item sent by ActorA?

Best answer

I see that you are using http.singleRequest.

According to the akka-http docs:

For these cases Akka HTTP offers the Http().singleRequest(...) method, which simply turns an HttpRequest instance into Future[HttpResponse]. Internally the request is dispatched across the (cached) host connection pool for the request’s effective URI.

http.singleRequest uses a connection pool to handle requests, so you need to increase the number of connections in that pool in the Akka HTTP config.
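To see why a small pool gives up under a burst, the following JDK-only sketch may help. It is an analogy, not Akka code: a `ThreadPoolExecutor` with 4 workers and a bounded queue of 32 stands in for a pool with 4 connections and a small request buffer, and 1000 tasks are submitted at once, the way ActorA fires its messages. The class name `PoolOverflowDemo`, the method `simulate`, and its parameters are hypothetical, and Akka HTTP's exact accounting of open requests differs from this model.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolOverflowDemo {

    /** Returns {accepted, rejected} for a burst of `requests` submissions. */
    static int[] simulate(int requests, int connections, int queueCapacity) {
        // Fixed number of workers ("connections") and a bounded waiting queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                connections, connections, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        // Every task blocks until the whole burst has been submitted,
        // mimicking requests that are slower than the loop issuing them.
        CountDownLatch burstDone = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < requests; i++) {
            try {
                pool.execute(() -> {
                    try {
                        burstDone.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                // Workers busy and queue full: the submission is refused.
                rejected++;
            }
        }
        burstDone.countDown();
        pool.shutdown();
        return new int[] {accepted, rejected};
    }

    public static void main(String[] args) {
        int[] result = simulate(1000, 4, 32);
        System.out.println("accepted=" + result[0] + ", rejected=" + result[1]);
    }
}
```

In this model only the tasks that fit into the workers plus the queue are accepted. A real service answers some requests while the loop is still running, which would be consistent with the count in the question varying between 80 and 120 rather than being fixed.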

The relevant settings are in the host-connection-pool section, which has these defaults:

host-connection-pool {
  max-connections = 4
  min-connections = 0
  max-retries = 5
  max-open-requests = 32
  pipelining-limit = 1
  idle-timeout = 30 s
}
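As a rough sketch, an override in the application's own configuration could look like the fragment below. The numbers are illustrative assumptions, not recommendations from the answer; the Akka HTTP reference configuration notes that max-open-requests must be a power of 2, and the value here is chosen only so that a burst of up to 10000 requests fits.

```hocon
akka.http.host-connection-pool {
  # Illustrative value: more parallel connections to the target host.
  max-connections = 32
  # Illustrative value: must be a power of 2; sized to hold the whole burst.
  max-open-requests = 16384
}
```

Raising max-connections only helps as far as the external service can handle that many parallel connections.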

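Solution 2:

Use http.outgoingConnection

According to the akka-http docs, it creates a dedicated connection for each request, so you can handle 1000 connections in parallel without a connection pool.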

With the connection-level API you open a new HTTP connection to a target endpoint by materializing a Flow returned by the Http().outgoingConnection(...) method. Here is an example:

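import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.{Sink, Source}
import scala.util.{Failure, Success}

// Assumes an implicit ActorSystem, Materializer and ExecutionContext are in scope.
def run(req: String): Unit = {
  val apiBaseUrl = "example.com" // without protocol
  val path = "/api/update"
  val body = HttpEntity(ContentTypes.`application/json`, req.getBytes)
  val request = HttpRequest(HttpMethods.POST, path, entity = body)
  // Each materialization of this flow opens a new connection.
  val connectionFlow = Http().outgoingConnection(apiBaseUrl)
  val result = Source.single(request).via(connectionFlow).runWith(Sink.head)
  result.onComplete {
    case Success(value) =>
      println(value)
    case Failure(e) =>
      e.printStackTrace()
  }
}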
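Note that the Scala example handles the Failure case, while the Java code in the question chains only thenCompose and thenAccept, so a failed request would go unreported. The JDK-only sketch below shows one way to surface such failures with `exceptionally`. The method `fakePost` is a hypothetical stand-in for http.singleRequest that fails for every third id, and the class name `FailureLoggingDemo` is likewise made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

public class FailureLoggingDemo {

    // Hypothetical stand-in for an HTTP call: fails for every third id.
    static CompletionStage<String> fakePost(int id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (id % 3 == 0) {
            future.completeExceptionally(
                    new IllegalStateException("request " + id + " was not accepted"));
        } else {
            future.complete("response-" + id);
        }
        return future;
    }

    /** Returns {succeeded, failed} after issuing `requests` fake calls. */
    static int[] run(int requests) {
        AtomicInteger succeeded = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        for (int id = 1; id <= requests; id++) {
            fakePost(id)
                    .thenAccept(body -> succeeded.incrementAndGet())
                    // Without this stage a failed future is dropped silently.
                    .exceptionally(error -> {
                        failed.incrementAndGet();
                        return null;
                    });
        }
        return new int[] {succeeded.get(), failed.get()};
    }

    public static void main(String[] args) {
        int[] result = run(9);
        System.out.println("succeeded=" + result[0] + ", failed=" + result[1]);
    }
}
```

In the actor from the question, the same `exceptionally` stage could be appended after thenAccept to log the error instead of counting it.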

Regarding "java - How to call Akka Http POST in a loop (1000-10000 times)?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54096806/
