gpt4 book ai didi

java - ParallelStream 中线程的大小大于 cpu 核心数

转载 作者:行者123 更新时间:2023-12-02 23:50:02 29 4
gpt4 key购买 nike

默认情况下,parallelStream 内的 commonPool 大小应为 cpu_cores - 1

但是,在我的应用程序中,它始终大于硬件 cpu_cores

VisualVM 截图:

enter image description here

很困惑,我已经搜索了一半的互联网,但找不到答案。

我的配置:

Runtime.getRuntime().availableProcessors()=12

java.util.concurrent.ForkJoinPool.common.parallelism=null(默认设置)

我的代码:

            final CountDownLatch countDownLatch = new CountDownLatch(tempList.size());
tempList.parallelStream().forEach(om -> {
countDownLatch.countDown();
redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
});
countDownLatch.await();

此外,我尝试过自定义池设置,但它也不起作用 -

ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {
tempList.parallelStream().forEach(om -> {
countDownLatch.countDown();
redisReBloomService.add(config.getRedisKey(), om.getChannelNo());
}).get();
});

一些信息: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html Custom thread pool in Java 8 parallel stream

最佳答案

ForkJoinPool 中的并行性不是池中的最大线程数。它是 Activity 线程的目标。如果某些线程被阻塞,池可能会启动新线程以达到所需的并行度。

摘自ForkJoinPool的文档:

The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization. The nested ForkJoinPool.ManagedBlocker interface enables extension of the kinds of synchronization accommodated.

屏幕截图显示,当其他线程切换到状态Monitor时,新线程恰好在同一时间启动。 (粉红色的)。我的猜测是redisReBloomService.add(…)调用使用 ManagedBlocker当它必须在该监视器上等待时,会在内部导致池启动更多工作线程。

这是一个使用ManagedBlocker的小例子这表明了您观察到的类似行为。当 ManagedBlocker hibernate 1 秒,通常可以在 VisualVM 中观察到一个新的工作线程。

public class ForkJoinPoolTest {

@Test
public void testManagedBlocker() throws InterruptedException {
// wait to be able to connect with VisualVM
Thread.sleep(10_000);

IntStream.range(0, 100).parallel().peek(number -> {
doWork();

// Run a managed blocker some times.
// Every time it blocks, a new worker thread might be started.
if (ThreadLocalRandom.current().nextInt(10) == 0) {
try {
ForkJoinPool.managedBlock(new ManagedBlocker() {
@Override
public boolean block() throws InterruptedException {
Thread.sleep(1_000);
return true;
}

@Override
public boolean isReleasable() {
return false;
}
});
} catch (InterruptedException ignored) { }
}
})
.sum();
}

/** Some CPU bound workload **/
void doWork() {
for (int i = 0; i < 1_000_000; i++) {
Math.random();
}
}
}

关于java - ParallelStream 中线程的大小大于 cpu 核心数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59466375/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com