gpt4 book ai didi

c++ - Boost Threads 生产者/消费者意外行为

转载 作者:行者123 更新时间:2023-11-28 03:34:30 34 4
gpt4 key购买 nike

我目前正在编写一个应用程序(使用boost),它将有一个生产者抓取帧和一个消费者读取帧。我在生产者中添加了一个 sleep 语句来模拟抓取帧的时间。我希望消费者等待条件变量,并在生产者的第一个通知被唤醒以读取框架。然而,我在日志文件中看到的是消费者(主线程)在等待条件变量,但是生产者在消费者从等待读取框架中出来之前经历了几次通知。

这是我的 Worker.h

class Worker {
static log4cxx::LoggerPtr m_log;

public:
Worker();
virtual ~Worker();

void start();
void stop();
void getCurrentFrame(/*cv::Mat& frame*/);

private:
void processFrames();

volatile bool m_stopRequested;

bool m_bFrameReady;
boost::mutex m_mutex;
boost::condition_variable condF;

boost::shared_ptr<boost::thread> m_thread;
};

worker .cpp

LoggerPtr Worker::m_log(Logger::getLogger("fdx.Worker"));

Worker::Worker() {
m_bFrameReady = false;

LOG4CXX_INFO(m_log, "Worker() c-tor");

m_stopRequested = false;

}

Worker::~Worker() {
LOG4CXX_INFO(m_log, "Worker() d-tor");
}

void Worker::start()
{
LOG4CXX_INFO(m_log, "Worker()::start()");
assert(!m_thread);

m_thread = boost::shared_ptr<boost::thread>(new boost::thread(&Worker::processFrames, this));

LOG4CXX_WARN(m_log, "Worker()::start() thread[" << m_thread->get_id() << "] started!");
}

void Worker::stop()
{
LOG4CXX_INFO(m_log, "Worker()::stop()");

if(m_thread != NULL)
{
LOG4CXX_INFO(m_log, "Worker()::stop() ThrId [" << m_thread->get_id() << "]");
m_stopRequested = true;
m_thread->join();
}
else
{
LOG4CXX_WARN(m_log, "Worker()::stop() The thread for this camera was never started.");
}

LOG4CXX_INFO(m_log, "Worker()::stop() thread stopped!");
}

void Worker::processFrames()
{
LOG4CXX_WARN(m_log, "Worker()::processFrames() Thread[" << boost::this_thread::get_id() << "] starting...");

int rc = 0;
std::stringstream ss;

while(!this->m_stopRequested)
{
boost::mutex::scoped_lock lock(m_mutex);
LOG4CXX_WARN(m_log, "Worker()::processFrames() Got a Write lock");

m_bFrameReady = true;
LOG4CXX_WARN(m_log, "Worker()::processFrames() Frame ready set to true");

boost::this_thread::sleep(boost::posix_time::milliseconds(200));

LOG4CXX_WARN(m_log, "Worker()::processFrames() Write Un-lock");

lock.unlock();

LOG4CXX_WARN(m_log, "Worker()::processFrames() Notify");

condF.notify_one();
}
}

void Worker::getCurrentFrame()
{
boost::mutex::scoped_lock lock(m_mutex);

while(!this->m_bFrameReady)
{
LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() wait for Read lock");
condF.wait(lock);
}

LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready; Got a Read lock");

m_bFrameReady = false;

LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready set to false");

LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Read Un-lock");
lock.unlock();

}

主要.cpp

LoggerPtr logger(Logger::getLogger("TCamApp"));

int main(int argc, char** argv)
{
int rc = 0;

char cwDir[FILENAME_MAX];

Worker* pWorker = NULL;

memset(cwDir, 0, sizeof(cwDir));
getcwd(cwDir, FILENAME_MAX);

std::cout << "Current Working Dir[" << cwDir << "]" << endl;

std::stringstream ss;
ss << "" << cwDir << "/logs.properties";
std::cout << "logs.properties file[" << ss.str() << "]" << endl;

struct stat st;
if(!stat(ss.str().c_str(), &st))
{
PropertyConfigurator::configure(ss.str());
}
else
{
BasicConfigurator::configure();
}

LOG4CXX_INFO(logger, "Application [" << argv[0] << "] starting...");

pWorker = new Worker();
assert(pWorker);

pWorker->start();

for(int i = 0; i < 100; i++)
{
pWorker->getCurrentFrame();

LOG4CXX_INFO(logger, "Iteration [" << i << "]");


//boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}

pWorker->stop();

LOG4CXX_INFO(logger, "Application [" << argv[0] << "] stopping...");

return rc;
}

以下是我的日志文件的摘录:

2012-07-11 15:33:53,943 [0x7f5707bcf780] INFO  TCamApp - Application [/home/op/workspace/TestThreads/Debug/TestThreads] starting...
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN fdx.Worker - Worker()::start() thread[0x15e4c50] started!
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() wait for Read lock
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Thread[0x15e4c50] starting...
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready set to false
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Read Un-lock
2012-07-11 15:33:54,346 [0x7f5707bcf780] INFO TCamApp - Iteration [0]
2012-07-11 15:33:54,346 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() wait for Read lock
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,546 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:55,148 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:55,149 [0x7f57059c1700] WARN fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Frame ready set to false
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN fdx.Worker - Worker::getCurrentFrame() Read Un-lock
2012-07-11 15:33:55,149 [0x7f5707bcf780] INFO TCamApp - Iteration [1]

正如您从日志中看到的,主线程等待读取,但在主线程退出 wait() 之前,另一个线程将产生多个通知。

我研究了一些并认为我已经正确编码,但它的行为并不像我预期的那样。我将不胜感激有关解决方案的任何建议。谢谢。

最佳答案

这是预料之中的,因为生产者线程在锁定互斥锁的情况下休眠。一旦醒来,它就会通知消费者并再次锁定它。对于谁将锁定互斥量,无法保证“公平”。

您似乎要实现的是一个异步队列。它通常包含 2 个条件变量:一个在队列满时压低生产者,另一个在队列空时压低消费者。无论生产或消费队列中的项目需要多长时间,互斥体只会在推/弹出操作期间被锁定 - 这应该非常快。

您的 sleep 语句可能只是使操作系统的调度程序偏向生产者线程。将 sleep 移出关键部分,以模拟推送操作之外的处理,您应该会看到消费者线程的响应速度更快。

在相关说明中,您可以将哨兵对象(即特殊值,如指针队列中的空指针)推送到队列中,让使用者线程知道它们必须停下来。

关于c++ - Boost Threads 生产者/消费者意外行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11441965/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com