gpt4 book ai didi

java-8 - 如何为 Java 8 并行流指定 ForkJoinPool?

转载 作者:行者123 更新时间:2023-12-01 13:36:03 29 4
gpt4 key购买 nike

据我所知,并行流使用默认值 ForkJoinPool.commonPool默认情况下,它比您的处理器少一个线程。我想使用我自己的自定义线程池。

像这样:

@Test
public void stream() throws Exception {
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (Exception e) {
}
return item * 10;
})).get().collect(Collectors.toList());
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
}

结果:
enter image description here

我的定制 ForkJoinPool从不使用。
我像这样更改默认并行度:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

它运行良好 - 任务仅花费约 1 秒。

在我的应用程序中,任务包含大量 IO 操作(从 db 读取数据)。
所以我需要更高的并行度,但我不想更改 JVM 属性。

那么指定我自己的 ForkJoinPool 的正确方法是什么? ?

或者如何在 IO 密集型情况下使用并行流?

最佳答案

流是懒惰的;当您开始终端操作时,所有工作都已完成。在你的情况下,终端操作是 .collect(Collectors.toList()) ,您在 main 中调用它关于 get() 的结果的线程.因此,实际工作的完成方式与您在 main 中构建整个流的方式相同。线。

为了让您的池生效,您必须将终端操作移动到提交的任务中:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);

我们还可以通过在 main中构造流来证明终端操作的相关性。线程并且只向池提交终端操作:
Stream<Integer> stream = testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();

但您应该记住,这是未记录的行为,无法保证。实际的答案必须是当前形式的 Stream API,没有线程控制(也没有处理检查异常的帮助),不适合并行 I/O 操作。

关于java-8 - 如何为 Java 8 并行流指定 ForkJoinPool?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52287717/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com