gpt4 book ai didi

c++ - 如果我的 CPU 负载另有建议,我应该启动多个线程吗?

转载 作者:搜寻专家 更新时间:2023-10-31 01:37:23 24 4
gpt4 key购买 nike

我正在用 C++ 编写一个程序,根据参考注释(reference annotation)统计 NGS 测序读段的比对(alignment)数量。基本上,该程序把注释文件和比对文件都读入内存,遍历每条注释,在比对数据中二分查找可能的位置,找到该位置后,再在其周围的一个窗口(frame)内做线性搜索。

通常我希望这个窗口稍大一些(10000 条比对记录),所以想到把窗口拆分,将各部分分别放到单独的线程中处理。

程序能够编译并运行,但多线程似乎没有按预期工作,因为我的电脑只用了一个核心来完成这项工作。有没有人能帮我找出线程实现错在哪里?

https://sourceforge.net/projects/fast-count/?source=directory

#include <math.h>

#include <atomic>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "api/BamReader.h"

using namespace std;
using namespace BamTools;

// Number of alignments counted for the GTF record currently being processed.
// main() resets it to 0 before each record and prints it afterwards; process()
// adds to it from inside std::thread workers. It is atomic because a plain int
// modified from several concurrently running threads is a data race.
std::atomic<int> hit_count{0};

// One mapped alignment as kept in memory by load_bam(): the name of the
// reference sequence it maps to and its start position (BamAlignment::Position).
struct bam_headers{

    std::string chr;   // reference (chromosome) name
    int start = 0;     // alignment start; initialized so a default-constructed
                       // record never carries an indeterminate value

};

// Per-thread work description: a thread index, the total thread count and a
// start/stop pair (presumably a range of GTF record indices — confirm).
// NOTE(review): not referenced anywhere in the visible code; thread_handler()
// passes its parameters directly. Consider removing it.
struct thread_data{

    int thread_id = 0;
    int total_thread = 0;
    int start_gtf = 0;
    int stop_gtf = 0;

};

// One annotation line of a GTF file as split on tabs by load_gtf(): the eight
// fixed columns plus the ninth (attribute) column, stored in `annotation`.
struct gtf_headers{

    std::string chr;        // column 1: sequence name
    std::string source;     // column 2
    std::string feature;    // column 3
    std::string score;      // column 6, kept as text
    std::string strand;     // column 7
    std::string frame;      // column 8
    std::string annotation; // column 9: attributes
    int start = 0;          // column 4, parsed with atoi
    int end = 0;            // column 5, parsed with atoi

};

// Worker routine: counts the entries of start_holder[0 .. size) whose value lies
// in the closed interval [gtf_start, gtf_stop] and adds that count to the
// global hit_count.
//
// The count is accumulated locally and added to hit_count once, so a worker
// touches the shared counter a single time instead of once per hit. The buffer
// is only read, hence `const int*` (callers passing an `int*` still work).
void process(const int* start_holder, int size, int gtf_start, int gtf_stop){

    if (start_holder == nullptr){
        return;
    }

    int local_hits = 0;
    for (int t = 0; t < size; ++t){
        const int pos = start_holder[t];
        if (pos >= gtf_start && pos <= gtf_stop){
            ++local_hits;
        }
    }
    hit_count += local_hits;

}

vector <string> find_index(vector <vector <bam_headers> > bams){

//define vector for bam_index to chromosome

vector <string> compute_holder;
for (int bam_idx = 0; bam_idx < bams.size();bam_idx++){
compute_holder.push_back(bams[bam_idx][0].chr);
}
return compute_holder;

}

vector <gtf_headers> load_gtf(char* filename){

//define matrix to memory holding gtf annotations by assoc. header

vector<gtf_headers> push_matrix;
gtf_headers holder;
ifstream gtf_file(filename);
string line;

cout << "Loading GTF to memory" << "\n";
if (gtf_file.is_open()){
int sub_count = 0;
string transfer_hold[8];
while(getline(gtf_file,line)){
//iterate through file
istringstream iss(line);
string token;
//iterate through line, and tokenize by tab delimitor
while(getline(iss,token,'\t')){
if (sub_count == 8){
//assign to hold struct, and push to vector
holder.chr = transfer_hold[0];
holder.source = transfer_hold[1];
holder.feature = transfer_hold[2];
holder.start = atoi(transfer_hold[3].c_str());
holder.end = atoi(transfer_hold[4].c_str());
holder.score = transfer_hold[5];
holder.strand = transfer_hold[6];
holder.frame = transfer_hold[7];
holder.annotation = token;
push_matrix.push_back(holder);
sub_count = 0;
} else {
//temporarily hold tokens
transfer_hold[sub_count] = token;
++sub_count;
}
}
}
cout << "GTF successfully loaded to memory" << "\n";
gtf_file.close();
return(push_matrix);
}else{
cout << "GTF unsuccessfully loaded to memory. Check path to file, and annotation format. Exiting" << "\n";
exit(-1);
}
}

vector <vector <bam_headers>> load_bam(char* filename){

//parse individual bam file to chromosome bins

vector <vector <bam_headers> > push_matrix;
vector <bam_headers> iter_chr;
int iter_refid = -1;
bam_headers bam_holder;
BamReader reader;
BamAlignment al;
const vector<RefData>& references = reader.GetReferenceData();

cout << "Loading " << filename << " to memory" << "\n";
if (reader.Open(filename)) {
while (reader.GetNextAlignmentCore(al)) {
if (al.IsMapped()){
//bam file must be sorted by chr. otherwise the lookup will segfault
if(al.RefID != iter_refid){
//check if chr. position has advanced in the bam file, if true, push empty vector
iter_refid++;
push_matrix.push_back(iter_chr);
}else{
//if chr. position hasn't advanced push to current index in 2d vector
bam_holder.chr = references[al.RefID].RefName;
bam_holder.start = al.Position;
push_matrix.at(iter_refid).push_back(bam_holder);
}
}
}
reader.Close();
cout << "Successfully loaded " << filename << " to memory" << "\n";
return(push_matrix);
}else{
cout << "Could not open input BAM file. Exiting." << endl;
exit(-1);
}

}

// Returns the index of the bin whose reference name equals gtf_chr, or -1 when
// no bin matches. If the name occurs more than once, the LAST matching index
// is returned. This is the same answer as a forward scan that keeps
// overwriting its result, but the search runs from the back and stops at the
// first hit.
//
// `mapping` is taken by const reference; a by-value parameter copies the whole
// name table for every GTF record.
// NOTE(review): the `short int` return truncates indices above 32767
// (assemblies with many contigs); widen it together with the caller.
short int find_bin(const std::string & gtf_chr, const std::vector<std::string>& mapping){

    for (std::size_t i = mapping.size(); i-- > 0;){
        if (mapping[i] == gtf_chr){
            return static_cast<short int>(i);
        }
    }
    return -1;

}

int find_frame(gtf_headers gtf_matrix, vector <bam_headers> bam_file_bin){

//binary search to find alignment index with greater and less than gtf position

int bin_size = bam_file_bin.size();
int high_end = bin_size;
int low_end = 0;
int binary_i = bin_size / 2;
int repeat = 0;
int frame_start;
bool found = false;

while (found != true){
if ((bam_file_bin[binary_i].start >= gtf_matrix.start) && (bam_file_bin[binary_i].start <= gtf_matrix.end)){
frame_start = binary_i;
found = true;
}else{
if(repeat != binary_i){
if(bam_file_bin[binary_i].start > gtf_matrix.end){
if(repeat != binary_i){
repeat = binary_i;
high_end = binary_i;
binary_i = ((high_end - low_end) / 2) + low_end;
}
}else{
if(repeat != binary_i){
repeat = binary_i;
low_end = binary_i;
binary_i = ((high_end - low_end) / 2) + low_end;
}
}
}else{
frame_start = low_end;
found = true;
}
}
}
return(frame_start);
}

// Computes the half-open index window {lo, hi} of a bin that thread_handler()
// scans around `frame_start`.
//
//   frame_size  - requested window width; negative values are treated as 0
//   frame_start - centre index (as returned by find_frame())
//   bam_matrix  - number of alignments in the bin
//   returns     - {lo, hi} with 0 <= lo <= hi <= bam_matrix (for bam_matrix >= 0)
//
// Normally the window is [frame_start - frame_size/2, frame_start + frame_size/2).
// Near either edge of the bin it is shifted to stay inside while keeping
// frame_size elements where the bin is large enough. The end-edge case used
// to keep only frame_size/2 elements. A window touching both edges also used
// to recompute its low bound from the end: frame_size=10, frame_start=2 on an
// 8-element bin gave {3, 8} instead of the whole bin {0, 8}.
std::vector<int> define_frame(int frame_size, int frame_start, int bam_matrix){

    if (frame_size < 0){
        frame_size = 0;
    }

    int lo = frame_start - (frame_size / 2);
    int hi = frame_start + (frame_size / 2);

    if (lo < 0){
        // Shift right: anchor at the beginning of the bin.
        lo = 0;
        hi = frame_size;
    }
    if (hi > bam_matrix){
        // Shift left: anchor at the end of the bin, keeping the full width.
        hi = bam_matrix;
        lo = bam_matrix - frame_size;
        if (lo < 0){
            lo = 0;
        }
    }
    return {lo, hi};

}

void thread_handler(int nthread, vector <int> frame, vector <bam_headers> bam_matrix, gtf_headers gtf_record){

int thread_divide = frame[1]-frame[0];//frame_size / nthread;
int thread_remain = (frame[1]-frame[0]) % nthread;
int* start_holder = new int[thread_divide];

for(int i = 0; i < nthread; i++){
if (i < nthread - 1){
for (int frame_index = 0; frame_index < thread_divide; frame_index++){
start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;
}
frame[0] = frame[0] + thread_divide;
thread first(process, start_holder,thread_divide,gtf_record.start,gtf_record.end);
first.join();
}else{
for (int frame_index = 0; frame_index < thread_divide + thread_remain; frame_index++){
start_holder[frame_index] = bam_matrix[frame[0]+frame_index].start;
}
thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
last.join();
}
}

}



int main (int argc, char *argv[])
{

// usage
// ./count threads frame_size gtf_file files

//define matrix to memory holding gtf annotations by assoc. header
vector <gtf_headers> gtf_matrix = load_gtf(argv[3]);

//load bam, perform counts
for(int i = 4;i < argc;i++){

//iterate through filenames in argv, define matrix to memory holding bam alignments chr and bp position
vector <vector <bam_headers> > bam_matrix = load_bam(argv[i]);

//map chromosome to bam matrix index
vector <string> index_mapping = find_index(bam_matrix);

//iterate through gtf matrix, find corresponding bins for chr, set search frames, and count
for(int gtf_i = 0; gtf_i < gtf_i < gtf_matrix.size();gtf_i++){ //gtf_i < gtf_matrix.size()

hit_count = 0;
//find corresponding bins for gtf chr
short int bin_compare = find_bin(gtf_matrix[gtf_i].chr,index_mapping);

if(bin_compare != -1){

//find start of search frame
int frame_start = find_frame(gtf_matrix[gtf_i], bam_matrix[bin_compare]);

//get up lower bounds of search frame;
vector <int> full_frame = define_frame(atoi(argv[2]),frame_start,bam_matrix[bin_compare].size());

//create c array of bam positional data for the frame, and post to thread process
thread_handler(atoi(argv[1]),full_frame,bam_matrix[bin_compare],gtf_matrix[gtf_i]);

}

//counts displayed in STOUT
cout << gtf_matrix[gtf_i].chr << "\t" << gtf_matrix[gtf_i].source << "\t" << gtf_matrix[gtf_i].feature << "\t" << gtf_matrix[gtf_i].start << "\t" << gtf_matrix[gtf_i].end << "\t" << gtf_matrix[gtf_i].score << "\t" << gtf_matrix[gtf_i].strand << "\t" << gtf_matrix[gtf_i].frame << "\t" << gtf_matrix[gtf_i].annotation << "\t" << hit_count << "\n";

}
}
}

最佳答案

你的问题的答案很简单:

thread last(process, start_holder,thread_divide + thread_remain,gtf_record.start,gtf_record.end);
last.join();

在这里,父线程创建了一个新线程,然后……立即等待它结束。这正是 join() 的作用:阻塞调用方,直到该线程终止。

因此,您的代码启动一个新线程,并立即等待它完成,然后再做任何其他事情,例如启动下一个线程。

你需要重写 thread_handler():先创建所有的 std::thread 实例,等全部创建完毕后,再对每个实例调用 join(),等待它们全部完成。

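典型做法是先用 std::thread 的默认构造函数创建一个包含所有线程对象的 std::vector,遍历一次为每个元素启动实际的线程,再遍历一次对每个线程调用 join()。注意:线程真正并发运行后,各线程不能再共用同一个 start_holder 缓冲区,对全局 hit_count 的递增也必须同步(例如改用 std::atomic<int>,或让每个线程先在局部计数、join 之后再汇总);此外,thread_divide 的计算中 “/ nthread” 被注释掉了,需要恢复,否则每个分片都等于整个窗口的长度。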

关于c++ - 如果我的 CPU 负载另有建议,我应该启动多个线程吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34192171/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com