- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
Java 8 parallelStream 似乎使用了比系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 指定的线程更多的线程。这些单元测试表明,我使用我自己的 ForkJoinPool 使用所需数量的线程来处理任务,但在使用 parallelStream 时,线程数量高于预期。
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class ParallelStreamTest {
private static final int TOTAL_TASKS = 1000;
@Test
public void testParallelStreamWithParallelism1() throws InterruptedException {
final Integer maxThreads = 1;
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());
List<Integer> objects = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
objects.add(i);
}
final AtomicInteger concurrentThreads = new AtomicInteger(0);
final AtomicInteger taskCount = new AtomicInteger(0);
objects.parallelStream().forEach(i -> {
processTask(concurrentThreads, maxThreads); //expected to be called one at the time
taskCount.addAndGet(1);
});
assertTrue(taskCount.get() == TOTAL_TASKS);
}
@Test
public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {
final Integer threads = 1;
List<Integer> objects = new ArrayList<>();
for (int i = 0; i < TOTAL_TASKS; i++) {
objects.add(i);
}
ForkJoinPool forkJoinPool = new ForkJoinPool(1);
final AtomicInteger concurrentThreads = new AtomicInteger(0);
final AtomicInteger taskCount = new AtomicInteger(0);
forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {
processTask(concurrentThreads, threads); //expected to be called one at the time
taskCount.addAndGet(1);
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);
assertTrue(taskCount.get() == TOTAL_TASKS);
}
/**
* It simply processes a task increasing first the concurrentThreads count
*
* @param concurrentThreads Counter for threads processing tasks
* @param maxThreads Maximum number of threads that are expected to be used for processing tasks
*/
private void processTask(AtomicInteger concurrentThreads, int maxThreads) {
int currentConcurrentThreads = concurrentThreads.addAndGet(1);
if (currentConcurrentThreads > maxThreads) {
throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);
}
// actual processing would go here
concurrentThreads.decrementAndGet();
}
}
应该只有一个线程用于处理任务,因为 ForkJoinPool 有 parallelism=1
和 java.util.concurrent.ForkJoinPool.common.parallelism=1
。因此,这两个测试都应该通过,但 testParallelStreamWithParallelism1 失败并显示:
java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2
似乎设置 java.util.concurrent.ForkJoinPool.common.parallelism=1 没有按预期工作,同时处理了超过 1 个并发任务。
有什么想法吗?
最佳答案
Fork/Join 池的并行度设置决定了池工作线程的数量,但是由于调用者线程,例如主线程也将处理作业,使用公共(public)池时总会有一个线程。这就是为什么 default setting of the common pool is “number of cores minus one”获得等于核心数的实际工作线程数。
对于自定义的 Fork/Join 池,流操作的调用者线程已经是池的工作线程,因此,利用它来处理作业不会增加实际工作线程数。
必须强调的是,Stream 实现和 Fork/Join 池之间的交互是完全未指定的,因为流在幕后使用 Fork/Join 框架是一个实现细节。不保证更改默认池的属性对流有任何影响,也不保证从自定义 Fork/Join 池的任务中调用流操作将使用该自定义池。
关于Java parallelStream 不使用预期的线程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42515202/
这个问题已经有答案了: How do I write a correct micro-benchmark in Java? (11 个回答) 已关闭 5 年前。 我正在对 Java 聚合操作进行一些性
我发现我对并行流提供的一致性保证略有挣扎: 1. myList.parallelStream().map(mymapper).forEach(myFn) 2. // Is myFn guarantee
我有一个方法可以并行计算列表中数字的平方并将它们相加: public static double sumSquared(List values) { return values
这个问题已经有答案了: How do Java 8 parallel streams behave on a thrown exception? (1 个回答) 已关闭 4 年前。 我正在研究java
通常当使用 Java 8 的 parallelStream() 时,结果是通过默认的、通用的 fork-join 池(即 ForkJoinPool.commonPool())执行。 这显然是不可取的,
Java 8 parallelStream 似乎使用了比系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 指定的线程更多的线程。这些单
我有以下代码,有时它的行为不确定。例如,我在那里传递了 3 个事件,而输出只有两个!你能解释一下这种行为的原因吗? public List getEventResponse(final List ev
Java8并行流(parallelStream)注意点 在最初使用并行流的时候,查询列表会偶尔性报空指针异常,这令我非常纳闷 代码是这样的: ?
使用示例可能更容易解释我想要做的事情。假设我必须遵循两个数组: int firstArray[] = {1, 2, 3, 4, 5}; int secArray[] = {1, 2, 3, 4, 5}
我正在尝试使用 chronicleMap.parallelStream: myChronicleMap.entrySet().parallelStream().forEach((entry) -> {
我正在使用parallelStream并行上传一些文件,有些是大文件,有些是小文件。我注意到并非所有 worker 都被使用。 一开始一切都运行良好,所有线程都被使用(我将并行度选项设置为 16)。然
这个问题在这里已经有了答案: How can I turn a List of Lists into a List in Java 8? (12 个答案) 关闭 8 年前。 我想将并行流中的列表结果
我有以下代码 public void addNames(){ List names = new ArrayList names.parallelStream().foreach(name-
如果输入大小太小,库 automatically serializes the execution of the maps in the stream ,但这种自动化没有也不能考虑 map 操作的繁重
我创建了一个并行度为 25 的自定义 ForkJoinPool。 customForkJoinPool = new ForkJoinPool(25); 我有一个包含 700 个文件名的列表,我使用这样
我在 Internet 上看到了很多示例,为了使用流 API 来执行并行操作,只需像这样调用 .parallelStream() 方法: mySet .parallelStream()
考虑这个(完全人为的)Java 代码: final List s = Arrays.asList(1, 2, 3); final int[] a = new int[1]; a[0] = 100; s
谁能告诉我为什么会这样,这是预期的行为还是错误 List a = Arrays.asList(1,1,3,3); a.parallelStream().filter(Objects::nonNull)
方法一 通常,非常快,并且效果很好。 public static int loops = 500; private static ExecutorService customPool = Execut
默认情况下,parallelStream 内的 commonPool 大小应为 cpu_cores - 1。 但是,在我的应用程序中,它始终大于硬件 cpu_cores。 VisualVM 截图: 很
我是一名优秀的程序员,十分优秀!