gpt4 book ai didi

java - Java中的嵌套并行流

转载 作者:行者123 更新时间:2023-12-01 14:20:43 27 4
gpt4 key购买 nike

我想了解 Java 中嵌套流之间的排序约束。
示例 1:

public static void main(String[] args) {
IntStream.range(0, 10).forEach(i -> {
System.out.println(i);
IntStream.range(0, 10).forEach(j -> {
System.out.println(" " + i + " " + j);
});
});
}
此代码确定性地执行,因此内部循环运行 forEach在每个 j在外循环运行自己的 forEach 之前在下一个 i :
0
0 0
0 1
0 2
0 3
0 4
0 5
0 6
0 7
0 8
0 9
1
1 0
1 1
1 2
1 3
1 4
1 5
1 6
1 7
1 8
1 9
2
2 0
2 1
2 2
2 3
...
示例 2:
public static void main(String[] args) {
IntStream.range(0, 10).parallel().forEach(i -> {
System.out.println(i);
IntStream.range(0, 10).parallel().forEach(j -> {
System.out.println(" " + i + " " + j);
});
});
}
如果流是 parallel()在第二个示例中,我可以想象内部工作人员在等待线程在外部工作队列中可用时阻塞,因为外部工作队列线程必须在内部流完成时阻塞,并且只有默认线程池线程数有限。但是,死锁似乎没有发生:
6
5
8
8 6
0
1
6 2
7
1 6
8 5
7 6
8 8
2
0 6
0 2
0 8
5 2
5 4
5 6
0 5
2 6
7 2
7 5
7 8
6 4
8 9
1 5
...
两个流共享相同的默认线程池,但它们生成不同的工作单元。每个外部工作单元只能在该外部工作单元的所有内部单元完成后才能完成,因为每个并行流的末尾都有一个完成障碍。
这些内部和外部流之间的协调如何跨工作线程共享池进行管理,而不会出现任何形式的死锁?

最佳答案

并行流后面的线程池是公共(public)池,可以通过 ForkJoinPool.commonPool() 获得.它通常使用 NumberOfProcessors - 1 个 worker 。为了解决您所描述的依赖关系,如果(某些)当前工作人员被阻塞并且可能出现死锁,它能够动态创建额外的工作人员。
但是,这不是您的情况的答案。ForkJoinPool 中的任务有两个重要的功能:

  • 他们可以创建子任务并将当前任务拆分成更小的部分( fork )。
  • 他们可以等待子任务(加入)。

  • 当一个线程执行这样一个任务 A 并加入一个子任务 B 时,它不仅等待子任务完成执行,而且同时执行另一个任务 C。当 C 完成时,线程回到 A 并检查 B 是否完成。请注意,B 和 C 可以(并且很可能是)相同的任务。如果 B 完成,则 A 已成功等待/加入它(非阻塞!)。查看 this如果前面的解释不清楚,请指导。
    现在,当您使用并行流时,流的范围会递归地拆分为任务,直到任务变得非常小,以至于它们可以更有效地按顺序执行。这些任务被放入公共(public)池中的工作队列(每个工作人员有一个)。那么, IntStream.range(0, 100).parallel().forEach确实是递归地分割范围,直到它不再值得。每个最终任务,或者更确切地说是一堆迭代,可以使用 forEach 中提供的代码顺序执行.此时,公共(public)池中的工作人员可以执行这些任务,直到所有任务都完成并且流可以返回。请注意,调用线程通过加入子任务来帮助执行!
    现在,在您的情况下,这些任务中的每一个都使用并行流本身。程序相同;将其拆分为较小的任务并将这些任务放入公共(public)池中的工作队列中。来自 ForkJoinPool的观点这些只是已经存在的任务之上的额外任务。工作人员只是继续执行/加入任务,直到一切都完成并且外部流可以返回。
    这就是您在输出中看到的内容:没有确定性的行为,没有固定的顺序。也不会发生死锁,因为在给定的用例中不会有阻塞线程。
    您可以使用以下代码检查说明:
        public static void main(String[] args) {
    IntStream.range(0, 10).parallel().forEach(i -> {
    IntStream.range(0, 10).parallel().forEach(j -> {
    for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
    System.out.printf("%d %d %s\n", i, j, Thread.currentThread().getName());
    for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
    });
    });
    }
    您应该注意到主线程参与了内部迭代的执行,因此它没有(!)阻塞。普通的泳池 worker 只是一个接一个地挑选任务,直到全部完成。

    关于java - Java中的嵌套并行流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62670334/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com