gpt4 book ai didi

java-8 - 默认 ForkJoinPool 执行器花费很长时间

转载 作者:行者123 更新时间:2023-12-02 08:21:57 25 4
gpt4 key购买 nike

我正在使用 CompletableFuture 来异步执行从列表源生成的流。

所以我正在测试 CompletableFuture 的重载方法,即“supplyAsync”,其中一种方法仅采用单个供应商参数,另一种方法采用供应商参数和执行程序参数。这是两者的文档:

一个

supplyAsync(Supplier supplier)

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

第二个

supplyAsync(Supplier supplier, Executor executor)

Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier.

这是我的测试类:

public class TestCompleteableAndParallelStream {

public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());

useCompletableFuture(tasks);

useCompletableFutureWithExecutor(tasks);

}

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());

List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}

public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());

List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}



}


class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}

“useCompletableFuture”方法大约需要 4 秒才能完成,而“useCompletableFutureWithExecutor”方法只需 1 秒即可完成。

不,我的问题是,ForkJoinPool.commonPool() 有哪些不同的处理可以产生开销?难道我们不应该总是更喜欢自定义执行器池而不是 ForkJoinPool 吗?

最佳答案

检查ForkJoinPool.commonPool()大小。默认情况下,它创建一个大小为的池

Runtime.getRuntime().availableProcessors() - 1

我在我的 Intel i7-4800MQ(4 个核心 + 4 个虚拟核心)上运行您的示例,在我的例子中,公共(public)池的大小是 7,因此整个计算花费了约 2000 毫秒:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

在第二种情况下,您使用了

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

因此池中有 10 个线程准备执行计算,因此所有任务都在约 1000 毫秒内运行:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

ForkJoinPoolExecutorService 之间的区别

Eugene他的评论中还提到了一件更重要的事情。 ForkJoinPool 使用工作窃取方法:

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

ExecutorService 使用 .newFixedThreadPool() 创建采用分而治之的方法。

如何确定池大小?

有一个关于最佳线程池大小的问题,您可能会在那里找到有用的信息:

Setting Ideal size of Thread Pool

这个帖子也是一个调查的好地方:

Custom thread pool in Java 8 parallel stream

关于java-8 - 默认 ForkJoinPool 执行器花费很长时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45460577/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com