gpt4 book ai didi

java-8 - 如何在链式 CompletableFuture 中扇出?

转载 作者:行者123 更新时间:2023-12-04 23:36:19 31 4
gpt4 key购买 nike

我想链接一个 CompletableFuture 以便它在处理过程中散开。我的意思是我有一个针对列表的开放 CompletableFuture,我想对该列表中的每个项目应用计算。

第一步是调用发出异步调用的 m_myApi.getResponse(request, executor)。

该异步调用的结果有一个 getCandidates 方法。我想并行解析所有这些候选人。

目前,我的代码会依次解析它们

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
return candidates.thenApplyAsync(response -> response.getCandidates()
.stream()
.map(MyParser::ParseCandidates)
.collect(Collectors.toList()));
}

我想要这样的东西:
public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
return candidates.thenApplyAsync(response -> response.getCandidates()
.stream()
.PARSE_IN_PARALLEL_USING_EXECUTOR
}

最佳答案

this answer 中所述,如果 Executor碰巧是 Fork/Join 池,有一个(未记录的)功能,即在其工作线程之一中启动并行流将使用该执行程序执行并行操作。

当您想支持任意时Executor实现,事情更复杂。一种解决方案看起来像

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
return candidates.thenComposeAsync(
response -> {
List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
.stream()
.map(CompletableFuture::completedFuture)
.map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
.collect(Collectors.toList());
return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
.thenApplyAsync(x ->
list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
executor);
},
executor);
}

第一个至关重要的事情是,我们必须在开始等待任何作业之前提交所有潜在的异步作业,以启用执行程序可能支持的最大并行度。因此,我们必须收集 List 中的所有 future 。在第一步。

在第二步中,我们可以遍历列表和 join所有 future 。如果 executor 是一个 Fork/Join 池并且 future 尚未完成,它会检测到这一点并启动一个补偿线程以重新获得配置的并行度。但是,对于任意执行器,我们不能假设这样的特性。最值得注意的是,如果执行程序是单线程执行程序,这可能会导致死锁。

因此,该解决方案使用 CompletableFuture.allOf只有当所有 future 都已经完成时,才执行迭代和加入所有 future 的操作。因此,该解决方案永远不会阻塞执行程序的线程,使其与任何 Executor 兼容。执行。

关于java-8 - 如何在链式 CompletableFuture 中扇出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52998449/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com