gpt4 book ai didi

java - SupplyAsync 等待所有 CompletableFutures 完成

转载 作者:行者123 更新时间:2023-12-02 19:10:39 32 4
gpt4 key购买 nike

我正在运行下面的一些异步任务,需要等待它们全部完成。我不确定为什么,但是 join() 不会强制等待所有任务,并且代码会继续执行而无需等待。连接流未按预期工作是否有原因?

CompletableFutures列表只是一个映射supplyAsync的流

List<Integer> items = Arrays.asList(1, 2, 3);

List<CompletableFuture<Integer>> futures = items
.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {

System.out.println("processing");
// do some processing here
return item;

}))
.collect(Collectors.toList());

我等待着 future 。

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));

我可以使用 futures.forEach(CompletableFuture::join); 来等待,但我想知道为什么我的流方法不起作用。

最佳答案

这段代码:

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));

是否等待futures中的所有futures完成。它的作用是创建一个新的 future,它将等待 futures 中的所有异步执行在其本身完成之前完成(但在所有这些 future 完成之前不会阻塞)。当这个 allOf future 完成时,您的 thenApply 代码就会运行。但是 allOf() 将立即返回,不会阻塞。

这意味着代码中的 futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) 仅在所有异步执行之后运行完成,这违背了你的目的。 join() 调用都将立即返回。但这还不是更大的问题。您面临的挑战是 allOf().thenApply() 不会等待异步执行完成。它只会创造另一个不会阻碍的 future 。

最简单的解决方案是使用第一个管道并映射到整数列表:

List<Integer> results = items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {

System.out.println("processing " + item);
// do some processing here
return item;

}))
.collect(Collectors.toList()) //force-submit all
.stream()
.map(CompletableFuture::join) //wait for each
.collect(Collectors.toList());

如果您想使用类似于原始代码的内容,那么您的第二个代码段必须更改为:

List<Integer> reuslts = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

那是因为CompletableFuture.allOf不会等待,它只是将所有 future 组合成一个新的 future,并在全部完成时完成:

Returns a new CompletableFuture that is completed when all of the given CompletableFutures complete.

或者,您仍然可以将 allOf()join() 一起使用,然后运行当前的 thenApply() 代码:

//wrapper future completes when all futures have completed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.join();

//join() calls below return immediately
List<Integer> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

最后一个语句中的 join() 调用立即返回,因为包装器上的 join() 调用 (allOf()) future 将等待传递给它的所有 future 完成。这就是为什么当您可以使用第一种方法时我不认为这样做的原因。

关于java - SupplyAsync 等待所有 CompletableFutures 完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64323232/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com