gpt4 book ai didi

Java 并行流内部结构

转载 作者:塔克拉玛干 更新时间:2023-11-01 23:02:11 26 4
gpt4 key购买 nike

我注意到,取决于 doSth() 方法的实现(如果线程 hibernate 一段恒定或随机的时间),并行流的执行方式不同。

例子:

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static java.lang.System.out;

public class AtomicInt {

public static void main(String[] args) throws ExecutionException, InterruptedException {
out.println("Result: " + count());
}

public static int count() throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(10);

AtomicInteger counter = new AtomicInteger(0);

forkJoinPool.submit(() -> IntStream
.rangeClosed(1, 20)
.parallel()
.map(i -> doSth(counter))
.forEach(i -> out.println(">>>forEach: " + Thread.currentThread().getName() + " value: " + i))
).get();

return counter.get();
}

private static int doSth(AtomicInteger counter) {
try {
out.println(">>doSth1: " + Thread.currentThread().getName());
Thread.sleep(100 + new Random().nextInt(1000));
// Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

int counterValue = counter.incrementAndGet();
out.println(">>doSth2: " + Thread.currentThread().getName() + " value: " + counterValue);

return counterValue;
}
}

每个数字正在按顺序处理:

>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-10
>>doSth2: ForkJoinPool-1-worker-8 value: 1
>>>forEach: ForkJoinPool-1-worker-8 value: 1
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-15 value: 2
>>>forEach: ForkJoinPool-1-worker-15 value: 2
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-11 value: 3
>>>forEach: ForkJoinPool-1-worker-11 value: 3
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-2 value: 4
>>>forEach: ForkJoinPool-1-worker-2 value: 4
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-9 value: 5
>>>forEach: ForkJoinPool-1-worker-9 value: 5
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-11 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 6
>>doSth1: ForkJoinPool-1-worker-11
>>doSth2: ForkJoinPool-1-worker-1 value: 7
>>>forEach: ForkJoinPool-1-worker-1 value: 7
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 8
>>>forEach: ForkJoinPool-1-worker-15 value: 8
>>doSth1: ForkJoinPool-1-worker-15
>>doSth2: ForkJoinPool-1-worker-8 value: 9
>>>forEach: ForkJoinPool-1-worker-8 value: 9
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-13 value: 10
>>>forEach: ForkJoinPool-1-worker-13 value: 10
>>doSth1: ForkJoinPool-1-worker-13
>>doSth2: ForkJoinPool-1-worker-9 value: 11
>>>forEach: ForkJoinPool-1-worker-9 value: 11
>>doSth2: ForkJoinPool-1-worker-15 value: 12
>>>forEach: ForkJoinPool-1-worker-15 value: 12
>>doSth2: ForkJoinPool-1-worker-10 value: 13
>>>forEach: ForkJoinPool-1-worker-10 value: 13
>>doSth2: ForkJoinPool-1-worker-4 value: 14
>>>forEach: ForkJoinPool-1-worker-4 value: 14
>>doSth2: ForkJoinPool-1-worker-6 value: 15
>>>forEach: ForkJoinPool-1-worker-6 value: 15
>>doSth2: ForkJoinPool-1-worker-11 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 16
>>doSth2: ForkJoinPool-1-worker-2 value: 17
>>>forEach: ForkJoinPool-1-worker-2 value: 17
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-13 value: 18
>>doSth2: ForkJoinPool-1-worker-1 value: 19
>>>forEach: ForkJoinPool-1-worker-1 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 20
>>>forEach: ForkJoinPool-1-worker-8 value: 20
Result: 20

当我将 doSth() 方法更改为始终 hibernate 1 秒而不是随机 hibernate 时,结果将被乱序计算:

