gpt4 book ai didi

java - 在 Java8 中并行运行 IO 计算

转载 作者:搜寻专家 更新时间:2023-10-30 21:27:18 25 4
gpt4 key购买 nike

我熟悉函数式编程语言,通常是 Scala 和 Javascript。我正在处理一个 Java8 项目,但不确定我应该如何运行一个项目列表/流,并使用自定义线程池对它们中的每一个并行执行一些副作用,并返回一个对象可以监听是否完成(无论是成功还是失败)。

目前我有以下代码,它似乎可以工作(我正在使用 Play 框架 Promise 实现作为返回)但它似乎并不理想,因为 ForkJoinPool 一开始并不打算用于 IO 密集型计算。

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
ForkJoinPool pool = new ForkJoinPool(3);
ForkJoinTask<F.Promise<Void>> result = pool
.submit(() -> {
try {
items.parallel().forEach(performSingleItemBackup);
return F.Promise.<Void>pure(null);
} catch (Exception e) {
return F.Promise.<Void>throwing(e);
}
});

try {
return result.get();
} catch (Exception e) {
throw new RuntimeException("Unable to get result", e);
}
}

有人可以给我一个上述功能的更惯用的实现吗?理想情况下不使用 ForkJoinPool,使用更标准的返回类型和最新的 Java8 API?不确定我应该在 CompletableFuture、CompletionStage、ForkJoinTask 之间使用什么...

最佳答案

规范的解决方案是

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ForkJoinPool pool = new ForkJoinPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}

请注意,ForkJoin 池和并行流之间的交互是您不应依赖的未指定实现细节。相比之下,CompletableFuture提供了一个专用 API 来提供 Executor .它甚至不必是 ForkJoinPool :

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}

在任何一种情况下,您都应该明确关闭执行程序,而不是依赖自动清理。

如果你需要 F.Promise<Void>结果,你可以使用

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
}

但请注意,这与您的原始代码一样,仅在操作完成时返回,而返回 CompletableFuture 的方法允许操作异步运行,直到调用者调用 joinget .

返回一个真正异步的Promise ,你必须包装整个操作,例如

public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
return F.Promise.pure(stream).flatMap(items -> {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
});
}

但最好还是选择一种 API,而不是在两种不同的 API 之间来回跳转。

关于java - 在 Java8 中并行运行 IO 计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48076098/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com