gpt4 book ai didi

c++ - 处理长文件行的最佳多线程方案是什么?

转载 作者:行者123 更新时间:2023-11-30 02:28:55 25 4
gpt4 key购买 nike

我有一个大文件,我想用多线程读取并[处理]文件的所有行(偶数行)。

有人建议读取整个文件并将其分解为多个文件(与线程数相同),然后让每个线程处理一个特定的文件。因为这个想法将读取整个文件,再次写入并读取多个文件,所以它似乎很慢(3x I/O),我认为必须有更好的方案,

我自己认为这可能是一个更好的场景:

一个线程将读取文件并将数据放在全局变量中,其他线程将从该变量和进程中读取数据。更详细:

一个线程将使用运行的func1 函数读取主文件,并将每个偶数行放入最大大小为MAX_BUFFER_SIZE 的缓冲区:line1Buffer其他线程将从 Buffer 中弹出它们的数据,并通过运行 func2 函数对其进行处理。在代码中:

全局变量:

#define MAX_BUFFER_SIZE 100
vector<string> line1Buffer;
bool continue = true;// to end thread 2 to last thread by setting to false
string file = "reads.fq";

函数func1:(线程1)

void func1(){
ifstream ifstr(file.c_str());
for (long long i = 0; i < numberOfReads; i++) { // 2 lines per read
getline(ifstr,ReadSeq);
getline(ifstr,ReadSeq);// reading even lines
while( line1Buffer.size() == MAX_BUFFER_SIZE )
; // to delay when the buffer is full
line1Buffer.push_back(ReadSeq);
}
continue = false;
return;
}

和函数func2 : (其他线程)

void func2(){
string ReadSeq;
while(continue){
if(line2Buffer.size() > 0 ){
ReadSeq = line1Buffer.pop_back();
// do the proccessing....
}
}
}

关于速度:

如果读取部分较慢,那么总时间将等于只读取一次文件(并且缓冲区每次可能只包含 1 个文件,因此只有 1 个另一个线程将能够与线程 1 一起工作).如果处理部分较慢,则总时间将等于使用 numberOfThreads - 1 线程的整个处理时间。这两种情况都比用 1 个线程读取文件和写入多个文件然后用多线程和进程读取文件更快...

所以有 2 个问题:

1- 如何按线程 1 运行 func1 而其他线程运行 func2 的方式调用函数?

2-有没有更快的方案?

3-[已删除] 谁能把这个想法扩展到M个线程读取和N个线程处理?显然我们知道:M+N==umberOfThreads 是真的

编辑:第三个问题不对,因为多个线程无法帮助读取单个文件

谢谢大家

最佳答案

另一种方法可以是交错线程。读取由每个线程完成,但一次只能读取 1 个。由于在第一次迭代中等待,线程将交错。

但这只是一个可扩展的选项,如果 work() 是瓶颈(那么每次非并行执行会更好)

主题:

while (!end) {
// should be fair!
lock();
read();
unlock();

work();
}

基本示例:(您可能应该添加一些错误处理)

void thread_exec(ifstream* file,std::mutex* mutex,int* global_line_counter) {
std::string line;
std::vector<std::string> data;
int i;
do {
i = 0;
// only 1 concurrent reader
mutex->lock();
// try to read the maximum number of lines
while(i < MAX_NUMBER_OF_LINES_PER_ITERATION && getline(*file,line)) {
// only the even lines we want to process
if (*global_line_counter % 2 == 0) {
data.push_back(line);
i++;
}
(*global_line_counter)++;

}
mutex->unlock();

// execute work for every line
for (int j=0; j < data.size(); j++) {
work(data[j]);
}

// free old data
data.clear();
//until EOF was not reached
} while(i == MAX_NUMBER_OF_LINES_PER_ITERATION);

}

void process_data(std::string file) {
// counter for checking if line is even
int global_line_counter = 0;
// open file
ifstream ifstr(file.c_str());
// mutex for synchronization
// maybe a fair-lock would be a better solution
std::mutex mutex;
// create threads and start them with thread_exec(&ifstr, &mutex, &global_line_counter);
std::vector<std::thread> threads(NUM_THREADS);
for (int i=0; i < NUM_THREADS; i++) {
threads[i] = std::thread(thread_exec, &ifstr, &mutex, &global_line_counter);
}
// wait until all threads have finished
for (int i=0; i < NUM_THREADS; i++) {
threads[i].join();
}
}

关于c++ - 处理长文件行的最佳多线程方案是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40239090/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com