gpt4 book ai didi

java - 为什么在使用 forEachOrdered 的静态初始化程序 block 中使用 lambda 进行并行流处理会产生死锁,而使用 forEach 则不会?

转载 作者:行者123 更新时间:2023-11-30 10:09:50 28 4
gpt4 key购买 nike

在玩 Java 并行流时,当一些并行操作在静态初始化 block 中完成时,我遇到了死锁。

当使用顺序流时,一切正常:

import java.util.Arrays;
public class Example1 {
static {
// displays the numbers from 1 to 10 ordered => no thread issue
Arrays.asList(1,2,3,4,5,6,7,8,9,10)
.forEach(s->System.out.println(s));
}
public static final void main(String[] args) {}
}

并行处理流时,每项工作(数字无序显示):

import java.util.Arrays;
public class Example2 {
static {
// displays the numbers from 1 to 10 unordered => no thread issue
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEach(s->System.out.println(s));
}
public static final void main(String[] args) {}
}

但是,在使用forEachOrdered()处理Stream时,出现了死锁(我想这与主线程与ForkJoinPool管理的交互有关):

import java.util.Arrays;
public class Example3 {
static {
// hangs forever (deadlock between the main thread which loads the class and the underlying ForkJoinPool which join several tasks)
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEachOrdered(s->System.out.println(s));
}
public static final void main(String[] args) {}
}

但是当在单独的线程中生成流处理时,一切顺利:

import java.util.Arrays;
public class Example4 {
static {
// displays the numbers from 1 to 10 ordered => no thread issue
new Thread(()->
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEachOrdered(s->System.out.println(s))
).start();
}
public static final void main(String[] args) {}
}

从我从线程转储中看到的,主线程正在等待 .forEachOrdered() 中使用的 ForkJoinPool 完成他的工作,但池中的第一个工作线程是阻塞等待某些东西(很可能被 main 线程阻塞)。

我真的很想了解为什么在某些情况下会发生死锁,而在其他情况下不会。这显然不仅仅是因为使用了静态初始化 block 、并行流和 lambda,因为 Example2Example3Example4 使用了这三个概念,但只有 Example3 导致死锁。

虽然这个问题看起来像是 Why does parallel stream with lambda in static initializer cause a deadlock? 的重复, 它不是。我的问题超出了链接问题,因为它提供了 Example2,我们有静态初始化 block 、并行流和 lambda,但没有死锁。这就是为什么问题标题包含“可能导致死锁但不一定”。

最佳答案

这种死锁行为有两个根本原因:

  1. main 线程正在等待另一个线程(比方说 OtherThread)完成它的工作(在 Example3 中,OtherThread 是其中之一forEachOrdered() 操作使用的 ForkJoinPool 的线程)
  2. OtherThread 使用 Lambda 表达式,该表达式将由 main 线程定义,但稍后(回想一下:Lambda 是在运行时创建的,而不是在编译时创建的)。在示例 3 中,此 Lambda 是 .forEachOrdered() 中的 Lambda。

让我们回顾一下示例并解释它们为什么会产生或不会产生死锁。

例子1

只有一个线程(main)执行以下操作:

  1. 处理静态初始化 block
  2. 对每个元素执行foreach
  3. 在处理第一个流元素时在运行时创建 lambda 表达式

由于只有一个线程,不会出现死锁。

例子2

为了更好的理解处理过程,我们可以改写为:

import java.util.Arrays;
public class Example2Instrumented {
static {
// displays the numbers from 1 to 10 unordered => no thread issue
System.out.println(Thread.currentThread().getName()+" : "+"static initializer");
Arrays.asList(1,2,3,4,5,6,7,8,9,10)
.parallelStream()
.forEach(s->System.out.println(Thread.currentThread().getName()+" : "+s));
}
public static final void main(String[] args) {}
}

这会产生以下结果:

main : static initializer
main : 7
main : 6
ForkJoinPool.commonPool-worker-2 : 9
ForkJoinPool.commonPool-worker-4 : 5
ForkJoinPool.commonPool-worker-9 : 3
ForkJoinPool.commonPool-worker-11 : 2
ForkJoinPool.commonPool-worker-2 : 10
ForkJoinPool.commonPool-worker-4 : 4
ForkJoinPool.commonPool-worker-9 : 1
ForkJoinPool.commonPool-worker-13 : 8

main 线程处理静态初始值设定项,然后在处理第一个元素时启动 forEach 并在运行时构建 lambda。其他流元素由 ForkJoinPool 中的工作线程处理。没有死锁,因为 main 线程处理了第一个元素并构建了 lambda。

例子3

我们可以在没有 Lambda 的情况下重写 Example3 来打破僵局:

import java.util.Arrays;
import java.util.function.Consumer;
public class Example3NoDeadlock {
static {
// displays the numbers from 1 to 10 ordered => no thread issue anymore
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEachOrdered(
new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out.println(t);
}});
}
public static final void main(String[] args) {}
}

由于 Consumer 类是在编译时构建的(与在运行时构建的 lambda 相反),这打破了死锁循环。这证明至少 lambda 参与了死锁。

为了更好地理解,我们可以按如下方式检测代码:

import java.util.Arrays;
import java.util.function.Consumer;
public class Example3Instrumented {
static {
System.out.println("static initializer");
// hangs forever (deadlock between the main thread which loads the class and the underlying ForkJoinPool which join several tasks)
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.peek(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out.println(Thread.currentThread().getName()+" "+t);
}})
.forEachOrdered(s->System.out.println(s));
}
public static final void main(String[] args) {}
}

这会产生以下输出:

main : static initializer
ForkJoinPool.commonPool-worker-6 1
ForkJoinPool.commonPool-worker-9 3
main 7
ForkJoinPool.commonPool-worker-4 2
ForkJoinPool.commonPool-worker-13 6
ForkJoinPool.commonPool-worker-11 8
ForkJoinPool.commonPool-worker-15 5
ForkJoinPool.commonPool-worker-2 9
ForkJoinPool.commonPool-worker-4 10
ForkJoinPool.commonPool-worker-9 4

main 线程处理静态初始值设定项,然后通过为流中的每个元素创建任务来开始处理 forEachOrdered(为了维护顺序,使用了复杂的基于树的算法,请参阅 ForEachOps.ForEachOrderedTask:创建任务,从代码中可以看出每个任务都在等待另一个任务完成运行)。所有任务都提交给 ForkJoinPool。我认为发生死锁是因为第一个任务由 ForkJoinPool 中的工作线程处理,并且该线程等待 main 线程构建 lambda。并且 main 线程已经开始处理它的 Task 并正在等待另一个工作线程完成它的 Task 来运行。因此陷入僵局。

例子4

在示例 4 中,我们生成了一个异步运行的新线程(即我们不等待结果)。这就是为什么 main 线程未锁定并且现在有时间在运行时构建 Lambda 的原因。

结论

要点是:如果您混合使用静态初始化器、线程和 lambda,您应该真正理解这些概念是如何实现的,否则您可能会遇到死锁。

关于java - 为什么在使用 forEachOrdered 的静态初始化程序 block 中使用 lambda 进行并行流处理会产生死锁,而使用 forEach 则不会?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53055951/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com