gpt4 book ai didi

java - ForkJoinPool - 为什么程序抛出 OutOfMemoryError?

转载 作者:行者123 更新时间:2023-12-04 14:22:41 27 4
gpt4 key购买 nike

我想在 Java 8 中尝试 ForkJoinPool,所以我编写了一个小程序来搜索给定目录中名称包含特定关键字的所有文件。

程序 :

public class DirectoryService {

public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}

}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}

@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
if(mainDirectory.canRead()) {
File[] fileList = mainDirectory.listFiles();
for(File file : fileList) {
System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
if(file.isDirectory()) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();
} else {
if (file.getName().contains("hello")) {
System.out.println(file.getName());
filetedFileList.add(file.getName());
}
}
}
}

for(FileSearchRecursiveTask task : recursiveTasks) {
filetedFileList.addAll(task.join());
}

}
return filetedFileList;

}
}

当目录没有太多子目录和文件时,这个程序工作正常,但如果它真的很大,那么它会抛出 OutOfMemoryError。

我的理解是最大线程数(包括补偿线程)是有界的,那么为什么会出现这个错误呢?我的程序中是否缺少任何内容?
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

最佳答案

你不应该 fork 出超出所有认知的新任务。基本上,只要另一个工作线程有机会获得 fork 的工作并在本地进行评估,您就应该 fork 。然后,一旦你 fork 了一个任务,不要打电话 join()就在之后。虽然底层框架将启动补偿线程以确保您的作业继续进行,而不是让所有线程被阻塞等待子任务,但这将创建可能超出系统能力的大量线程。

这是您的代码的修订版本:

public class DirectoryService {

public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
List<String> files = task.invoke();
System.out.println("Total no of files with hello " + files.size());
}

}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private static final int TARGET_SURPLUS = 3;
private File path;
public FileSearchRecursiveTask(File file) {
this.path = file;
}

@Override
protected List<String> compute() {
File directory = path;
if(directory.isDirectory() && directory.canRead()) {
System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
return scan(directory);
}
return Collections.emptyList();
}

private List<String> scan(File directory)
{
File[] fileList = directory.listFiles();
if(fileList == null || fileList.length == 0) return Collections.emptyList();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
List<String> filteredFileList = new ArrayList<>();
for(File file: fileList) {
System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
if(file.isDirectory())
{
if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
{
FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
recursiveTasks.add(task);
task.fork();
}
else filteredFileList.addAll(scan(file));
}
else if(file.getName().contains("hello")) {
filteredFileList.add(file.getAbsolutePath());
}
}

for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
FileSearchRecursiveTask task = recursiveTasks.get(ix);
if(task.tryUnfork()) task.complete(scan(task.path));
}

for(FileSearchRecursiveTask task: recursiveTasks) {
filteredFileList.addAll(task.join());
}
return filteredFileList;
}
}

进行处理的方法已被分解为接收目录作为参数的方法,因此我们可以在本地将其用于任意目录,而不必与 FileSearchRecursiveTask 相关联。实例。

然后,该方法使用 getSurplusQueuedTaskCount() 以确定未被其他工作线程接收的本地排队任务的数量。确保有一些有助于工作平衡。但如果这个数量超过阈值,则处理将在本地完成,而不会 fork 更多作业。

在本地处理之后,它遍历任务并使用 tryUnfork() 识别未被其他工作线程窃取的作业并在本地处理它们。向后迭代以从最年轻的工作开始,增加了找到一些工作的机会。

之后才 join() s 与所有现在由另一个工作线程完成或当前处理的子作业。

请注意,我更改了启动代码以使用默认池。这使用“CPU 核心数”减去一个工作线程,再加上启动线程,即 main本例中的线程。

关于java - ForkJoinPool - 为什么程序抛出 OutOfMemoryError?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51650782/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com