gpt4 book ai didi

multithreading - 使用条件变量在线程之间进行通信

转载 作者:太空宇宙 更新时间:2023-11-03 22:53:16 25 4
gpt4 key购买 nike

我正在尝试使用多线程实现处理直方图图像的算法。

最常见的方法之一是拆分多个线程,在每个线程上创建一个缓存缓冲区,在缓存缓冲区上绘制直方图,然后锁定互斥锁,将本地值添加到输出向量,然后解锁缓冲区。

这种方法非常有效,但可能会造成“堵塞”。我的意思是数据的添加不能同时实现。

在大多数情况下,当值的范围很短(例如 0-255)时,进行加法所需的时间非常快,可以忽略不计。

如果数据的范围更大,例如热图像,这一次可能会变得更加重要。

热图像通常是无符号短矩阵,即使值不使用完整范围 (0-65535),算法也必须处理所有范围。

为了稍微加快处理速度,我想启动一个后台线程来执行加法,而“前台”线程只会将数据写入预分配的缓冲区。

所以基本上“前台”线程的工作过去是这样的:

  • 从循环缓冲区中获取缓冲区。

  • 处理指定数据集的直方图(例如从第 n 行到第 m 行)。

  • 通知后台缓冲区操作完成。

后台线程用来做:

  • 等待通知到达并检查可用缓冲区的数量是否低于缓冲区的数量。

  • 如果条件为真,则从可用缓冲区中查找要处理的缓冲区。

  • 用输出缓冲区做加法。

  • 使处理后的缓冲区可重用。

我对条件变量不是很熟悉。

因此,为了使用条件变量检查线程之间的通信,我编写了以下玩具示例:

玩具.h

#ifndef TOY
#define TOY

#include <opencv2/core.hpp>
#include <opencv2/core/utility.hpp>

#include <iostream>
#include <iterator>

#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>

namespace toy
{

class toy_t : public cv::ParallelLoopBody
{
private:

struct elem_t
{
int _id;
bool _is_available;
bool _is_inprocess;

inline elem_t():
_id(-1),
_is_available(true),
_is_inprocess(false)
{}
// help for the initialization using iota.
inline elem_t& operator=(const int& id)
{
this->_id = id;

return (*this);
}
};

const int _nb_thread_available;

std::vector<elem_t> buf;
elem_t* buf_begin;
elem_t* buf_end;

mutable std::atomic_size_t _nb_buffer_available;

std::atomic_bool _run;
std::atomic_bool _is_background_terminate;

mutable std::mutex _mtx_fgd;
mutable std::mutex _mtx_bgd;

mutable std::condition_variable _cv_foreground;
mutable std::condition_variable _cv_background;

std::condition_variable _cv_thread;

elem_t* get_buffer()const
{
// Wait until a conditional variable notify that a buffer is ready to be reused.
std::unique_lock<std::mutex> lck(this->_mtx_fgd);

this->_cv_foreground.wait(lck,[&]{ return (this->_nb_buffer_available > 0);});

elem_t* it = this->buf_begin;

// Look for available buffer.

while(!it->_is_available )
it++;

it->_is_available = false;
it->_is_inprocess = true;

this->_nb_buffer_available--;

return it;
}

void background()
{
std::cout<<"background launch "<<std::endl;
while(this->_run)
{
std::unique_lock<std::mutex> lck(this->_mtx_bgd);

// Wait for a notification.
this->_cv_background.wait(lck,[&]{return (this->_nb_buffer_available != cv::getNumThreads()) ;});

//
if(!this->_run)
continue;

elem_t* it = this->buf_begin;

// Method by spinning.
// While the available buffer is not find I am looking for it.
// When I'll find I may have done multiple pass.
while(it->_is_available || it->_is_inprocess)
{
it++;

if(it == this->buf_end)
it = this->buf_begin;
}

// This method is more logic than the spinner.
// A condition variable has notify a buffer is ready to be reused, so a one pass check is made in order to find which is this buffer.
// while(!it->_is_available )
// it++;

std::cout<<"the background thread is making the buffer : "<<it->_id<<" availlable."<<std::endl;

// Do something.

it->_is_available = true;
it->_is_inprocess = false;

this->_nb_buffer_available++;

this->_cv_foreground.notify_one();
}

this->_is_background_terminate = true;
}

public:

toy_t():
_nb_thread_available(cv::getNumThreads()), // In my computer getNumThreads() == 8
buf(),
buf_begin(nullptr),
buf_end(nullptr),
_nb_buffer_available(this->_nb_thread_available),
_run(false),
_is_background_terminate(false)
{

this->buf.reserve(this->_nb_buffer_available);
this->buf.resize(this->buf.capacity());

std::iota(this->buf.begin(),this->buf.end(),0);

this->buf_begin = this->buf.data();
this->buf_end = this->buf_begin + this->buf.size();

std::thread th([this]{ this->_cv_thread.notify_one(); this->background();});

this->_run = true;

th.detach();
}

virtual ~toy_t()
{
this->_run = false;

this->_nb_buffer_available = 0;
this->_cv_background.notify_one();

while(!this->_is_background_terminate)
std::this_thread::yield();
}

// foreground threads
virtual void operator()(const cv::Range& range)const
{
elem_t* it = this->get_buffer();

std::cout<<"the foreground thread is processing the buffer : "<<it->_id<<std::endl;

for(int r=range.start;r<range.end;r++)
{
// Do something.
}

std::this_thread::sleep_for(std::chrono::seconds(1));

it->_is_inprocess = false;


this->_cv_background.notify_one();
}
};
}

