gpt4 book ai didi

java - 返回一个包含 CompletableFuture 列表的 CompletableFuture

转载 作者:行者123 更新时间:2023-12-02 09:14:30 30 4
gpt4 key购买 nike

我正在尝试加快对多个 API 的调用。

在下面的代码中,getFilteredEvents是当前同步版本。我有这样的感觉 map(x -> x.getFilteredEvents(eventResearch))操作将等待每个 API 的响应(它在内部使用 RestTemplate.exchange()),然后再传递到下一个 API 以构建 List<Event>我想回来。解决方案可能是启动 map调用单独的线程,但我想尝试 CompletableFuture API。

因此,getFilteredEventsFaster是我努力缩短响应时间的结果。

@Service
public class EventsResearchService {

@Autowired
private List<UniformEventsResearchApi> eventsResearchApis;

// this works, but I'm trying to improve it
public EventResearchResponse getFilteredEvents(EventResearch eventResearch) {
List<Event> eventsList = eventsResearchApis
.stream()
.map(x -> x.getFilteredEvents(eventResearch))
.flatMap(List::stream)
.collect(Collectors.toList());

return extractResponse(eventResearch, eventsList);
}

// this doesn't work yet: what is wrong?
public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
List<CompletableFuture<List<Event>>> futureEventsList = eventsResearchApis
.parallelStream()
.map(x -> CompletableFuture.supplyAsync(() -> x.getFilteredEvents(eventResearch)))
.collect(Collectors.toList());

return CompletableFuture.allOf(futureEventsList.toArray(new CompletableFuture<List<Event>>[0]));
}
}

我的理解是我想发送 CompletableFuture<List<Event>>回到我的前端,而不是 List<CompletableFuture<List<Event>>> ,因此 CompletableFuture.allOf()调用(如果我理解正确的话,它类似于 flatmap 操作,从多个 CompletableFuture 创建 CompleteableFuture )。

不幸的是,事实上,我得到了 Generic array creation使用new CompletableFuture<List<Event>>[0]时出现编译错误.

我做错了什么?

我有一种感觉,使用join方法确实允许我收集所有答案,但这将是我的服务线程上的阻塞操作,不是吗? (如果我理解正确的话,这将违背试图将 CompletableFuture 返回到我的前端的目的。)

最佳答案

以下代码片段展示了如何使用listOfFutures.stream().map(CompletableFuture::join)来收集allOF的结果。我从 this page 中获取了这个例子这表明它不会等待每个 Future 完成。

class Test {

public static void main(String[] args) throws Exception {

long millisBefore = System.currentTimeMillis();

List<String> strings = Arrays.asList("1","2", "3", "4", "5", "6", "7", "8");
List<CompletableFuture<String>> listOfFutures = strings.stream().map(Test::downloadWebPage).collect(toList());
CompletableFuture<List<String>> futureOfList = CompletableFuture
.allOf(listOfFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> listOfFutures.stream().map(CompletableFuture::join).collect(toList()));

System.out.println(futureOfList.get()); // blocks here
System.out.printf("time taken : %.4fs\n", (System.currentTimeMillis() - millisBefore)/1000d);
}

private static CompletableFuture<String> downloadWebPage(String webPageLink) {
return CompletableFuture.supplyAsync( () ->{
try { TimeUnit.SECONDS.sleep(4); }
catch (Exception io){ throw new RuntimeException(io); }
finally { return "downloaded : "+ webPageLink; }
});
}

}

由于效率似乎是这里的问题,因此我添加了一个虚拟基准来证明执行不需要 32 秒。

输出:

[downloaded : 1, downloaded : 2, downloaded : 3, downloaded : 4, downloaded : 5, downloaded : 6, downloaded : 7, downloaded : 8]
time taken : 8.0630s
<小时/>

编辑原始问题海报

感谢这个答案,并通过使用 this website (讨论与 allOf 相关的异常处理),我想出了这个完整的版本:

    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {

/* Collecting the list of all the async requests that build a List<Event>. */
List<CompletableFuture<List<Event>>> completableFutures = eventsResearchApis.stream()
.map(api -> getFilteredEventsAsync(api, eventResearch))
.collect(Collectors.toList());

/* Creating a single Future that contains all the Futures we just created ("flatmap"). */
CompletableFuture<Void> allFutures =CompletableFuture.allOf(completableFutures
.toArray(new CompletableFuture[eventsResearchApis.size()]));

/* When all the Futures have completed, we join them to create merged List<Event>. */
CompletableFuture<List<Event>> allCompletableFutures = allFutures
.thenApply(future -> completableFutures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream) // creating a List<Event> from List<List<Event>>
.collect(Collectors.toList())
);

return allCompletableFutures;
}

private CompletableFuture<List<Event>> getFilteredEventsAsync(UniformEventsResearchApi api,
EventResearch eventResearch) {
/* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
return CompletableFuture.supplyAsync(() -> api.getFilteredEvents(eventResearch))
.exceptionally(ex -> {
LOGGER.error("Extraction of events from API went wrong: ", ex);
return Collections.emptyList(); // gets managed in the wrapping Future
});
}

关于java - 返回一个包含 CompletableFuture 列表的 CompletableFuture,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59108125/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com