- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试使用多线程实现处理直方图图像的算法。
最常见的方法之一是拆分多个线程,在每个线程上创建一个缓存缓冲区,在缓存缓冲区上绘制直方图,然后锁定互斥锁,将本地值添加到输出向量,然后解锁缓冲区。
这种方法非常有效,但可能会造成“堵塞”。我的意思是数据的添加不能同时实现。
在大多数情况下,当值的范围很短(例如 0-255)时,进行加法所需的时间非常快,可以忽略不计。
如果数据的范围更大,例如热图像,这一次可能会变得更加重要。
热图像通常是无符号短矩阵,即使值不使用完整范围 (0-65535),算法也必须处理所有范围。
为了稍微加快处理速度,我想启动一个后台线程来执行加法,而“前台”线程只会将数据写入预分配的缓冲区。
所以基本上“前台”线程的工作过去是这样的:
从循环缓冲区中获取缓冲区。
处理指定数据集的直方图(例如从第 n 行到第 m 行)。
通知后台缓冲区操作完成。
后台线程用来做:
等待通知到达并检查可用缓冲区的数量是否低于缓冲区的数量。
如果条件为真,则从可用缓冲区中查找要处理的缓冲区。
用输出缓冲区做加法。
使处理后的缓冲区可重用。
我对条件变量不是很熟悉。
因此,为了使用条件变量检查线程之间的通信,我编写了以下玩具示例:
玩具.h
#ifndef TOY
#define TOY
#include <opencv2/core.hpp>
#include <opencv2/core/utility.hpp>
#include <iostream>
#include <iterator>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
namespace toy
{
class toy_t : public cv::ParallelLoopBody
{
private:
struct elem_t
{
int _id;
bool _is_available;
bool _is_inprocess;
inline elem_t():
_id(-1),
_is_available(true),
_is_inprocess(false)
{}
// help for the initialization using iota.
inline elem_t& operator=(const int& id)
{
this->_id = id;
return (*this);
}
};
const int _nb_thread_available;
std::vector<elem_t> buf;
elem_t* buf_begin;
elem_t* buf_end;
mutable std::atomic_size_t _nb_buffer_available;
std::atomic_bool _run;
std::atomic_bool _is_background_terminate;
mutable std::mutex _mtx_fgd;
mutable std::mutex _mtx_bgd;
mutable std::condition_variable _cv_foreground;
mutable std::condition_variable _cv_background;
std::condition_variable _cv_thread;
elem_t* get_buffer()const
{
// Wait until a conditional variable notify that a buffer is ready to be reused.
std::unique_lock<std::mutex> lck(this->_mtx_fgd);
this->_cv_foreground.wait(lck,[&]{ return (this->_nb_buffer_available > 0);});
elem_t* it = this->buf_begin;
// Look for available buffer.
while(!it->_is_available )
it++;
it->_is_available = false;
it->_is_inprocess = true;
this->_nb_buffer_available--;
return it;
}
void background()
{
std::cout<<"background launch "<<std::endl;
while(this->_run)
{
std::unique_lock<std::mutex> lck(this->_mtx_bgd);
// Wait for a notification.
this->_cv_background.wait(lck,[&]{return (this->_nb_buffer_available != cv::getNumThreads()) ;});
//
if(!this->_run)
continue;
elem_t* it = this->buf_begin;
// Method by spinning.
// While the available buffer is not find I am looking for it.
// When I'll find I may have done multiple pass.
while(it->_is_available || it->_is_inprocess)
{
it++;
if(it == this->buf_end)
it = this->buf_begin;
}
// This method is more logic than the spinner.
// A condition variable has notify a buffer is ready to be reused, so a one pass check is made in order to find which is this buffer.
// while(!it->_is_available )
// it++;
std::cout<<"the background thread is making the buffer : "<<it->_id<<" availlable."<<std::endl;
// Do something.
it->_is_available = true;
it->_is_inprocess = false;
this->_nb_buffer_available++;
this->_cv_foreground.notify_one();
}
this->_is_background_terminate = true;
}
public:
toy_t():
_nb_thread_available(cv::getNumThreads()), // In my computer getNumThreads() == 8
buf(),
buf_begin(nullptr),
buf_end(nullptr),
_nb_buffer_available(this->_nb_thread_available),
_run(false),
_is_background_terminate(false)
{
this->buf.reserve(this->_nb_buffer_available);
this->buf.resize(this->buf.capacity());
std::iota(this->buf.begin(),this->buf.end(),0);
this->buf_begin = this->buf.data();
this->buf_end = this->buf_begin + this->buf.size();
std::thread th([this]{ this->_cv_thread.notify_one(); this->background();});
this->_run = true;
th.detach();
}
virtual ~toy_t()
{
this->_run = false;
this->_nb_buffer_available = 0;
this->_cv_background.notify_one();
while(!this->_is_background_terminate)
std::this_thread::yield();
}
// foreground threads
virtual void operator()(const cv::Range& range)const
{
elem_t* it = this->get_buffer();
std::cout<<"the foreground thread is processing the buffer : "<<it->_id<<std::endl;
for(int r=range.start;r<range.end;r++)
{
// Do something.
}
std::this_thread::sleep_for(std::chrono::seconds(1));
it->_is_inprocess = false;
this->_cv_background.notify_one();
}
};
}
#endif // TOY
主要.cpp
#include <iostream>
#include <cstdlib>
#include "toy.h"
int main(int argc,char* argv[])
{
toy::toy_t tt;
cv::parallel_for_(cv::Range(0,15),tt);
std::cout << "Hello World!" << std::endl;
return EXIT_SUCCESS;
}
使用此代码可以毫无困难地工作。
代码不需要的方面写在方法背景上:
// Method by spinning.
// While the available buffer is not find I am looking for it.
// When I'll find I may have done multiple pass.
while(it->_is_available || it->_is_inprocess)
{
it++;
if(it == this->buf_end)
it = this->buf_begin;
}
我必须检查变量“it”的位置,否则它可能会占据缓冲区大小之外的位置。
我的想法是:
当后台线程正在寻找它用于在 Buf_start 和 _Buf_end 之间找到它的线程时,遵循这些语句。
所以在方法后台寻找缓冲区是:
while(!it->_is_available )
it++;
经过几个小时的测试,我不知道出了什么问题。我也很想知道这个算法是否真的像我想的那样工作?有没有更高效、处理更少的线程间通信方式?
提前致谢。
最佳答案
我解决了! :).
我从“玩具”类中识别出几个问题。
在 operator ()
的重载中,我复制了 get buffer()
方法的代码。我围绕等待条件做了一个 block 函数,以免在线程上顺序。条件变量所需的 unique_lock
仅存在于 block 函数的括号之间,就像这样,当等待条件被评估时,互斥体 _mtx_foreground
被解锁。
为了防止在搜索可用缓冲区期间发生出站,while
循环已被替换为 for
循环。问题是满足递增指针的条件。
在“后台”线程上,我添加了一个循环。这个想法是:
这里有些部分已经被“深度”修改。
名为 toy2 的 class toy
的最新实现是这个:
#ifndef TOY
#define TOY
#include <opencv2/core.hpp>
#include <opencv2/core/utility.hpp>
#include <iostream>
#include <iterator>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
namespace toy
{
class toy2 : public cv::ParallelLoopBody
{
private:
struct elem_t
{
int _id;
std::atomic_bool _is_available;
std::atomic_bool _is_inprocess;
inline elem_t():
_id(-1),
_is_available(true),
_is_inprocess(false)
{}
// needed for the memory reservation because atomic_bool copy constructor is deleted.
inline elem_t(const elem_t& obj):
_id(obj._id),
_is_available((bool)obj._is_available),
_is_inprocess((bool)obj._is_inprocess)
{}
// needed for the memory reservation because atomic_bool copy constructor is deleted.
inline elem_t(elem_t&& obj):
_id(obj._id),
_is_available((bool)obj._is_available),
_is_inprocess((bool)obj._is_inprocess)
{}
// help for the initialization using iota.
inline elem_t& operator=(const int& id)
{
this->_id = id;
return (*this);
}
};
mutable std::vector<elem_t> _buffer;
std::vector<elem_t*> _elements;
std::atomic_bool _run;
mutable std::atomic_size_t _nb_available_buffers;
mutable std::mutex _mtx_thread;
mutable std::mutex _mtx_foreground;
mutable std::mutex _mtx_background;
mutable std::condition_variable _cv_ctor_dtor;
mutable std::condition_variable _cv_foreground;
mutable std::condition_variable _cv_background;
void background()
{
std::cout<<"background has been detach"<<std::endl;
while(this->_run)
{
{
std::unique_lock<std::mutex> lck(this->_mtx_background);
this->_cv_background.wait(lck);
}
// Condition for stoping terminate the thread.
if(!this->_run)
break;
while(true)
{
typename std::vector<elem_t>::iterator it = std::find_if(this->_buffer.begin(),this->_buffer.end(),[](const elem_t& v){ return (!v._is_available && !v._is_inprocess);});
if(it == this->_buffer.end())
break;
std::cout<<"the background is making the element : "<<it->_id<<" available."<<std::endl;
it->_is_available = true;
it->_is_inprocess = false;
this->_nb_available_buffers++;
this->_cv_foreground.notify_one();
}
}
}
public:
toy2():
_buffer(),
_elements(),
_run(false),
_nb_available_buffers(0),
_mtx_thread(),
_mtx_foreground(),
_mtx_background(),
_cv_ctor_dtor(),
_cv_foreground(),
_cv_background()
{
const int nb_threads = cv::getNumThreads();
this->_nb_available_buffers = nb_threads;
this->_buffer.reserve(nb_threads);
this->_buffer.resize(this->_buffer.capacity());
this->_elements.reserve(this->_buffer.size());
this->_elements.resize(this->_buffer.size(),nullptr);
std::iota(this->_buffer.begin(),this->_buffer.end(),0);
for(int i=0;i<this->_buffer.size();i++)
this->_elements[i] = std::addressof(this->_buffer[i]);
std::thread th([this]
{
// Notify to the constructor.
this->_cv_ctor_dtor.notify_one();
this->background();
// Notify to the destructor.
this->_cv_ctor_dtor.notify_one();
});
this->_run = true;
std::unique_lock<std::mutex> lck(this->_mtx_thread);
th.detach();
this->_cv_ctor_dtor.wait(lck);
}
~toy2()
{
this->_run = false;
this->_cv_background.notify_one();
std::unique_lock<std::mutex> lck(this->_mtx_thread);
this->_cv_ctor_dtor.wait(lck);
}
void operator()(const cv::Range& range)const
{
{
std::unique_lock<std::mutex> lck(this->_mtx_foreground);
this->_cv_foreground.wait(lck,[&]{return this->_nb_available_buffers>0;});
}
typename std::vector<elem_t>::iterator it = std::find_if(this->_buffer.begin(),this->_buffer.end(),[](const elem_t& v){return (bool)v._is_available;});
// for(it = this->_buffer.begin();it != this->_buffer.end();it++)
// if(it->_is_available)
// break;
it->_is_available = false;
it->_is_inprocess = true;
this->_nb_available_buffers--;
std::cout<<"the foreground is processing the element : "<<it->_id<<" "<<std::this_thread::get_id()<<std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(2));
// std::this_thread::sleep_for(std::chrono::seconds(2));
it->_is_inprocess = false;
this->_cv_background.notify_one();
std::cout<<"end thread"<<std::endl;
}
};
}
#endif
关于multithreading - 使用条件变量在线程之间进行通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35324855/
我有2个功能: function func1() while true do -- listen on connection end end function func2()
我的问题可能看起来很奇怪,但我想我正面临着 volatile 的问题。对象。 我写了一个这样实现的库(只是一个方案,不是真正的内容): (def var1 (volatile! nil)) (def
由于 maven 支持多线程构建,是否可以同时运行 Sonar 多线程? (例如 mvn sonar:sonar -T 4 ) 我运行了它,当模块报告成功时,它报告整个构建失败并返回 java.uti
我们正在启动一个网站,该网站在短时间内的交易量非常大。它基本上是在给票。该代码是用Java,Spring和Hibernate编写的。我想通过产生多个线程并尝试使用JUnit测试用例来获取票证来模仿高容
我正在尝试访问像素数据并将图像从游戏中的相机保存到磁盘。最初,简单的方法是使用渲染目标,然后使用RenderTarget-> ReadPixels(),但是由于ReadPixels()的 native
我们有以下系统: 用户数:〜500k 项目数:〜100k UserSimilarity userSimilarity = new TanimotoCoefficientSimilarity(dataM
也许这是一个经常出现的问题,但我需要根据我的上下文进行一些自定义。 我正在使用 Spring Batch 3.0.1.RELEASE 我有一个简单的工作,有一些步骤。一个步骤是这样的 block :
也许这是一个经常出现的问题,但我需要根据我的上下文进行一些自定义。 我正在使用 Spring Batch 3.0.1.RELEASE 我有一个简单的工作,有一些步骤。一个步骤是这样的 block :
我正在尝试使用PyBrain和Python的multiprocessing软件包在Python中训练神经网络。 这是我的代码(它训练了一个简单的神经网络来学习XOR逻辑)。 import pybrai
我有一个繁重的功能,不适合在主时间轴上执行(因为要花很长时间才能完成并使程序崩溃)。 因此我在air(as3)中搜索多线程,但是我发现的所有示例都说明了如何在worker中运行单独的swf文件。如何在
我想实现线程A 和线程B 并行运行并共享全局变量。 下面是用python编写的代码。我想在中执行相同操作Dart (我不想使用future等待,因为它正在等待其他线程完成或必须等待。) 大小写变量:
我的一个项目只适用于调试 DLL,而不适用于非调试 DLL。 在 Debug DLL 设置下发布项目有哪些注意事项?例如,是否丢失了某些优化? 如何通过将调试版本设置为非调试 DLL 来调试此项目?我
我正在尝试比较 Matlab 和 Julia 之间的速度和性能。我正在查看一个代码,该代码对承受给定负载的连续体结构进行拓扑优化。我正在查看的代码是公共(public)代码topopt88.m:htt
Serving Flask 应用程序“服务器”(延迟加载) 环境:生产警告:这是一个开发服务器。不要在生产部署中使用它。请改用生产 WSGI 服务器。 Debug模式:开启 在 http://0.0.
我对 PyQT 很陌生。我正在学习如何制作 Progressbar 并随着算法的进展对其进行更新。我已经能够制作一个使用此链接进行 self 更新的基本进度条:Python pyqt pulsing
我正在尝试指定在特定线程上运行任务,这样我就可以使用两个专用于“放入” channel 的耗时任务的线程,而其他线程则用于处理该任务。 我对如何将特定任务分配给特定线程感到困惑。我以为我可以使用类似
我正在编写一个软件,它对很多(潜在的大)图像进行大量图像操作/合成。 多线程有助于提高速度,但 QT 不允许同时在同一图像上使用多个 QPainter。 所以我必须在副本的每个线程中进行图像操作/合成
此脚本读取 url 文件以执行多线程 HTTP 请求。 如何使用带有 url 的数组来发出多线程请求? 我的阵列将有类似的东西: @array = ("https://example.com/xsd"
Java 文档声明了以下关于构造函数同步的内容: Note that constructors cannot be synchronized — using the synchronized keyw
我有一个程序,其中主线程创建了很多线程。它崩溃了,我正在调试核心文件。崩溃发生在其中一个子线程中。为了找到原因,我需要知道主线程是否还活着。有什么方法可以找出哪个线程是初始线程? 最佳答案 Is th
我是一名优秀的程序员,十分优秀!