#endif // TOY

主要.cpp

#include <iostream>
#include <cstdlib>

#include "toy.h"

int main(int argc,char* argv[])
{
toy::toy_t tt;

cv::parallel_for_(cv::Range(0,15),tt);

std::cout << "Hello World!" << std::endl;
return EXIT_SUCCESS;
}

使用此代码可以毫无困难地工作。

代码不需要的方面写在方法背景上:

    // Method by spinning.
// While the available buffer is not find I am looking for it.
// When I'll find I may have done multiple pass.
while(it->_is_available || it->_is_inprocess)
{
it++;

if(it == this->buf_end)
it = this->buf_begin;
}

我必须检查变量“it”的位置,否则它可能会占据缓冲区大小之外的位置。

我的想法是:

  • 在前台线程结束时,通知会发送到后台线程。
  • 然后后台线程处理缓冲区(或缓冲区,具体取决于线程结束的速度)。
  • 最后,后台线程通知下一个前台线程(在方法 get_buffer() 中)它已完成对缓冲区的处理并使其可重用。

当后台线程正在寻找它用于在 Buf_start 和 _Buf_end 之间找到它的线程时,遵循这些语句。

所以在方法后台寻找缓冲区是:

while(!it->_is_available )
it++;

经过几个小时的测试,我不知道出了什么问题。我也很想知道这个算法是否真的像我想的那样工作?有没有更高效、处理更少的线程间通信方式?

提前致谢。

最佳答案

我解决了! :).

我从“玩具”类中识别出几个问题。

operator () 的重载中,我复制了 get buffer() 方法的代码。我围绕等待条件做了一个 block 函数,以免在线程上顺序。条件变量所需的 unique_lock 仅存在于 block 函数的括号之间,就像这样,当等待条件被评估时,互斥体 _mtx_foreground 被解锁。

为了防止在搜索可用缓冲区期间发生出站,while 循环已被替换为 for 循环。问题是满足递增指针的条件。

在“后台”线程上,我添加了一个循环。这个想法是:

  1. 后台线程必须运行到结束算法。
  2. 后台线程正在等待通知来自一个前台线程。
  3. 寻找一个缓冲区来处理,对其进行处理,重新使其可用并将其通知到前台线程。
  4. 重做直到处理完所有缓冲区然后返回2).

这里有些部分已经被“深度”修改。

名为 toy2 的 class toy 的最新实现是这个:

#ifndef TOY
#define TOY


#include <opencv2/core.hpp>
#include <opencv2/core/utility.hpp>

#include <iostream>
#include <iterator>

#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>

