
Java multithreading with CompletableFuture runs slower

Reposted. Author: 行者123. Last updated: 2023-12-02 09:04:14

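I am trying to write code that counts the files of a given type on my computer. I tested a single-threaded solution and a multithreaded asynchronous solution, and the single-threaded one appears to run faster. Is there anything wrong with my code? If not, why doesn't it run faster?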

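The code is below. AsynchFileCounter is the asynchronous version, ExtensionFilter is a file filter that accepts only directories and files with the specified extension, and BasicFileCounter is the single-threaded version.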

public class AsynchFileCounter {
    public int countFiles(String path, String extension) throws InterruptedException, ExecutionException {
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        return countFilesRecursive(f, filter);
    }

    private int countFilesRecursive(File f, ExtensionFilter filter) throws InterruptedException, ExecutionException {
        return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
            .thenApplyAsync(files -> {
                int count = 0;
                for (File file : files) {
                    if (file.isFile())
                        count++;
                    else
                        try {
                            count += countFilesRecursive(file, filter);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                }
                return count;
            }).get();
    }
}

public class ExtensionFilter implements FileFilter {
    private String extension;
    private boolean allowDirectories;

    public ExtensionFilter(String extension, boolean allowDirectories) {
        if (extension.startsWith("."))
            extension = extension.substring(1);
        this.extension = extension;
        this.allowDirectories = allowDirectories;
    }

    @Override
    public boolean accept(File pathname) {
        if (pathname.isFile() && pathname.getName().endsWith("." + extension))
            return true;
        if (allowDirectories) {
            if (pathname.isDirectory())
                return true;
        }
        return false;
    }
}

public class BasicFileCounter {
    public int countFiles(String path, String extension) {
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        return countFilesRecursive(f, filter);
    }

    private int countFilesRecursive(File f, ExtensionFilter filter) {
        int count = 0;
        File[] ar = f.listFiles(filter);
        for (File file : ar) {
            if (file.isFile())
                count++;
            else
                count += countFilesRecursive(file, filter);
        }
        return count;
    }
}

Best answer

You have to spawn multiple asynchronous jobs and must not wait for their completion immediately:

// Requires java.io.File, java.io.FileFilter and java.util.concurrent.CompletableFuture
public int countFiles(String path, String extension) {
    ExtensionFilter filter = new ExtensionFilter(extension, true);
    File f = new File(path);
    // Block only once, at the very top of the recursion
    return countFilesRecursive(f, filter).join();
}

private CompletableFuture<Integer> countFilesRecursive(File f, FileFilter filter) {
    return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
        .thenCompose(files -> {
            // listFiles returns null when f cannot be listed
            if (files == null) return CompletableFuture.completedFuture(0);
            int count = 0;
            // fileCount carries the local count; all accumulates the sub-directory sums on top of it
            CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all = fileCount;
            for (File file : files) {
                if (file.isFile())
                    count++;
                else
                    all = countFilesRecursive(file, filter).thenCombine(all, Integer::sum);
            }
            fileCount.complete(count);
            return all;
        });
}

Note that File.listFiles may return null, which happens when the path does not denote a directory or an I/O error occurs.
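As a minimal sketch of that null check in isolation: the class and method names below are hypothetical and only illustrate guarding the result of listFiles before iterating over it.

```java
import java.io.File;

class NullSafeListing {
    // Returns the number of entries in dir, or 0 when listFiles() yields null
    // (dir is not a directory, or an I/O error occurred).
    static int entryCount(File dir) {
        File[] entries = dir.listFiles();
        return entries == null ? 0 : entries.length;
    }

    public static void main(String[] args) {
        // Hypothetical path that is assumed not to exist
        System.out.println(entryCount(new File("no-such-directory-hypothetical")));
    }
}
```

The BasicFileCounter from the question would throw a NullPointerException in its for loop on such a path, because it iterates over the array without this check.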

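The rewritten countFilesRecursive method counts all files in a directory immediately but launches a new asynchronous job for each sub-directory. The results of the sub-directory jobs are combined via thenCombine to sum them up. To simplify this, we create another CompletableFuture, fileCount, to represent the locally counted files. thenCompose returns a future that will be completed with the result of the future returned by the specified function, so the caller can use join() to wait for the final result of the entire operation.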
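The two combinators can be tried in isolation. The following is a small sketch, not part of the original answer: asyncCount is a hypothetical stand-in for an asynchronous sub-directory count, and the numbers are arbitrary.

```java
import java.util.concurrent.CompletableFuture;

class ComposeCombineDemo {
    // Hypothetical stand-in for counting one sub-directory asynchronously
    static CompletableFuture<Integer> asyncCount(int n) {
        return CompletableFuture.supplyAsync(() -> n);
    }

    // Same accumulation pattern as in the answer: start from a future for the
    // local count, chain every sub-result onto it with thenCombine, then
    // complete the local future once the loop is done.
    static int total() {
        CompletableFuture<Integer> local = new CompletableFuture<>();
        CompletableFuture<Integer> all = local;
        for (int n : new int[] {2, 3, 5}) {
            all = asyncCount(n).thenCombine(all, Integer::sum);
        }
        local.complete(10);
        return all.join(); // 2 + 3 + 5 + 10
    }

    // thenCompose flattens the future returned by the function, so the result
    // is a CompletableFuture<Integer> rather than a nested future.
    static int flattened() {
        return CompletableFuture.supplyAsync(() -> 4)
            .thenCompose(n -> asyncCount(n * 2))
            .join();
    }
}
```

Nothing blocks until the single join() at the end, which is the difference from calling get() inside every recursion step.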

For I/O operations, it may help to use a different thread pool, because the default ForkJoinPool is configured to utilize the CPU cores rather than the I/O bandwidth:

public int countFiles(String path, String extension) {
    ExecutorService es = Executors.newFixedThreadPool(30);
    try {
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        return countFilesRecursive(f, filter, es).join();
    } finally {
        // Release the pool threads even if join() throws
        es.shutdown();
    }
}

private CompletableFuture<Integer> countFilesRecursive(File f, FileFilter filter, Executor e) {
    // The directory listing, which is the I/O part, runs on the supplied executor
    return CompletableFuture.supplyAsync(() -> f.listFiles(filter), e)
        .thenCompose(files -> {
            if (files == null) return CompletableFuture.completedFuture(0);
            int count = 0;
            CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all = fileCount;
            for (File file : files) {
                if (file.isFile())
                    count++;
                else
                    all = countFilesRecursive(file, filter, e).thenCombine(all, Integer::sum);
            }
            fileCount.complete(count);
            return all;
        });
}

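There is no best number of threads; it depends on the actual execution environment and is subject to measuring and tuning. When the application is supposed to run in different environments, this should be a configurable parameter.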
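One possible way to make the thread count configurable is a JVM system property. This is only a sketch: the property name filecounter.threads is an assumption made up for illustration, and the default of 30 is simply the value used in the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolConfig {
    // Hypothetical property name, set with e.g. -Dfilecounter.threads=16
    static final String PROPERTY = "filecounter.threads";
    static final int DEFAULT_THREADS = 30;

    // Reads the configured thread count, falling back to the default when the
    // property is missing, not a number, or smaller than 1.
    static int threadCount() {
        Integer configured = Integer.getInteger(PROPERTY);
        return (configured == null || configured < 1) ? DEFAULT_THREADS : configured;
    }

    static ExecutorService newPool() {
        return Executors.newFixedThreadPool(threadCount());
    }
}
```

With this in place, countFiles could obtain its executor from PoolConfig.newPool() instead of hard-coding the size, so different environments can be tuned without recompiling.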

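But consider that you might be using the wrong tool for the job. An alternative is Fork/Join tasks, which support interacting with the thread pool to determine its current saturation, so once all worker threads are busy, a task scans locally with an ordinary recursion instead of submitting more asynchronous jobs: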

// Requires java.io.File, java.io.FileFilter, java.util.ArrayList, java.util.List,
// java.util.concurrent.ForkJoinPool and java.util.concurrent.RecursiveTask
public int countFiles(String path, String extension) {
    ExtensionFilter filter = new ExtensionFilter(extension, true);
    File f = new File(path);
    return POOL.invoke(new FileCountTask(f, filter));
}

private static final int TARGET_SURPLUS = 3, TARGET_PARALLELISM = 30;

private static final ForkJoinPool POOL = new ForkJoinPool(TARGET_PARALLELISM);

static final class FileCountTask extends RecursiveTask<Integer> {
    private final File path;
    private final FileFilter filter;

    public FileCountTask(File file, FileFilter ff) {
        this.path = file;
        this.filter = ff;
    }

    @Override
    protected Integer compute() {
        return scan(path, filter);
    }

    private static int scan(File directory, FileFilter filter) {
        File[] fileList = directory.listFiles(filter);
        if (fileList == null || fileList.length == 0) return 0;
        List<FileCountTask> recursiveTasks = new ArrayList<>();
        int count = 0;
        for (File file : fileList) {
            if (file.isFile()) count++;
            else {
                // Fork only while this worker has few surplus queued tasks
                if (getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                    FileCountTask task = new FileCountTask(file, filter);
                    recursiveTasks.add(task);
                    task.fork();
                }
                // Otherwise scan the sub-directory in the current thread
                else count += scan(file, filter);
            }
        }

        // Reclaim tasks that no other worker has picked up yet and run them locally
        for (int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileCountTask task = recursiveTasks.get(ix);
            if (task.tryUnfork()) task.complete(scan(task.path, task.filter));
        }

        // Collect the results of all forked tasks
        for (FileCountTask task : recursiveTasks) {
            count += task.join();
        }
        return count;
    }
}
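Before measuring any of these variants, it helps to confirm that they return the right count on a directory tree with known contents. The following is a self-contained sketch of such a check, under several assumptions: CountTask is a deliberately simplified task that forks one subtask per sub-directory without the surplus check, and the class name, file names, and pool size of 4 are all made up for illustration. It also leaves its temporary directory behind.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinCountCheck {
    // Simplified task: always forks for sub-directories (no saturation check)
    static final class CountTask extends RecursiveTask<Integer> {
        private final File dir;
        private final String suffix;

        CountTask(File dir, String suffix) {
            this.dir = dir;
            this.suffix = suffix;
        }

        @Override
        protected Integer compute() {
            File[] entries = dir.listFiles();
            if (entries == null) return 0;
            int count = 0;
            List<CountTask> forked = new ArrayList<>();
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    CountTask task = new CountTask(entry, suffix);
                    task.fork();
                    forked.add(task);
                } else if (entry.getName().endsWith(suffix)) {
                    count++;
                }
            }
            for (CountTask task : forked) count += task.join();
            return count;
        }
    }

    // Builds a small temporary tree with three .txt files and one .log file,
    // then counts the .txt files with the task above.
    static int countInTempTree() {
        try {
            Path root = Files.createTempDirectory("fc-demo");
            Path sub = Files.createDirectory(root.resolve("sub"));
            Files.createFile(root.resolve("a.txt"));
            Files.createFile(root.resolve("b.log"));
            Files.createFile(sub.resolve("c.txt"));
            Files.createFile(sub.resolve("d.txt"));
            ForkJoinPool pool = new ForkJoinPool(4);
            try {
                return pool.invoke(new CountTask(root.toFile(), ".txt"));
            } finally {
                pool.shutdown();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The same temporary tree could be passed to the single-threaded and CompletableFuture versions to check that all of them agree before comparing their run times.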

Regarding "Java multithreading with CompletableFuture runs slower", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/59951954/
