gpt4 book ai didi

java - Java流在完成上一个 map 之前开始下一个 map

转载 作者:行者123 更新时间:2023-12-03 13:22:48 24 4
gpt4 key购买 nike

我有2个执行者:

ExecutorService executorDownload = Executors.newFixedThreadPool(n);
ExecutorService executorCalculate = Executors.newFixedThreadPool(m);
首先,我需要将任务放入executorDownload中,然后在它们强制执行之后将它们放入executorCalculate中,然后得到结果。我写了下一个流:
long count = IntStream.range(0, TASK_NUMBER)
.boxed()
.parallel()
.map(i -> executorDownload.submit(new Download(i)))
.map(future -> calculateResultFuture(executorCalculate, future))
.filter(Objects::nonNull)
.filter(Main::isFutureCalculated)
.count();

public static Future<CalculateResult> calculateResultFuture(ExecutorService executorCalculate, Future<DownloadResult> future) {
try {
return executorCalculate.submit(new Calculate(future.get()));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}

public static boolean isFutureCalculated(Future<CalculateResult> future) {
try {
return future.get().found;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return false;
}
是否有可能开始
.map(future -> calculateResultFuture(executorCalculate, future))
.map(i -> executorDownload.submit(new Download(i)))
结束了。在第一张 map 开始后,我需要第二张 map 开始。

最佳答案

您只需要了解流将处理元素一个接一个地流,这意味着一个元素在下一个进入第一个中间步骤之前就遍历整个管道(而不是所有元素一起通过每个步骤,当然对于需要所有步骤的中间步骤除外)元素才能完成工作)。
在您的情况下,这意味着第一个元素的.get()将被阻止,从而阻止第二个元素进入第一个任务。
要强制所有元素通过第一个提交(也将适用于第二个提交),您需要在开始阻止之前强制流提交所有任务,例如:

List<Future<DownloadResult>> downloadTasks = IntStream.range(0, TASK_NUMBER)
.mapToObj(i -> executorDownload.submit(new Download(i)))
.collect(Collectors.toList());
//removed .parallel()
这将强制启动所有异步任务,然后您可以对第二个异步批处理执行相同的操作:
List<Future<CalculateResult>> calculateResults = downloadTasks.stream()
.map(future -> calculateResultFuture(executorCalculate, future))
.filter(Objects::nonNull)
.collect(Collectors.toList());
那也将迫使所有任务都提交给第二个执行者。在这里,您可以 .get()而不进行不必要的等待:
long count = calculateResults.stream()
.filter(Main::isFutureCalculated)
.count();

现在,尽管这将消除批处理中元素之间不必要的等待,但仍可能在批处理之间存在不必要的等待(如果第一个元素完成了第一个任务,它将等待所有其余的操作完成第一个批处理,然后再继续第二个批处理)。为了解决这个问题,您可能需要其他实现。这是为此目的而设计的一系列可完成的 future :
List<Completable<CalculateResult>> calculateResult = IntStream.range(0, TASK_NUMBER)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> callDownload(i), executorDownload)
.thenApplyAsync(downloadResult -> calculateResultFuture(downloadResult), executorCalculate))
.collect(Collectors.toList());

long count = calculateResult.stream().map(f -> isFutureCalculated(f)).count();
当第一个任务完成时, thenApplyAsync将使第二个任务接管每个元素。
当然,这将需要您稍微更改API,以便直接调用下载方法。我使用 callDownload(i)来运行 new Download(i).call()所运行的相同逻辑。 calculateResultFuture()也将更改为删除 Future参数。

关于java - Java流在完成上一个 map 之前开始下一个 map ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66195815/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com