namespace toy
{

class toy2 : public cv::ParallelLoopBody
{

private:

struct elem_t
{
int _id;
std::atomic_bool _is_available;
std::atomic_bool _is_inprocess;

inline elem_t():
_id(-1),
_is_available(true),
_is_inprocess(false)
{}

// needed for the memory reservation because atomic_bool copy constructor is deleted.
inline elem_t(const elem_t& obj):
_id(obj._id),
_is_available((bool)obj._is_available),
_is_inprocess((bool)obj._is_inprocess)
{}

// needed for the memory reservation because atomic_bool copy constructor is deleted.
inline elem_t(elem_t&& obj):
_id(obj._id),
_is_available((bool)obj._is_available),
_is_inprocess((bool)obj._is_inprocess)
{}

// help for the initialization using iota.
inline elem_t& operator=(const int& id)
{
this->_id = id;

return (*this);
}
};

mutable std::vector<elem_t> _buffer;
std::vector<elem_t*> _elements;

std::atomic_bool _run;

mutable std::atomic_size_t _nb_available_buffers;

mutable std::mutex _mtx_thread;
mutable std::mutex _mtx_foreground;
mutable std::mutex _mtx_background;

mutable std::condition_variable _cv_ctor_dtor;
mutable std::condition_variable _cv_foreground;
mutable std::condition_variable _cv_background;



void background()
{
std::cout<<"background has been detach"<<std::endl;


while(this->_run)
{
{
std::unique_lock<std::mutex> lck(this->_mtx_background);

this->_cv_background.wait(lck);
}

// Condition for stoping terminate the thread.
if(!this->_run)
break;

while(true)
{

typename std::vector<elem_t>::iterator it = std::find_if(this->_buffer.begin(),this->_buffer.end(),[](const elem_t& v){ return (!v._is_available && !v._is_inprocess);});

if(it == this->_buffer.end())
break;

std::cout<<"the background is making the element : "<<it->_id<<" available."<<std::endl;

it->_is_available = true;
it->_is_inprocess = false;

this->_nb_available_buffers++;

this->_cv_foreground.notify_one();
}


}
}

public:


toy2():
_buffer(),
_elements(),
_run(false),
_nb_available_buffers(0),
_mtx_thread(),
_mtx_foreground(),
_mtx_background(),
_cv_ctor_dtor(),
_cv_foreground(),
_cv_background()
{
const int nb_threads = cv::getNumThreads();

this->_nb_available_buffers = nb_threads;

this->_buffer.reserve(nb_threads);
this->_buffer.resize(this->_buffer.capacity());

this->_elements.reserve(this->_buffer.size());
this->_elements.resize(this->_buffer.size(),nullptr);



std::iota(this->_buffer.begin(),this->_buffer.end(),0);

for(int i=0;i<this->_buffer.size();i++)
this->_elements[i] = std::addressof(this->_buffer[i]);

std::thread th([this]
{
// Notify to the constructor.
this->_cv_ctor_dtor.notify_one();

this->background();

// Notify to the destructor.
this->_cv_ctor_dtor.notify_one();
});

this->_run = true;

std::unique_lock<std::mutex> lck(this->_mtx_thread);

th.detach();

this->_cv_ctor_dtor.wait(lck);

}


~toy2()
{
this->_run = false;
this->_cv_background.notify_one();

std::unique_lock<std::mutex> lck(this->_mtx_thread);

this->_cv_ctor_dtor.wait(lck);
}

void operator()(const cv::Range& range)const
{
{
std::unique_lock<std::mutex> lck(this->_mtx_foreground);

this->_cv_foreground.wait(lck,[&]{return this->_nb_available_buffers>0;});
}

typename std::vector<elem_t>::iterator it = std::find_if(this->_buffer.begin(),this->_buffer.end(),[](const elem_t& v){return (bool)v._is_available;});

// for(it = this->_buffer.begin();it != this->_buffer.end();it++)
// if(it->_is_available)
// break;



it->_is_available = false;
it->_is_inprocess = true;

this->_nb_available_buffers--;

std::cout<<"the foreground is processing the element : "<<it->_id<<" "<<std::this_thread::get_id()<<std::endl;




std::this_thread::sleep_for(std::chrono::milliseconds(2));
// std::this_thread::sleep_for(std::chrono::seconds(2));

it->_is_inprocess = false;

this->_cv_background.notify_one();

std::cout<<"end thread"<<std::endl;
}

};

}
#endif

关于multithreading - 使用条件变量在线程之间进行通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35324855/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com