gpt4 book ai didi

java - 将多个 ReactiveX 流合并为一个结果流

转载 作者:行者123 更新时间:2023-11-30 06:08:30 24 4
gpt4 key购买 nike

我试图使用 RxJava 来理解 ReactiveX,但我无法理解整个 Reactive 的想法。我的案例如下:

我有 Task 类。它有 perform() 方法,该方法执行 HTTP 请求并通过 executeRequest() 方法获取响应。该请求可以执行多次(定义的重复次数)。我想获取 executeRequest() 的所有结果并将它们合并到 Flowable 数据流中,这样我就可以在 perform 中返回这个 Flowable () 方法。所以最后我希望我的方法返回我的 Task 执行的请求的所有结果。

executeRequest() 返回 Single 因为它只执行一个请求,并且可能只提供一个响应或根本不提供响应(在超时的情况下)。在perform() 中,我为每次重复创建Flowable 数字范围。订阅此 Flowable 我每次重复都会执行一个请求。我还订阅了每个响应 Single,以便记录响应并将其收集到集合中以供以后使用。现在我有一组 Single ,如何将它们合并到 Flowable 中以在 perform() 中返回它?我尝试使用像 merge() 这样的运算符,但我不理解它的参数类型。

我在网上阅读了一些指南,但它们都非常笼统,或者没有根据我的情况提供示例。

public Flowable<HttpClientResponse> perform() {

Long startTime = System.currentTimeMillis();

List<HttpClientResponse> responses = new ArrayList<>();
List<Long> failedRepetitionNumbers = new ArrayList<>();

Flowable.rangeLong(0, repetitions)
.subscribe(repetition -> {
logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);

Long currentTime = System.currentTimeMillis();

if (durationCap == 0 || currentTime - startTime < durationCap) {

Single<HttpClientResponse> response = executeRequest(method, url, headers, body);

response.subscribe(successResult -> {
logger.info("Received response with code {} in the {}. repetition.", successResult
.statusCode(), repetition + 1);
responses.add(successResult);
},
error -> {
logger.error("Failed to receive response from {}.", url);
failedRepetitionNumbers.add(repetition);
});
waitInterval(minInterval, maxInterval);
} else {
logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
}
});

return Flowable.merge(???);
}

executeRequest()

private Single<HttpClientResponse> executeRequest(HttpMethod method, String url, LinkedMultiValueMap<String, String>
headers, JsonNode body) {

CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();

HttpClient client = vertx.createHttpClient();
HttpClientRequest request = client.request(method, url, responseFuture::complete);
headers.forEach(request::putHeader);
request.write(body.toString());
request.setTimeout(timeout);
request.end();

return Single.fromFuture(responseFuture);
}

最佳答案

不要在 perform 方法中订阅每个可观察量(每个 HTTP 请求),只需继续像这样链接可观察量即可。您的代码可以简化为类似的内容。

    public Flowable<HttpClientResponse> perform() {
// Here return a flowable , which can emit n number of times. (where n = your number of HTTP requests)
return Flowable.rangeLong(0, repetitions) // start a counter
.doOnNext(repetition -> logger.debug("Performing repetition {} of {}", repetition + 1, repetitions)) // print the current count
.flatMap(count -> executeRequest(method, url, headers, body).toFlowable()) // get the executeRequest as Flowable
.timeout(durationCap, TimeUnit.MILLISECONDS); // apply a timeout policy
}

最后,您可以在实际需要执行这一切的地方订阅perform,如下所示

             perform()
.subscribeWith(new DisposableSubscriber<HttpClientResponse>() {
@Override
public void onNext(HttpClientResponse httpClientResponse) {
// onNext will be triggered each time, whenever a request has executed and ready with result
// if you had 5 HTTP request, this can trigger 5 times with each "httpClientResponse" (if all calls were success)
}

@Override
public void onError(Throwable t) {
// any error during the execution of these request,
// including a TimeoutException in case timeout happens in between
}

@Override
public void onComplete() {
// will be called finally if no errors happened and onNext delivered all the results
}
});

关于java - 将多个 ReactiveX 流合并为一个结果流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50759639/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com