>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-2
>>doSth1: ForkJoinPool-1-worker-13
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-8
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth2: ForkJoinPool-1-worker-1 value: 1
>>doSth2: ForkJoinPool-1-worker-10 value: 2
>>doSth2: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-6 value: 3
>>>forEach: ForkJoinPool-1-worker-10 value: 2
>>>forEach: ForkJoinPool-1-worker-1 value: 1
>>doSth1: ForkJoinPool-1-worker-10
>>doSth1: ForkJoinPool-1-worker-6
>>doSth1: ForkJoinPool-1-worker-1
>>doSth2: ForkJoinPool-1-worker-15 value: 4
>>doSth2: ForkJoinPool-1-worker-9 value: 10
>>doSth2: ForkJoinPool-1-worker-8 value: 7
>>>forEach: ForkJoinPool-1-worker-8 value: 7
>>doSth1: ForkJoinPool-1-worker-8
>>doSth2: ForkJoinPool-1-worker-4 value: 9
>>>forEach: ForkJoinPool-1-worker-4 value: 9
>>doSth2: ForkJoinPool-1-worker-11 value: 8
>>doSth2: ForkJoinPool-1-worker-13 value: 6
>>doSth2: ForkJoinPool-1-worker-2 value: 5
>>>forEach: ForkJoinPool-1-worker-13 value: 6
>>>forEach: ForkJoinPool-1-worker-11 value: 8
>>doSth1: ForkJoinPool-1-worker-4
>>>forEach: ForkJoinPool-1-worker-9 value: 10
>>>forEach: ForkJoinPool-1-worker-15 value: 4
>>doSth1: ForkJoinPool-1-worker-9
>>doSth1: ForkJoinPool-1-worker-11
>>doSth1: ForkJoinPool-1-worker-13
>>>forEach: ForkJoinPool-1-worker-2 value: 5
>>doSth1: ForkJoinPool-1-worker-15
>>doSth1: ForkJoinPool-1-worker-2
>>doSth2: ForkJoinPool-1-worker-10 value: 12
>>>forEach: ForkJoinPool-1-worker-10 value: 12
>>doSth2: ForkJoinPool-1-worker-6 value: 11
>>doSth2: ForkJoinPool-1-worker-1 value: 13
>>>forEach: ForkJoinPool-1-worker-6 value: 11
>>>forEach: ForkJoinPool-1-worker-1 value: 13
>>doSth2: ForkJoinPool-1-worker-9 value: 15
>>doSth2: ForkJoinPool-1-worker-2 value: 20
>>>forEach: ForkJoinPool-1-worker-2 value: 20
>>doSth2: ForkJoinPool-1-worker-15 value: 19
>>>forEach: ForkJoinPool-1-worker-15 value: 19
>>doSth2: ForkJoinPool-1-worker-8 value: 14
>>doSth2: ForkJoinPool-1-worker-11 value: 17
>>doSth2: ForkJoinPool-1-worker-4 value: 16
>>doSth2: ForkJoinPool-1-worker-13 value: 18
>>>forEach: ForkJoinPool-1-worker-4 value: 16
>>>forEach: ForkJoinPool-1-worker-11 value: 17
>>>forEach: ForkJoinPool-1-worker-8 value: 14
>>>forEach: ForkJoinPool-1-worker-9 value: 15
>>>forEach: ForkJoinPool-1-worker-13 value: 18
Result: 20

这是巧合还是对这种行为有解释?

最佳答案

此时,您正在执行 sleep 语句,没有定义的顺序。虽然通过 IntStream.range 创建的流具有定义的遇到顺序,但您通过忽略实际的 int 值将操作变成无序操作.

产生可感知顺序的第一个 Action 是counter.incrementAndGet()。在此之前,哪个线程到达那个点以及它与哪个流元素相关联并不重要。此时它从 AtomicInteger 中获取它的编号。之后,仅执行两个使用该号码的附加操作,即使用该号码打印两条消息。对于这两个不同的结果,重要的是这三个操作,counter.incrementAndGet() 和打印这两条消息,是否被另一个线程拦截。

我们可以很容易地将这种情况简化为

AtomicInteger counter = new AtomicInteger();
ExecutorService es = Executors.newFixedThreadPool(20);
es.invokeAll(Collections.nCopies(20, () -> {
out.println("1st: " + Thread.currentThread().getName());
Thread.sleep(100 + new Random().nextInt(1000));
// Thread.sleep(1000);
int counterValue = counter.incrementAndGet();
out.println("2nd: " + Thread.currentThread().getName() + " value: " + counterValue);
out.println("3rd: " + Thread.currentThread().getName() + " value: " + counterValue);
return null;
}));
es.shutdown();

请注意,对于 invokeAll,根本没有定义顺序,但是,如前所述,这无关紧要。任务最迟在调用 incrementAndGet() 时获得分配的序列号。行为与流示例相同。


虽然我总是强调concurrent 并不意味着parallel,但由于未指定的执行时间和线程调度行为,仍然很有可能开始相同的短代码同时在没有后台 Activity 给 CPU 内核带来不可预测的工作负载时真正并行运行。

当所有线程并行运行时,它们同时命中了out.println的内部同步,只有一个线程可以继续执行,其他线程被放入队列。然后,synchronized不公平 性质开始发挥作用。任意线程将获胜,之后,任意线程将被放回调度。这会导致数字以随机顺序打印。

当您让线程 hibernate 一段随机时间时,它们不再完全并行运行,从而增加了在不同时间到达打印语句的机会,从而能够无竞争地执行它们。哪个线程首先到达这一点是随机的,但是由于它们在 sleep 后分配了编号,因此第一个到达这一点的线程将获得编号,依此类推。

关于Java 并行流内部结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47500239/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com