gpt4 book ai didi

java - 如何使用 Mono & Flux 限制并发 http 请求

转载 作者:行者123 更新时间:2023-12-04 07:14:35 66 4
gpt4 key购买 nike

我要处理Flux限制 Mono 列表发出的并发 HTTP 请求.
当一些请求完成时(收到响应),然后服务请求另一个,直到等待请求的总数为 15。
单个请求返回一个列表并根据结果触发另一个请求。
此时,我想以有限的并发性发送请求。
因为消费者端,太多的 HTTP 请求会使对端服务器陷入困境。
我用了flatMapMany如下所示。

public Flux<JsonNode> syncData() {
return service1
.getData(param1)
.flatMapMany(res -> {
List<Mono<JsonNode>> totalTask = new ArrayList<>();
Map<String, Object> originData = service2.getDataFromDB(param2);
res.withArray("data").forEach(row -> {
String id = row.get("id").asText();
if (originData.containsKey(id)) {
totalTask.add(service1.updateRequest(param3));
} else {
totalTask.add(service1.deleteRequest(param4));
}
originData.remove(id);
});
for (left) {
totalTask.add(service1.createRequest(param5));
}
return Flux.merge(totalTask);
});
}
void syncData() {
syncDataService.syncData().????;
}
我尝试链接 .window(15) ,但它不起作用。所有请求都是同时发送的。
我该如何处理 Flux为了我的目标?

最佳答案

恐怕 Project Reactor 没有提供任何速率或时间限制的实现。
但是,您可以找到一堆提供此类功能并与 Project Reactor 兼容的 3rd 方库。据我所知,resilience4-reactor支持这一点,并且还与 Spring 和 Spring Boot 框架兼容。

The RateLimiterOperator checks if a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher. If the rate limit would be exceeded, the RateLimiterOperator could either delay requesting data from the upstream or it can emit a RequestNotPermitted error to the downstream subscriber.

RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
Mono.fromCallable(backendService::doSomething)
.transformDeferred(RateLimiterOperator.of(rateLimiter))
有关 RateLimiter 模块本身的更多信息: https://resilience4j.readme.io/docs/ratelimiter

关于java - 如何使用 Mono & Flux 限制并发 http 请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68856529/

66 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com