gpt4 book ai didi

java - 在嵌套的 Java 8 并行流操作中使用信号量可能会死锁。这是一个错误吗?

转载 作者:IT老高 更新时间:2023-10-28 20:43:43 28 4
gpt4 key购买 nike

考虑以下情况:我们正在使用 Java 8 并行流来执行并行 forEach 循环,例如,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})

并行线程的数量由系统属性“java.util.concurrent.ForkJoinPool.common.parallelism”控制,通常等于处理器的数量。

现在假设我们希望限制特定工作的并行执行次数 - 例如因为那部分是内存密集型的,而且内存限制意味着并行执行的限制。

一种明显而优雅的限制并行执行的方法是使用信号量(建议 here),例如,以下代码将并行执行的数量限制为 5:

        final Semaphore concurrentExecutions = new Semaphore(5);
IntStream.range(0,20).parallel().forEach(i -> {

concurrentExecutions.acquireUninterruptibly();

try {
/* WORK DONE HERE */
}
finally {
concurrentExecutions.release();
}
});

这很好用!

但是:在工作线程中使用任何其他并行流(在 /* WORK DONE HERE */ 处)可能会导致死锁

对我来说,这是一个意外的行为。

解释:由于 Java 流使用 ForkJoin 池,因此内部 forEach 正在 fork ,并且连接似乎永远在等待。但是,这种行为仍然是意料之外的。请注意,即使将 "java.util.concurrent.ForkJoinPool.common.parallelism" 设置为 1,并行流也可以工作。

另请注意,如果存在内部并行 forEach,则它可能不透明。

问题: 这种行为是否符合 Java 8 规范(在这种情况下,这意味着禁止在并行流 worker 中使用信号量)还是这是一个错误?

为方便起见:下面是一个完整的测试用例。这两个 boolean 值的任何组合都有效,但“true, true”除外,这会导致死锁。

澄清:为了明确一点,我强调一个方面:信号量的acquire不会发生死锁。请注意,代码由

  1. 获取信号量
  2. 运行一些代码
  3. 释放信号量

如果那段代码正在使用另一个并行流,则死锁发生在 2。然后死锁发生在该 OTHER 流中。因此,似乎不允许同时使用嵌套的并行流和阻塞操作(如信号量)!

请注意,据记载,并行流使用 ForkJoinPool 并且 ForkJoinPool 和 Semaphore 属于同一个包 - java.util.concurrent(因此人们会期望它们可以很好地互操作)。

/*
* (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de.
*
* Created on 03.05.2014
*/
package net.finmath.experiments.concurrency;

import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

/**
* This is a test of Java 8 parallel streams.
*
* The idea behind this code is that the Semaphore concurrentExecutions
* should limit the parallel executions of the outer forEach (which is an
* <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
* the parallel executions of the outer forEach should be limited due to a
* memory constrain).
*
* Inside the execution block of the outer forEach we use another parallel stream
* to create an inner forEach. The number of concurrent
* executions of the inner forEach is not limited by us (it is however limited by a
* system property "java.util.concurrent.ForkJoinPool.common.parallelism").
*
* Problem: If the semaphore is used AND the inner forEach is active, then
* the execution will be DEADLOCKED.
*
* Note: A practical application is the implementation of the parallel
* LevenbergMarquardt optimizer in
* {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
* In one application the number of tasks in the outer and inner loop is very large (>1000)
* and due to memory limitation the outer loop should be limited to a small (5) number
* of concurrent executions.
*
* @author Christian Fries
*/
public class ForkJoinPoolTest {

public static void main(String[] args) {

// Any combination of the booleans works, except (true,true)
final boolean isUseSemaphore = true;
final boolean isUseInnerStream = true;

final int numberOfTasksInOuterLoop = 20; // In real applications this can be a large number (e.g. > 1000).
final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
final int concurrentExecusionsLimitInOuterLoop = 5;
final int concurrentExecutionsLimitForStreams = 10;

final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

if(isUseSemaphore) {
concurrentExecutions.acquireUninterruptibly();
}

try {
System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

if(isUseInnerStream) {
runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
}
else {
try {
Thread.sleep(10*numberOfTasksInInnerLoop);
} catch (Exception e) {
}
}
}
finally {
if(isUseSemaphore) {
concurrentExecutions.release();
}
}
});

System.out.println("D O N E");
}

/**
* Runs code in a parallel forEach using streams.
*
* @param numberOfTasksInInnerLoop Number of tasks to execute.
*/
private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
try {
Thread.sleep(10);
} catch (Exception e) {
}
});
}
}

最佳答案

每当您将问题分解为任务时,这些任务可能会在其他任务上被阻塞,并尝试在有限的线程池中执行它们,您就会面临池引发的死锁 .请参阅 Java 并发实践 8.1。

这无疑是一个错误——在您的代码中。您正在用将阻塞等待同一池中其他任务的结果的任务填充 FJ 池。有时你很幸运,事情不会陷入僵局(就像并非所有的锁顺序错误都会一直导致僵局一样),但从根本上说,你在这里是在一些非常薄的冰上滑冰。

关于java - 在嵌套的 Java 8 并行流操作中使用信号量可能会死锁。这是一个错误吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23442183/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com