gpt4 book ai didi

java - 使用固定数量的线程处理大量多线程数据并允许异常

转载 作者:行者123 更新时间:2023-11-30 11:15:30 25 4
gpt4 key购买 nike

我正在通过主线程逐行处理大型文本文件 (5GB)。创建了一些其他线程来同时格式化这些行。

我已经使用 Runnable 类和 Semaphore 编写了一个解决方案,它控制运行的线程数量。不幸的是,Runnable 不提供返回值或抛出异常。如果在任何线程中抛出异常,我希望我的整个应用程序停止。

我现在正在尝试使用 CallableFuture,但出现内存不足错误。

public class ProcessLine implements Callable<Boolean> {
private final String inputLine;

public ProcessLine(String inputLine) {
this.inputLine = inputLine;
}

@Override
public Boolean call() throws Exception {
formatLine(inputLine); // huge method which can throw exceptions

return true;
}
}

在打开文本文件之前:

ExecutorService executor = Executors.newFixedThreadPool(threads, new DaemonThreadFactory());
List<Future> futures = new ArrayList<Future>();

然后在遍历所有行的循环中:

ProcessLine processLine = new ProcessLine(inputLine);

Future f = executor.submit(processLine);
futures.add(f);

这里的第一个问题是所有 Future 对象都收集在 futures 列表中。当我每行只有一个项目时,内存不足也就不足为奇了。

第二个问题是:我会在处理文本文件的最后使用 get() 方法检查所有 Future 项目。如果第一行抛出异常,我什至不会注意到。

请帮我找出解决方法。

最佳答案

您可以通过使用 this constructor 创建自定义 ThreadPoolExecutor 来限制待处理任务的数量。如下:

ExecutorService executor = new ThreadPoolExecutor(
threads,
threads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(WORK_QUEUE_SIZE));

其中 WORK_QUEUE_SIZE 确定待处理行的最大数量。


这是我想出的另一种方法。我不确定如何以优雅的方式合并 ExecutorService

import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class Scratch {

static Object lock = new Object();
static AtomicBoolean keepRunning = new AtomicBoolean(true);
static BlockingQueue<String> buf = new LinkedBlockingDeque<>(100);
static List<Consumer> consumers = Arrays.asList(new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());

public static void main(String [] args) {

// Start a producer
new Producer().start();

// Start consumers
for (Consumer c : consumers)
c.start();
}

static void stopConsumers() {
System.out.println("Stopping consumers");
keepRunning.set(false);
for (Consumer c : consumers)
c.interrupt();
}

static class Producer extends Thread {
public void run() {
try (BufferedReader br =
new BufferedReader(new FileReader("lines.txt"))) {
String line;
while (null != (line = br.readLine())) {
System.out.println(line);
buf.put(line);
}
} catch (Exception e) {
e.printStackTrace();
// Producer exception
}

// Wait for the consumers to finish off the last lines in the queue
synchronized (lock) {
while (!buf.isEmpty()) {
try {
lock.wait();
} catch (InterruptedException e) {
// TODO: Deal with interruption
}
}
}

// The consumers are now hanging on buf.take. Interrupt them!
stopConsumers();
}
}


static class Consumer extends Thread {

// Dummy process
private boolean process(String str) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
}
return true;
}

public void run() {
System.out.println("Starting");

while (keepRunning.get()) {
try {
process(buf.take());
} catch (InterruptedException e) {
// TODO: Handle interrupt
e.printStackTrace();
} catch (Exception e) {
stopConsumers(); // Processing exception: Graceful shutdown
}

// Notify the producer that the queue might be empty.
synchronized (lock) {
lock.notify();
}
}

System.out.println("Stopping");
}
}

}

关于java - 使用固定数量的线程处理大量多线程数据并允许异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25378498/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com