- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试使用 Intel TBB 编写流水线版本的 Bitonic Sort,使用文件读取、排序、文件写入阶段,如下所示。代码卡住在 while(!outQueue.try_pop(line)) 的自旋锁处;在 FileWriter 过滤器中。有人可以解释为什么会这样吗?
更新:我做了一些进一步的测试,发现 try_pop 从头文件 _concurrent_queue_internal.h 调用的 internal_try_pop 有一个 compare_and_swap 操作,对于这个特定的 try_pop 永远失败。以下是我从 internal_try_pop 中提取的值
head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!
我认为尾计数器值是垃圾。对于这种情况,我能想到的唯一原因是排序器添加到队列中的值可能会被它隐式修改,从而使其不可用。
有什么想法吗?
谢谢:)
#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"
using namespace tbb;
using namespace std;
// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
string outPath;
public:
FileWriterFilter(string outPath );
/*override*/void* operator()( void* item );
};
FileWriterFilter::FileWriterFilter( string outPath ) :
tbb::filter(/*is_serial=*/true),
outPath(outPath)
{
}
void* FileWriterFilter::operator()( void* item ) {
concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
while(!outQueue.try_pop(line));
ofstream myfile(outPath);
if (myfile.is_open())
{
myfile <<line<<endl;
}
//myfile.close();
return NULL;
}
class FileReaderFilter: public tbb::filter {
public:
FileReaderFilter(string inPath);
private:
ifstream ifs;
tbb::concurrent_queue<string> queue;
/*override*/ void* operator()(void*);
};
FileReaderFilter::FileReaderFilter(string inPath ) :
filter(/*is_serial=*/true),
ifs(inPath)
{
}
void* FileReaderFilter::operator()(void*) {
string temp;
if( getline( ifs, temp ))
{
queue.push(temp);
}
return &queue;
}
class BitonicSort: public tbb::filter{
public:
BitonicSort();
/*override*/void* operator()( void* item );
size_t *a;
private : static const bool ASCENDING=true, DESCENDING=false;
public :void sort(size_t *b,int n)
{
a=b;
bitonicSort(0,n,ASCENDING);
}
private: void bitonicSort(int lo,int n,bool dir)
{
if(n>1)
{
int m=n/2;
bitonicSort(lo,m,ASCENDING);
bitonicSort(lo+m,m,DESCENDING);
bitonicMerge(lo,n,dir);
}
}
private : void bitonicMerge(int lo,int n,bool dir)
{
if(n>1)
{
int m=n/2;
for(int i=lo;i<lo+m;i++)
{
compare(i,i+m,dir);
}
bitonicMerge(lo,m,dir);
bitonicMerge(lo+m,m,dir);
}
}
private : void compare(int i,int j, bool dir)
{
if(dir==a[i]>a[j])
{
exchange(i,j);
}
}
private : void exchange(int i,int j)
{
/* cout<<a[i]<<" "<<a[j]<<endl;*/
int t=a[i];
a[i]=a[j];
a[j]=t;
/*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
}
private :string convertInt(int number)
{
stringstream ss;//create a stringstream
ss << number;//add number to the stream
return ss.str();//return a string with the contents of the stream
}
};
BitonicSort::BitonicSort() :
tbb::filter(/*serial=*/false)
{}
/*override*/void* BitonicSort::operator()( void* item ) {
int num_elem=2048;
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
concurrent_queue<string> outQueue;
string line;
while(!queue.try_pop(line));
istringstream iss(line);
int i=0;
do
{
string sub;
iss >> sub;
max[i]=atoi(sub.c_str());;
i++;
} while (iss);
sort(max,num_elem);
string out;
for(int i=0;i<num_elem;i++)
{
out.append(convertInt(max[i]).append(" "));
}
outQueue.push(out);
return &outQueue;
}
int main() {
tbb::pipeline pipeline;
FileReaderFilter reader("sample.txt");
pipeline.add_filter(reader);
BitonicSort sorter;
pipeline.add_filter(sorter);
FileWriterFilter writer("test.txt");
pipeline.add_filter(writer);
pipeline.run(3);
pipeline.clear();
system("PAUSE");
}
最佳答案
我找到了!这是一个微不足道的错误。我已将第二个 concurrent_queue 声明为管道排序器过滤器的运算符方法中的方法变量。因此,每次执行 operator 方法时,队列都会重新初始化,从而使发送到 writer filter 的指针无效。队列必须是排序过滤器的类变量,一切正常。文件编写器存在另一个错误,已在以下内容中更改。 `
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"
#include "tbb\task_scheduler_init.h"
#include "tbb\tbb_thread.h"
#include "tbb\task.h"
#include <iostream>
#include <sstream>
using namespace tbb;
using namespace std;
// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
public:
int count;
FileWriterFilter(FILE* outFile);
private:
FILE* outFile;
/*override*/void* operator()( void* item );
};
FileWriterFilter::FileWriterFilter(FILE* outFile) :
tbb::filter(/*is_serial=*/true),
outFile(outFile),count(0)
{
}
/*override*/void* FileWriterFilter::operator()( void* item ) {
tbb::concurrent_queue<string> &outQueue = *static_cast<tbb::concurrent_queue<string>*> (item);
string outLine;
while(!outQueue.try_pop(outLine))
this_tbb_thread::yield();
fprintf(outFile,outLine.append("\n").c_str());
count++;
if(count==10000){
cout<<"over"<<endl;
}
return NULL;
}
class FileReaderFilter: public tbb::filter {
public:
FileReaderFilter(char* inPath);
private:
int count;
ifstream ifs;
tbb::concurrent_queue<string> queue;
/*override*/ void* operator()(void*);
};
FileReaderFilter::FileReaderFilter(char* inPath ) :
filter(/*is_serial=*/true),
ifs(inPath),count(0)
{
}
/*override*/void* FileReaderFilter::operator()(void*) {
string temp;
count++;
if(count<=10000){
if( getline( ifs, temp ))
{
queue.push(temp);
}
return &queue;
}
else{
return NULL;
}
}
class bitonicMerger : public tbb::task{
int lo;
int n;
bool dir;
size_t* a;
private : static const bool ASCENDING=true, DESCENDING=false;
public:
bitonicMerger(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}
task* execute() {
if(n>1)
{
int m=n/2;
for(int i=lo;i<lo+m;i++)
{
compare(i,i+m,dir);
}
int count = 1;
tbb::task_list list;
++count;
list.push_back( *new( allocate_child() ) bitonicMerger(lo,m,dir,a) );
++count;
list.push_back( *new( allocate_child() ) bitonicMerger(lo+m,m,dir,a) );
set_ref_count(count);
spawn_and_wait_for_all(list);
}
return NULL;
}
private : void compare(int i,int j, bool dir)
{
if(dir==a[i]>a[j])
{
exchange(i,j);
}
}
private : void exchange(int i,int j)
{
int t=a[i];
a[i]=a[j];
a[j]=t;
}
};
class bitonicSorter : public tbb::task{
int lo;
int n;
bool dir;
size_t* a;
private : static const bool ASCENDING=true, DESCENDING=false;
public:
bitonicSorter(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}
task* execute() {
if(n>1)
{
int m=n/2;
int count = 1;
tbb::task_list list;
++count;
list.push_back( *new( allocate_child() ) bitonicSorter(lo,m,ASCENDING,a) );
++count;
list.push_back( *new( allocate_child() ) bitonicSorter(lo+m,m,DESCENDING,a) );
set_ref_count(count);
spawn_and_wait_for_all(list);
count = 1;
tbb::task_list list1;
++count;
list1.push_back( *new( allocate_child() ) bitonicMerger(lo,n,dir,a) );
set_ref_count(count);
spawn_and_wait_for_all(list1);
}
return NULL;
}
};
class TBitonicSort : public tbb::filter{
public:
TBitonicSort();
/*override*/void* operator()( void* item );
size_t *a;
private : static const bool ASCENDING=true, DESCENDING=false;
private : tbb::concurrent_queue<string> outQueue;
public :void sort(size_t *b,int n)
{
a=b;
bitonicSorter& tt = *new(tbb::task::allocate_root()) bitonicSorter(0,n,ASCENDING,a);
tbb::task::spawn_root_and_wait(tt);
}
};
string convertInt(int number)
{
stringstream ss;//create a stringstream
ss << number;//add number to the stream
return ss.str();//return a string with the contents of the stream
}
TBitonicSort::TBitonicSort() :
filter(/*is_serial=*/true)
{}
/*override*/void* TBitonicSort::operator()( void* item ) {
int num_elem=2048;
size_t *max = new size_t[num_elem];
tbb::concurrent_queue<string>& queue = *static_cast<tbb::concurrent_queue<string>*>(item);
string line;
while(!queue.try_pop(line))
this_tbb_thread::yield();
istringstream iss(line);
int i=0;
do
{
string sub;
iss >> sub;
max[i]=atoi(sub.c_str());;
i++;
} while (iss);
sort(max,num_elem);
string out;
for(int i=0;i<num_elem;i++)
{
out.append(convertInt(max[i]).append(" "));
}
outQueue.push(out);
return &outQueue;
}
int run_pipe(int threads)
{
FILE* output_file = fopen("test.txt","w");
if( !output_file ) {
perror( "test.txt" );
return 0;
}
char* input_file="sample.txt";
tbb::pipeline pipeline;
FileReaderFilter reader(input_file);
pipeline.add_filter(reader);
TBitonicSort sorter;
pipeline.add_filter(sorter);
FileWriterFilter writer(output_file);
pipeline.add_filter(writer);
tbb::tick_count t0 = tbb::tick_count::now();
pipeline.run(threads);
tbb::tick_count t1 = tbb::tick_count::now();
fclose( output_file );
pipeline.clear();
if(threads==1){
printf("serial run time = %g\n", (t1-t0).seconds());
}
else{
printf("parallel run time = %g\n", (t1-t0).seconds());
}
return 0;
}
int main() {
int threads[2]={1,3};
for(int i=0;i<2;i++)
{
run_pipe(threads[i]);
}
system("PAUSE");
}
关于c++ - 使用英特尔 TBB 在并发 C++ 代码中阻止/卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5397104/
我尝试设置一个文件来使用 PyCharm 编写 AI。 我正在使用的教程:https://www.youtube.com/watch?v=ujTCoH21GlA 当我运行代码时: $ import t
我一直在寻找很长一段时间,似乎无法找到一个官方/结论性的数字来引用英特尔至强四核可以完成的单精度浮点运算/时钟周期的数量。我有一个 Intel Xeon 四核 E5530 CPU。 我希望用它来计算我
在深度学习与神经网络领域,研究人员通常离不开 GPU。得益于 GPU 极高内存带宽和较多核心数,研究人员可以更快地获得模型训练的结果。与此同时,CPU 受限于自身较少的核心数,计算运行需要较长的时间
我试图了解 TCS 启用的 SGX 线程与 SDK 提供的不受信任线程之间的区别. 如果我理解正确的话,TCS 允许多个逻辑处理器进入同一个飞地。每个逻辑处理器都有自己的 TCS,因此也有自己的入口点
我想通过 IACA 分析器运行一些代码以查看它使用了多少个 uops——我从一个简单的函数开始,看看它是否在工作。 不幸的是,当我插入 IACA 说要使用的宏时,生成的程序集非常不同,因此对它的任何分
是否有可能获得许可的开发人员证书,以在生产模式下签署经过安全审查、社区开发的开源 SGX 软件二进制文件,并将其发布在 apt 或 rpm 等开源存储库中? 我刚问过英特尔 SGX 团队,他们说只有经
我正在尝试模拟 Intel 8080 指令集,但我被这条指令卡住了 OUT D8 ,根据书Intel 8080/8085 Assembly Language Programming它说 OUT ins
我在使用一些现有的 FORTRAN 代码时发现了一个问题。尽管它已经预料到需要在重新分配之前释放数组,但这从来没有必要。我现在需要它来执行此操作,但它无法正常运行。 当前的伪代码大约是: MODULE
我正在尝试在内存中对齐以下类型的数据: type foo real, allocatable, dimension(:) :: bar1, bar2 !dir$ attributes al
关闭。这个问题是off-topic .它目前不接受答案。 想改善这个问题吗? Update the question所以它是 on-topic对于堆栈溢出。 8年前关闭。 Improve this q
似乎获得和释放语义的公认定义是这样的: (引自 http://msdn.microsoft.com/en-us/library/windows/hardware/ff540496(v=vs.85).a
这是我对英特尔 TBB 流图性能进行基准测试的尝试。这是设置: 一个广播节点发送continue_msg到 N 后继节点 (一broadcast_node) 每个后继节点执行一次计算,该计算需要 t
我有两个问题 第一个问题:我使用 css3、HTML5、JavaScript 开发应用程序。在我的应用程序中,我需要从数据库中获取数据。我该怎么做? 第二个问题:intel xdk 在构建 l 时必须
在英特尔手册的第 3 卷中,它包含硬件事件计数器的描述: BACLEAR_FORCE_IQ Counts number of times a BACLEAR was forced by the Ins
嘿,我正在使用 Intel xdk 开发混合应用程序。我已经创建了注册表,然后我将代码放在那里。我尝试使用 Php Mysql 将数据库插入我的数据库后。 如果我单击注册按钮,它会显示这样的错误 [
我想知道是否可以让英特尔 C++ 编译器(或其他编译器,如 gcc 或 clang)显示一些来自优化器的消息。我想知道优化器究竟对我的代码做了什么。默认情况下,编译器只打印非常基本的东西,比如未使用的
我正在使用 64 位架构的 Intel 程序集优化我的视频解码器。为了优化,我使用 AVX2 指令集。 我的开发环境:- 操作系统:- Win 7(64位) IDE:- MSVS 2008(教授) C
如果这是一个非常愚蠢的问题,我很抱歉;我的 Fortran 不太好。我正在移植一些旧的 Fortran 代码,并遇到了这个子例程定义: SUBROUTINE SET_HYDROMODULE(HYDRO
请问,我是否可以将 Intel XDK API 和 Phonegap API 集成到单个移动应用程序中?这是因为,某些 API 仅在 Phonegap 中可用,反之亦然。 最佳答案 是的,如果我正确理
在 x86 中查找任意操作码的含义(例如 0xC8 )的相对快速简便的方法是什么? Intel Software Developer's manual搜索起来不是很有趣...... 最佳答案 查询 t
我是一名优秀的程序员,十分优秀!