
java - ExecutorService with multiple threads does not work correctly, but works fine in Debug mode

Reposted. Author: 行者123. Updated: 2023-11-30 02:34:41

Basically, I have to process a large CSV file with nearly 1 million records using multiple threads.

I created a class, IngestionCallerThread:

public class IngestionCallerThread {

    public static void main(String[] args) {

        try {
            int count = 0;
            InputStream ios = IngestionCallerThread.class.getClassLoader().getResourceAsStream("aa10.csv");

            byte[] buff = new byte[8000];

            int bytesRead = 0;
            ByteArrayOutputStream bao = new ByteArrayOutputStream();

            while ((bytesRead = ios.read(buff)) != -1) {
                bao.write(buff, 0, bytesRead);
            }

            byte[] data = bao.toByteArray();

            ByteArrayInputStream bin = new ByteArrayInputStream(data);
            BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(bin));

            while ((fileInputStreamBufferedReader.readLine()) != null) {
                count++;
            }
            bin.reset();

            int numberOfThreads = 12;
            int rowsForEachThread = count / numberOfThreads;
            int remRows = count % numberOfThreads;
            int startPosition = 0;
            System.out.println(count);
            ExecutorService es = Executors.newCachedThreadPool();
            for (int i = 0; i < numberOfThreads && startPosition < count; i++) {
                if (remRows > 0 && i + 1 >= numberOfThreads)
                    rowsForEachThread = remRows;

                IngestionThread ingThread = new IngestionThread(bin, startPosition, rowsForEachThread);
                es.execute(ingThread);
                startPosition = (startPosition + rowsForEachThread);
            }
            es.shutdown();
            if (es.isTerminated()) {
                System.out.println("Completed");
            }
            // t2.start();
        } catch (IOException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}

I use it to call another class I implemented, which is a Runnable:

public class IngestionThread implements Runnable {

    InputStream is;
    long startPosition;
    long length;

    public IngestionThread(InputStream targetStream, long position, long length) {
        this.is = targetStream;
        this.startPosition = position;
        this.length = length;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        int currentPosition = 0;
        try {
            is.reset();
        } catch (IOException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(is));
        if (startPosition != 0) {
            String line;
            try {
                while (((line = fileInputStreamBufferedReader.readLine())) != null) {
                    if (currentPosition + 1 == startPosition)
                        break;
                    currentPosition++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        try {
            int execLength = 0;
            String line;
            while ((line = fileInputStreamBufferedReader.readLine()) != null && execLength < length) {
                System.out.println(line);
                execLength++;
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

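I tested with a small CSV file containing 20 records. The problem is that when I debug the class, almost all of the records are printed. But when I run the class, sometimes 15 records are read and sometimes 12. I'm not sure what the problem is. Any help would be greatly appreciated. Thanks in advance.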

Best answer

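The cause of the problem is that you have multiple threads reading from different BufferedReader objects that all wrap one shared ByteArrayInputStream. The threads do not coordinate their reads, so a thread can consume parts of the stream that another thread was supposed to read. Each BufferedReader also reads ahead into its own buffer, so it takes more bytes from the shared stream than the lines it returns, and every call to is.reset() rewinds the position for all threads at once. Under the debugger the threads are more likely to run one after another, which would hide the race.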

Each thread needs its own ByteArrayInputStream.
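A minimal sketch of that fix, under a few assumptions: the class and method names (PerThreadStreamDemo, readSlice, readInParallel) are hypothetical and not from the question, the tasks are written as Callable objects so the caller can collect the lines and wait for completion through Future.get(), and the leftover rows are spread over the first tasks rather than given to the last one. The byte[] is still shared, which is safe because it is only read, but every task wraps it in a private ByteArrayInputStream with its own read position.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PerThreadStreamDemo {

    // Reads up to `length` lines starting at line index `start`.
    // The stream is created here, so no other task can move its position.
    static List<String> readSlice(byte[] data, int start, int length) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(data), StandardCharsets.UTF_8))) {
            // Skip the lines that belong to earlier tasks.
            for (int i = 0; i < start; i++) {
                if (reader.readLine() == null) {
                    return lines;
                }
            }
            String line;
            while (lines.size() < length && (line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    // Splits `totalLines` lines across at most `numberOfThreads` tasks
    // and returns all lines in their original order.
    static List<String> readInParallel(byte[] data, int totalLines, int numberOfThreads)
            throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(numberOfThreads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            int base = totalLines / numberOfThreads;
            int rem = totalLines % numberOfThreads;
            int start = 0;
            for (int i = 0; i < numberOfThreads && start < totalLines; i++) {
                // The first `rem` tasks take one extra line each.
                final int len = base + (i < rem ? 1 : 0);
                final int from = start;
                Callable<List<String>> task = () -> readSlice(data, from, len);
                futures.add(es.submit(task));
                start += len;
            }
            List<String> all = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                all.addAll(f.get()); // blocks until that task has finished
            }
            return all;
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        StringBuilder csv = new StringBuilder();
        for (int i = 0; i < 20; i++) {
            csv.append("row").append(i).append('\n');
        }
        byte[] data = csv.toString().getBytes(StandardCharsets.UTF_8);
        for (String line : readInParallel(data, 20, 12)) {
            System.out.println(line);
        }
    }
}
```

Every task still skips from the beginning of the data to reach its slice, as in the question. For a file with close to a million rows it may be worth computing byte offsets once and starting each stream there, but that is a separate optimization from the correctness fix above.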

Regarding "java - ExecutorService with multiple threads does not work correctly, but works fine in Debug mode", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43412735/
