gpt4 book ai didi

java - 如何将 1 个 completablefuture 划分为流中的多个 completablefuture?

转载 作者:搜寻专家 更新时间:2023-11-01 03:18:22 26 4
gpt4 key购买 nike

比如我有这样的方法:

public CompletableFuture<Page> getPage(int i) {
...
}
public CompletableFuture<Document> getDocument(int i) {
...
}
public CompletableFuture<Void> parseLinks(Document doc) {
...
}

还有我的流程:

List<CompletableFuture> list = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i))

// I want method like this:
.thenApplyAndSplit(CompletableFuture<Page> page -> {
List<CompletableFuture<Document>> docs = page.getDocsId()
.stream()
.map(i -> getDocument(i))
.collect(Collectors.toList());
return docs;
})
.map(CompletableFuture<Document> future -> {
return future.thenApply(Document doc -> parseLink(doc);
})
.collect(Collectors.toList());

对于 CompletableFuture 应该是 flatMap() 这样的东西,所以我想实现这个流程:

List<Integer> -> Stream<CompletableFuture<Page>>
-> Stream<CompletableFuture<Document>>
-> parse each

更新

Stream<CompletableFuture<Page>> pagesCFS = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i));

Stream<CompletableFuture<Document>> documentCFS = listCFS.flatMap(page -> {
// How to return stream of Document when page finishes?
// page.thenApply( ... )
})

最佳答案

我还想尝试实现 Spliterator对于 Stream<CompletableFuture> ,所以这是我的尝试。

此解决方案创建一个 Stream future 的结果,一旦这些 future 中的任何一个完成。当然,这会丢失原始流中存在的任何顺序。

原始流将被该方法立即消耗,因此触发所有元素的整个管道——从而失去原始流管道的惰性。假设构建流是快速的,繁重的工作由 futures 自己完成,因此消耗流不应该是昂贵的。这也确保所有任务都已触发,因为它强制处理源流。

下面是实现:

public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) {
return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel);
}

public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream,
boolean parallel) {
return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity());
}

public static class CompletableFutureSpliterator<T> implements Spliterator<T> {
private List<CompletableFuture<? extends T>> futures;

CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) {
futures = stream.collect(Collectors.toList());
}

CompletableFutureSpliterator(CompletableFuture<T>[] futures) {
this.futures = new ArrayList<>(Arrays.asList(futures));
}

CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) {
this.futures = new ArrayList<>(futures);
}

@Override
public boolean tryAdvance(final Consumer<? super T> action) {
if (futures.isEmpty())
return false;
CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join();
// now at least one of the futures has finished, get its value and remove it
ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size());
while (it.hasPrevious()) {
final CompletableFuture<? extends T> future = it.previous();
if (future.isDone()) {
it.remove();
action.accept(future.join());
return true;
}
}
throw new IllegalStateException("Should not reach here");
}

@Override
public Spliterator<T> trySplit() {
if (futures.size() > 1) {
int middle = futures.size() >>> 1;
// relies on the constructor copying the list, as it gets modified in place
Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle));
futures = futures.subList(middle, futures.size());
return result;
}
return null;
}

@Override
public long estimateSize() {
return futures.size();
}

@Override
public int characteristics() {
return IMMUTABLE | SIZED | SUBSIZED;
}
}

它通过转换给定的 Stream<CompletableFuture<T>> 来工作进入List这些 future 。

为了生成输出流,它只是等待任何 future 完成,然后再传输其值。

一个简单的非并行使用示例(执行器用于 CompletableFuture s,以便同时启动它们):

ExecutorService executor = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
flattenStreamOfFutures(IntStream.range(0, 20)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((i % 10) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms");
return i;
}, executor)), false)
.forEach(x -> {
System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x);
});
executor.shutdown();

输出:

Finished 10 @ 103ms
Finished 0 @ 105ms
main @ 114ms handle result: 10
main @ 114ms handle result: 0
Finished 1 @ 1102ms
main @ 1102ms handle result: 1
Finished 11 @ 1104ms
main @ 1104ms handle result: 11
Finished 2 @ 2102ms
main @ 2102ms handle result: 2
Finished 12 @ 2104ms
main @ 2105ms handle result: 12
Finished 3 @ 3102ms
main @ 3102ms handle result: 3
Finished 13 @ 3104ms
main @ 3105ms handle result: 13

如您所见,流几乎立即生成值,即使 futures 没有按顺序完成。

将其应用于问题中的示例,这将给出(假设 parseLinks() 返回 CompletableFuture<String> 而不是 ~<Void> ):

flattenStreamOfFuturesOfStream(IntStream.range(0, 10)
.mapToObj(this::getPage)
// the next map() will give a Stream<CompletableFuture<Stream<String>>>
// hence the need for flattenStreamOfFuturesOfStream()
.map(pcf -> pcf
.thenApply(page -> flattenStreamOfFutures(page
.getDocsId()
.stream()
.map(this::getDocument)
.map(docCF -> docCF.thenCompose(this::parseLinks)),
false))),
false)
.forEach(System.out::println);

请注意,如果您在 并行 模式下使用它,请注意使用不同的 ForkJoinPool对于在 CompletableFuture 后面运行的流和任务的。流将等待 futures 完成,因此如果它们共享同一个执行程序,甚至会遇到死锁,你实际上可能会降低性能。 – 编辑:我认为这是不正确的。 ForkJoinPool应该能够处理任何阻塞的线程,并相应地增加线程数。

关于java - 如何将 1 个 completablefuture 划分为流中的多个 completablefuture?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39792291/

26 4 0