gpt4 book ai didi

java - 使用并发类并行处理目录中的文件

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:56:30 25 4
gpt4 key购买 nike

我正在尝试弄清楚如何使用 java.util.concurrent 包中的类型来并行处理目录中的所有文件。

我熟悉 Python 中的 multiprocessing 包,它使用起来非常简单,所以理想情况下我正在寻找类似的东西:

public interface FictionalFunctor<T>{
void handle(T arg);
}

public class FictionalThreadPool {
public FictionalThreadPool(int threadCount){
...
}
public <T> FictionalThreadPoolMapResult<T> map(FictionalFunctor<T> functor, List<T> args){
// Executes the given functor on each and every arg from args in parallel. Returns, when
// all the parallel branches return.
// FictionalThreadPoolMapResult allows to abort the whole mapping process, at the least.
}
}

dir = getDirectoryToProcess();
pool = new FictionalThreadPool(10); // 10 threads in the pool
pool.map(new FictionalFunctor<File>(){
@Override
public void handle(File file){
// process the file
}
}, dir.listFiles());

我感觉 java.util.concurrent 中的类型允许我做类似的事情,但我完全不知道从哪里开始。

有什么想法吗?

谢谢。

编辑 1

按照答案中给出的建议,我写了这样的东西:

public void processAllFiles() throws IOException {
ExecutorService exec = Executors.newFixedThreadPool(6);
BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(5); // Figured we can keep the contents of 6 files simultaneously.
exec.submit(new MyCoordinator(exec, tasks));
for (File file : dir.listFiles(getMyFilter()) {
try {
tasks.add(new MyTask(file));
} catch (IOException exc) {
System.err.println(String.format("Failed to read %s - %s", file.getName(), exc.getMessage()));
}
}
}

public class MyTask implements Runnable {
private final byte[] m_buffer;
private final String m_name;

public MyTask(File file) throws IOException {
m_name = file.getName();
m_buffer = Files.toByteArray(file);
}

@Override
public void run() {
// Process the file contents
}
}

private class MyCoordinator implements Runnable {
private final ExecutorService m_exec;
private final BlockingQueue<Runnable> m_tasks;

public MyCoordinator(ExecutorService exec, BlockingQueue<Runnable> tasks) {
m_exec = exec;
m_tasks = tasks;
}

@Override
public void run() {
while (true) {
Runnable task = m_tasks.remove();
m_exec.submit(task);
}
}
}

认为代码的工作方式是:

  1. 文件一个接一个地读。
  2. 文件内容保存在专用的 MyTask 实例中。
  3. 一个容量为 5 的阻塞队列来容纳任务。我依靠服务器一次最多保留 6 个文件内容的能力 - 5 个在队列中,另一个完全初始化的任务等待进入队列。
  4. 一个特殊的 MyCoordinator 任务从队列中获取文件任务并将它们分派(dispatch)到同一个池中。

好的,所以有一个错误 - 可以创建超过 6 个任务。有些将被提交,即使所有池线程都很忙。我打算稍后再解决。

问题是它根本不起作用。 MyCoordinator 线程在第一次删除时阻塞 - 这很好。但它永远不会解除阻塞,即使新任务被放入队列中也是如此。谁能告诉我我做错了什么?

最佳答案

您要找的线程池是ExecutorService类(class)。您可以使用 newFixedThreadPool 创建一个固定大小的线程池。这使您可以轻松实现生产者-消费者模式,池为您封装了所有队列和工作程序功能:

ExecutorService exec = Executors.newFixedThreadPool(10);

然后您可以提交类型为Runnable(或者Callable,如果您还想获得结果)的对象形式的任务:

class ThreadTask implements Runnable {
public void run() {
// task code
}
}

...

exec.submit(new ThreadTask());
// alternatively, using an anonymous type
exec.submit(new Runnable() {
public void run() {
// task code
}
});

关于并行处理多个文件的重要建议:如果您有一个单独的机械磁盘来保存文件,那么使用一个线程一个一个地读取它们并提交每个文件是明智的文件到线程池任务如上,进行处理。不要并行进行实际读取,因为这会降低性能。

关于java - 使用并发类并行处理目录中的文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11693235/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com