
java - How do I collect successes and errors from CompletableFutures?

Reposted. Author: 行者123. Updated: 2023-12-05 07:37:30

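I want to send multiple requests to a web service in parallel. The results should be collected as successes and errors so that the caller can analyze them further.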

    public Map.Entry<Rsp, Errors> sendApiRequests(List<Req> reqs) {
        //will mostly remain null as errors won't occur frequently
        List<Rsp> errors = null;

        List<CompletableFuture<Rsp>> futures =
            reqs.stream()
                .map(req -> CompletableFuture.supplyAsync(() -> send(req))
                    .exceptionally(ex -> {
                        //TODO this fails, because list should be final for it.
                        //but don't want to instantiate as mostly will remain just null
                        if (errors == null) errors = new ArrayList<>();
                        errors.add(req);
                    }))
                .collect(Collectors.toList());

        //send api requests in parallel
        List<Rsp> responses = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());

        //TODO how to collect the errors? each error should also provide the underlying Req that caused the failure.
        //pending requests should not be aborted if any throws an exception
        return new SimpleEntry(responses, errors);
    }

Question: how can I collect all responses while also collecting the exceptions thrown during the send() method?

My goal is to return two lists: one containing all successful responses and another containing the errors.

Best answer

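Although this is an old question, I needed something similar and was able to piece together a working solution from the comments above and from other, not-quite-correct snippets found on the internet.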

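The method and the CompletableFutureCollector class return a list that holds either a response or an error for each request. This was implemented on Java 11 but should also work on Java 8 (apart from List.of in the usage example, which requires Java 9). I recommend adapting it to pass in a java.util.concurrent.Executor to control the degree of parallelism. For example, you can use it like this: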

    final List<CompletableFutureCollector.CollectorResult<Rsp>> results =
        sendApiRequests(List.of(new Req()));
    results.stream()
        .filter(CompletableFutureCollector.CollectorResult::hasError)
        .map(CompletableFutureCollector.CollectorResult::getError)
        .forEach(error -> {
            // Do something with errors
        });
    results.stream()
        .filter(CompletableFutureCollector.CollectorResult::hasResult)
        .map(CompletableFutureCollector.CollectorResult::getResult)
        .forEach(rsp -> {
            // Do something with responses
        });

    public List<CompletableFutureCollector.CollectorResult<Rsp>> sendApiRequests(List<Req> reqs) {
        // The actual send implementation could be anything that you'd like to do asynchronously
        return CompletableFutureCollector.mapAsyncAndCollectResult(reqs, req -> send(req));
    }

    // ...

    // Imports needed at the top of the enclosing file:
    // java.util.Collection, java.util.List,
    // java.util.concurrent.CompletableFuture, java.util.concurrent.CompletionException,
    // java.util.function.Function, java.util.stream.Collector, java.util.stream.Collectors
    private final static class CompletableFutureCollector {
        private CompletableFutureCollector() {
        }

        // Collects a stream of futures into one future that completes with the list of all results
        public static <X, T extends CompletableFuture<X>> Collector<T, ?, CompletableFuture<List<X>>> collectResult() {
            return Collectors.collectingAndThen(Collectors.toList(), joinResult());
        }

        // Waits for every future, then joins each one (join does not block once allOf has completed)
        private static <X, T extends CompletableFuture<X>> Function<List<T>, CompletableFuture<List<X>>> joinResult() {
            return futures -> allOf(futures)
                .thenApply(v -> futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()));
        }

        private static <T extends CompletableFuture<?>> CompletableFuture<Void> allOf(final List<T> futures) {
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        }

        // Runs the action asynchronously for every item and blocks until all results or errors are available
        public static <T, R> List<CompletableFutureCollector.CollectorResult<R>> mapAsyncAndCollectResult(
                final Collection<T> items, final Function<T, R> action) {
            return items.parallelStream()
                .map(task -> CompletableFuture.supplyAsync(() -> action.apply(task)))
                .map(CompletableFutureCollector.CollectorResult::handle)
                .collect(CompletableFutureCollector.collectResult())
                .join();
        }

        // Holds either the result of one task or the error it threw
        private final static class CollectorResult<R> {
            private final R result;
            private final Throwable error;

            private CollectorResult(final R result, final Throwable error) {
                this.result = result;
                this.error = error;
            }

            public boolean hasError() {
                return getError() != null;
            }

            // True when no error occurred; the result itself may still be null
            public boolean hasResult() {
                return !hasError();
            }

            public R getResult() {
                return result;
            }

            // Unwraps the CompletionException that CompletableFuture adds around the original exception
            public Throwable getError() {
                return error instanceof CompletionException ? error.getCause() : error;
            }

            // Converts a future that may fail into one that always completes with a CollectorResult
            public static <R> CompletableFuture<CompletableFutureCollector.CollectorResult<R>> handle(final CompletableFuture<R> future) {
                return future.handle(CompletableFutureCollector.CollectorResult::new);
            }
        }
    }

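Note: I have not fully tested this solution, because it is adapted from a working implementation that has some additional fail-fast cancellation logic.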
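The suggestion to pass in an Executor, together with the question's requirement that each error should carry the Req that caused it, could be sketched roughly as follows. This is a separate, minimal illustration and not part of the answer's code: the names AsyncOutcomes, Outcome and mapAsync are made up for this example, and the integer "requests" in main merely stand in for the real Req objects and send(req).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

final class AsyncOutcomes {
    private AsyncOutcomes() {
    }

    /** Hypothetical helper type: pairs one request with either its result or its error. */
    static final class Outcome<T, R> {
        final T request;
        final R result;
        final Throwable error;

        Outcome(T request, R result, Throwable error) {
            this.request = request;
            this.result = result;
            // supplyAsync wraps thrown exceptions in CompletionException; keep the original cause
            this.error = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
        }

        boolean failed() {
            return error != null;
        }
    }

    /** Runs action for every item on the given executor and returns all outcomes in input order. */
    static <T, R> List<Outcome<T, R>> mapAsync(List<T> items, Function<T, R> action, Executor executor) {
        // Start every task first so that they can run concurrently
        List<CompletableFuture<Outcome<T, R>>> futures = items.stream()
                .map(item -> CompletableFuture
                        .supplyAsync(() -> action.apply(item), executor)
                        // handle() captures the exception, so one failure does not abort the others
                        .handle((result, error) -> new Outcome<T, R>(item, result, error)))
                .collect(Collectors.toList());

        // join() does not throw here because handle() already converted failures into Outcomes
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The pool size bounds how many requests are in flight at once
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Outcome<Integer, String>> outcomes = mapAsync(
                    Arrays.asList(1, 2, 3),
                    n -> {
                        if (n == 2) {
                            throw new IllegalStateException("request " + n + " failed");
                        }
                        return "ok-" + n;
                    },
                    pool);
            for (Outcome<Integer, String> o : outcomes) {
                System.out.println(o.request + " -> " + (o.failed() ? o.error : o.result));
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Splitting the outcomes into the two lists the question asks for is then a matter of filtering on failed(), or using Collectors.partitioningBy(Outcome::failed). Keeping the request inside each Outcome avoids the shared mutable errors list from the question.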

Regarding "java - How do I collect successes and errors from CompletableFutures?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/48562846/
