
Java parallelStream does not use the expected number of threads

Reposted. Author: 塔克拉玛干. Updated: 2023-11-01 23:04:35

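Java 8 parallelStream appears to use more threads than the number specified by the system property java.util.concurrent.ForkJoinPool.common.parallelism. The unit tests below show that when I submit the work to my own ForkJoinPool, the tasks are processed with the desired number of threads, but when I call parallelStream directly, the number of threads is higher than expected.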

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;

public class ParallelStreamTest {

    private static final int TOTAL_TASKS = 1000;

    @Test
    public void testParallelStreamWithParallelism1() throws InterruptedException {
        final Integer maxThreads = 1;
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());
        List<Integer> objects = new ArrayList<>();
        for (int i = 0; i < TOTAL_TASKS; i++) {
            objects.add(i);
        }

        final AtomicInteger concurrentThreads = new AtomicInteger(0);
        final AtomicInteger taskCount = new AtomicInteger(0);

        objects.parallelStream().forEach(i -> {
            processTask(concurrentThreads, maxThreads); // expected to be called one at a time
            taskCount.addAndGet(1);
        });

        assertTrue(taskCount.get() == TOTAL_TASKS);
    }

    @Test
    public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {
        final Integer threads = 1;
        List<Integer> objects = new ArrayList<>();
        for (int i = 0; i < TOTAL_TASKS; i++) {
            objects.add(i);
        }

        ForkJoinPool forkJoinPool = new ForkJoinPool(1);
        final AtomicInteger concurrentThreads = new AtomicInteger(0);
        final AtomicInteger taskCount = new AtomicInteger(0);

        forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {
            processTask(concurrentThreads, threads); // expected to be called one at a time
            taskCount.addAndGet(1);
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);

        assertTrue(taskCount.get() == TOTAL_TASKS);
    }

    /**
     * Processes a task, first incrementing the concurrentThreads count and
     * failing if that count exceeds maxThreads.
     *
     * @param concurrentThreads Counter for threads processing tasks
     * @param maxThreads Maximum number of threads that are expected to be used for processing tasks
     */
    private void processTask(AtomicInteger concurrentThreads, int maxThreads) {
        int currentConcurrentThreads = concurrentThreads.addAndGet(1);
        if (currentConcurrentThreads > maxThreads) {
            throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);
        }

        // actual processing would go here

        concurrentThreads.decrementAndGet();
    }
}

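Only one thread should be used for processing tasks, because the custom ForkJoinPool has parallelism=1 and java.util.concurrent.ForkJoinPool.common.parallelism=1 is set for the common pool. Both tests should therefore pass, but testParallelStreamWithParallelism1 fails with: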

java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2

It seems that setting java.util.concurrent.ForkJoinPool.common.parallelism=1 does not work as expected: more than one task is processed concurrently.

Any ideas?

Best answer

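The parallelism setting of a Fork/Join pool determines the number of pool worker threads. However, the caller thread (for example, the main thread) also works on the jobs, so there is always one more thread when the common pool is used. That is why the default setting of the common pool is "number of cores minus one": the actual number of working threads then equals the number of cores.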
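The effect described above can be observed with a minimal sketch like the following. The class name CommonPoolThreads and the helper threadsUsed are hypothetical names chosen for illustration; the code only records which threads process elements of a parallel stream started from the calling thread. Under the behavior described in the answer, the number of distinct threads should not exceed the common pool parallelism plus one (the caller), though the exact set of threads varies from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolThreads {

    // Runs a parallel stream from the calling thread and records the name of
    // every thread that processed at least one element.
    static Set<String> threadsUsed(int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, tasks).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = threadsUsed(10_000);
        // Reported parallelism counts pool workers only, not the caller thread.
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("threads observed (" + names.size() + "): " + names);
    }
}
```

If the caller thread's name (usually "main") shows up next to the pool workers, that is the extra thread the answer refers to.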

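With a custom Fork/Join pool, the caller thread of the stream operation is already a worker thread of the pool, so using it to process jobs does not increase the number of actual working threads.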
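A comparable sketch for the custom-pool case follows. CustomPoolThreads and threadsUsed are again hypothetical names. Unlike the test in the question, it calls join() on the submitted task, which both waits for completion and rethrows any failure from the stream. That the stream runs inside the custom pool is observed behavior of current OpenJDK builds, not a documented guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomPoolThreads {

    // Starts the parallel stream from inside a task of a custom pool and
    // records the name of every thread that processed at least one element.
    static Set<String> threadsUsed(int parallelism, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() blocks until the task is done and rethrows its failure, if any.
            pool.submit(() -> IntStream.range(0, tasks).parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName()))).join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // The stream is started by a pool worker, so no extra caller thread joins in.
        System.out.println("threads observed: " + threadsUsed(1, 10_000));
    }
}
```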

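It must be emphasized that the interaction between the Stream implementation and the Fork/Join pool is entirely unspecified, because the fact that streams use the Fork/Join framework under the hood is an implementation detail. There is no guarantee that changing the default pool's properties has any effect on streams, nor that calling stream operations from within a custom Fork/Join pool's tasks will use that custom pool.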
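When a hard upper bound on concurrency is required, one option that does not depend on unspecified stream behavior is to submit the work to an explicitly sized executor. This is not part of the original answer; it is a minimal sketch under that assumption, with BoundedExecutorSketch and maxConcurrency as hypothetical names. It measures the highest number of tasks seen running at once, in the same spirit as processTask in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedExecutorSketch {

    // Runs the given number of tasks on a fixed-size thread pool and returns
    // the highest number of tasks observed running at the same time.
    static int maxConcurrency(int threads, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    // actual processing would go here
                    running.decrementAndGet();
                }, executor));
            }
            // join() waits for each task and rethrows its failure, if any.
            futures.forEach(CompletableFuture::join);
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency with 1 thread: " + maxConcurrency(1, 1000));
    }
}
```

Because the tasks can only run on the executor's own threads, the peak cannot exceed the pool size, whatever the common pool's configuration.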

Regarding "Java parallelStream does not use the expected number of threads", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42515202/
