
c++ - Actor calculation model using boost::thread

Reposted. Author: 可可西里. Updated: 2023-11-01 17:51:15

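I'm trying to implement the Actor calculation model over threads in C++ using boost::thread. But the program throws a weird exception during execution. The exception isn't stable; sometimes the program works correctly.

Here is my code: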

actor.hpp

class Actor {

  public:
    typedef boost::function<int()> Job;

  private:
    std::queue<Job>           d_jobQueue;
    boost::mutex              d_jobQueueMutex;
    boost::condition_variable d_hasJob;
    boost::atomic<bool>       d_keepWorkerRunning;
    boost::thread             d_worker;

    void workerThread();

  public:
    Actor();
    virtual ~Actor();

    void execJobAsync(const Job& job);

    int execJobSync(const Job& job);
};

actor.cpp

namespace {

int executeJobSync(std::string         *error,
                   boost::promise<int> *promise,
                   const Actor::Job    *job)
{
    int rc = (*job)();

    promise->set_value(rc);
    return 0;
}

}

void Actor::workerThread()
{
    while (d_keepWorkerRunning) try {
        Job job;
        {
            boost::unique_lock<boost::mutex> g(d_jobQueueMutex);

            while (d_jobQueue.empty()) {
                d_hasJob.wait(g);
            }

            job = d_jobQueue.front();
            d_jobQueue.pop();
        }

        job();
    }
    catch (...) {
        // Log error
    }
}

void Actor::execJobAsync(const Job& job)
{
    boost::mutex::scoped_lock g(d_jobQueueMutex);
    d_jobQueue.push(job);
    d_hasJob.notify_one();
}

int Actor::execJobSync(const Job& job)
{
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();

    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
        d_hasJob.notify_one();
    }

    int rc = future.get();

    if (rc) {
        ErrorUtil::setLastError(rc, error.c_str());
    }

    return rc;
}

Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}

Actor::~Actor()
{
    d_keepWorkerRunning = false;
    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_hasJob.notify_one();
    }
    d_worker.join();
}

The exception actually thrown is boost::thread_interrupted, on the line int rc = future.get();. But I can't explain this exception from the boost documentation. The documentation says

Throws: - boost::thread_interrupted if the result associated with *this is not ready at the point of the call, and the current thread is interrupted.

But my worker thread can't be in an interrupted state.

When I use gdb and set "catch throw", the backtrace looks like this:

throw thread_interrupted

boost::detail::interruption_checker::check_for_interruption

boost::detail::interruption_checker::interruption_checker

boost::condition_variable::wait

boost::detail::future_object_base::wait_internal

boost::detail::future_object_base::wait

boost::detail::future_object::get

boost::unique_future::get

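I looked through the boost sources, but I can't work out why interruption_checker decides that the worker thread has been interrupted.

So please, C++ gurus, help me. What do I need to do to get correct code? I'm using:

boost 1_53

Linux version 2.6.18-194.32.1.el5 Red Hat 4.1.2-48

GCC 4.7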

EDIT

Fixed it! Thanks to Evgeny Panasyuk and Lazin. The problem was in TLS management. boost::thread and boost::thread_specific_ptr are using same TLS storage for their purposes. In my case there was problem when they both tried to change this storage on creation (Unfortunately I didn't get why in details it happens). So TLS became corrupted.

I replaced boost::thread_specific_ptr from my code with __thread specified variable.
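
For illustration only, here is a minimal sketch of what such a replacement might look like. It uses the C++11 `thread_local` keyword, which plays the same role as GCC's `__thread` for simple types; older GCC releases may only offer `__thread`. The names `t_lastError`, `setLastError`, `getLastError` and the demo function are hypothetical and are not taken from the original code.

```cpp
#include <cassert>
#include <thread>

// Hypothetical per-thread error slot: every thread gets its own copy.
// With GCC, `static __thread int t_lastError = 0;` behaves the same way here.
static thread_local int t_lastError = 0;

void setLastError(int rc) { t_lastError = rc; }
int  getLastError()       { return t_lastError; }

// Sets an error on a second thread and returns what the calling thread sees.
int lastErrorSeenByCallerAfterWorkerFails()
{
    setLastError(0);
    int seenByWorker = 0;
    std::thread worker([&seenByWorker] {
        setLastError(42);              // touches only the worker's copy
        seenByWorker = getLastError();
    });
    worker.join();
    assert(seenByWorker == 42);
    return getLastError();             // the caller's copy is untouched
}
```

Unlike boost::thread_specific_ptr, a variable like this needs no runtime registration, so it does not go through boost's own thread-local bookkeeping at all.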

Offtop: During debugging I found memory corruption in external library and fixed it =)


EDIT 2 I got the exact problem... It is a bug in GCC =) The _GLIBCXX_DEBUG compilation flag breaks ABI. You can see discussion on boost bugtracker: https://svn.boost.org/trac/boost/ticket/7666

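Best answer

I found several bugs:


The Actor::workerThread function unlocks d_jobQueueMutex twice. The first unlock is the manual d_jobQueueMutex.unlock();, and the second happens in the destructor of boost::unique_lock<boost::mutex>.

You should prevent one of the unlocks, for example by releasing the association between the unique_lock and the mutex:

    g.release(); // <------------ PATCH
    d_jobQueueMutex.unlock();

Or add an extra code block plus a default-constructed Job.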
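
Both options can be sketched side by side. This is only an illustration under stated assumptions: it uses the standard-library `std::mutex` and `std::unique_lock` instead of their boost counterparts so that it compiles without boost, and `g_queueMutex`, `g_queue`, `popWithRelease` and `popWithScope` are hypothetical names standing in for the members of Actor.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

std::mutex      g_queueMutex;  // hypothetical stand-in for d_jobQueueMutex
std::queue<int> g_queue;       // hypothetical stand-in for d_jobQueue

// Option 1: keep the manual unlock, but detach the guard first so that its
// destructor does not unlock the mutex a second time.
int popWithRelease()
{
    std::unique_lock<std::mutex> g(g_queueMutex);
    int value = g_queue.front();
    g_queue.pop();
    g.release();               // the guard no longer owns the mutex
    g_queueMutex.unlock();     // the one and only unlock
    assert(!g.owns_lock());
    return value;
}

// Option 2: no manual unlock; an extra block bounds the guard's lifetime.
int popWithScope()
{
    int value = 0;             // constructed before the locked block
    {
        std::unique_lock<std::mutex> g(g_queueMutex);
        value = g_queue.front();
        g_queue.pop();
    }                          // unlocked here, exactly once
    return value;
}
```

The second form is usually easier to keep correct, because there is no point in the function where the mutex is locked without a guard owning it.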


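It is possible that workerThread never leaves the following loop:

    while (d_jobQueue.empty()) {
        d_hasJob.wait(g);
    }

Imagine the following case: d_jobQueue is empty and Actor::~Actor() is called. It sets the flag and notifies the worker thread:

    d_keepWorkerRunning = false;
    d_hasJob.notify_one();

workerThread wakes up inside the while loop, sees that the queue is still empty, and goes back to sleep.

The common practice is to send a special final job that stops the worker thread:

    ~Actor()
    {
        execJobSync([this]()->int
        {
            d_keepWorkerRunning = false;
            return 0;
        });
        d_worker.join();
    }

In this case, d_keepWorkerRunning does not need to be atomic, because only the worker thread reads and writes it once the worker is running.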
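
To show the final-job approach end to end, here is a self-contained sketch. It is an assumption-laden rewrite rather than the code from the question: it uses `std::thread`, `std::function` and `std::promise` in place of the boost types, drops the error string and ErrorUtil, and keeps the promise in a `std::shared_ptr` so the worker never touches a stack object that the caller has already destroyed. `demoActor` is a hypothetical usage function.

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

class Actor {
  public:
    typedef std::function<int()> Job;

    Actor() : d_keepWorkerRunning(true), d_worker(&Actor::workerThread, this) {}

    ~Actor()
    {
        // Final job: runs on the worker after everything queued before it.
        execJobSync([this]() -> int { d_keepWorkerRunning = false; return 0; });
        d_worker.join();
    }

    void execJobAsync(const Job& job)
    {
        std::lock_guard<std::mutex> g(d_jobQueueMutex);
        d_jobQueue.push(job);
        d_hasJob.notify_one();
    }

    int execJobSync(const Job& job)
    {
        auto promise = std::make_shared<std::promise<int>>();
        std::future<int> future = promise->get_future();
        // &job stays valid because this call blocks until the job has run.
        execJobAsync([promise, &job]() -> int {
            try { promise->set_value(job()); }
            catch (...) { promise->set_exception(std::current_exception()); }
            return 0;
        });
        return future.get();
    }

  private:
    void workerThread()
    {
        while (d_keepWorkerRunning) {
            Job job;
            {
                std::unique_lock<std::mutex> g(d_jobQueueMutex);
                d_hasJob.wait(g, [this] { return !d_jobQueue.empty(); });
                job = d_jobQueue.front();
                d_jobQueue.pop();
            }
            try { job(); } catch (...) { /* log error */ }
        }
    }

    std::queue<Job>         d_jobQueue;
    std::mutex              d_jobQueueMutex;
    std::condition_variable d_hasJob;
    bool                    d_keepWorkerRunning;  // touched only by the worker
    std::thread             d_worker;             // declared last: starts last
};

// Usage: one synchronous job, then an asynchronous one drained by ~Actor().
int demoActor()
{
    int sideEffect = 0;
    {
        Actor actor;
        int rc = actor.execJobSync([] { return 7; });
        assert(rc == 7);
        actor.execJobAsync([&sideEffect] { sideEffect = 5; return 0; });
    }   // the destructor queues the final job and joins the worker
    return sideEffect;
}
```

Because the final job goes through the same queue, every job submitted before destruction still runs, and the worker always wakes up to a non-empty queue.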


LIVE DEMO on Coliru


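EDIT:

I have added event queue code into your example.

You have a concurrent queue in both EventQueueImpl and Actor, but for different types. The common part can be extracted into a separate entity, concurrent_queue<T>, that works for any type. It is much easier to debug and test a queue in one place than to chase bugs scattered across different classes.

So, you can try using this concurrent_queue<T> (on Coliru)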
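
The code behind the Coliru link is not reproduced here. As a rough idea of what such a reusable queue could look like, here is a minimal sketch written against the standard library; the member names, the blocking `pop` interface and `demoQueue` are assumptions made for this illustration and may differ from the linked version.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

template <typename T>
class concurrent_queue {
    std::queue<T>           d_items;
    std::mutex              d_mutex;
    std::condition_variable d_notEmpty;

  public:
    void push(T item)
    {
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_items.push(std::move(item));
        }
        d_notEmpty.notify_one();   // wake one waiting consumer, if any
    }

    // Blocks until an item is available, then removes and returns it.
    T pop()
    {
        std::unique_lock<std::mutex> g(d_mutex);
        d_notEmpty.wait(g, [this] { return !d_items.empty(); });
        T item = std::move(d_items.front());
        d_items.pop();
        return item;
    }
};

// Usage: one producer thread pushes 1, 2, 3; the caller pops and sums them.
int demoQueue()
{
    concurrent_queue<int> q;
    std::thread producer([&q] {
        for (int i = 1; i <= 3; ++i) q.push(i);
    });
    int sum = 0;
    for (int i = 0; i < 3; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

With a queue like this, Actor only needs a concurrent_queue<Job> member, and the locking and waiting logic is tested in one place.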

Regarding "c++ - Actor calculation model using boost::thread", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/19593427/
