- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
Java 8 parallelStream 似乎使用了比系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 指定的线程更多的线程。这些单元测试表明,我使用我自己的 ForkJoinPool 使用所需数量的线程来处理任务,但在使用 parallelStream 时,线程数量高于预期。
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class ParallelStreamTest {
private static final int TOTAL_TASKS = 1000;
@Test
public void testParallelStreamWithParallelism1() throws InterruptedException {
final Integer maxThreads = 1;
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());
List<Integer> objects = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
objects.add(i);
}
final AtomicInteger concurrentThreads = new AtomicInteger(0);
final AtomicInteger taskCount = new AtomicInteger(0);
objects.parallelStream().forEach(i -> {
processTask(concurrentThreads, maxThreads); //expected to be called one at the time
taskCount.addAndGet(1);
});
assertTrue(taskCount.get() == TOTAL_TASKS);
}
@Test
public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {
final Integer threads = 1;
List<Integer> objects = new ArrayList<>();
for (int i = 0; i < TOTAL_TASKS; i++) {
objects.add(i);
}
ForkJoinPool forkJoinPool = new ForkJoinPool(1);
final AtomicInteger concurrentThreads = new AtomicInteger(0);
final AtomicInteger taskCount = new AtomicInteger(0);
forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {
processTask(concurrentThreads, threads); //expected to be called one at the time
taskCount.addAndGet(1);
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(taskCount.get() == TOTAL_TASKS);
}
/**
* It simply processes a task increasing first the concurrentThreads count
*
* @param concurrentThreads Counter for threads processing tasks
* @param maxThreads Maximum number of threads that are expected to be used for processing tasks
*/
private void processTask(AtomicInteger concurrentThreads, int maxThreads) {
int currentConcurrentThreads = concurrentThreads.addAndGet(1);
if (currentConcurrentThreads > maxThreads) {
throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);
}
// actual processing would go here
concurrentThreads.decrementAndGet();
}
}
应该只有一个线程用于处理任务,因为 ForkJoinPool 有 parallelism=1
和 java.util.concurrent.ForkJoinPool.common.parallelism=1
。因此,这两个测试都应该通过,但 testParallelStreamWithParallelism1 失败并显示:
java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2
似乎设置 java.util.concurrent.ForkJoinPool.common.parallelism=1 没有按预期工作,同时处理了超过 1 个并发任务。
有什么想法吗?
最佳答案
Fork/Join 池的并行度设置决定了池工作线程的数量,但是由于调用者线程,例如主线程也将处理作业,使用公共(public)池时总会有一个线程。这就是为什么 default setting of the common pool is “number of cores minus one”获得等于核心数的实际工作线程数。
对于自定义的 Fork/Join 池,流操作的调用者线程已经是池的工作线程,因此,利用它来处理作业不会增加实际工作线程数。
必须强调的是,Stream 实现和 Fork/Join 池之间的交互是完全未指定的,因为流在幕后使用 Fork/Join 框架是一个实现细节。不保证更改默认池的属性对流有任何影响,也不保证从自定义 Fork/Join 池的任务中调用流操作将使用该自定义池。
关于Java parallelStream 不使用预期的线程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42515202/
我是一名优秀的程序员,十分优秀!