gpt4 book ai didi

c++ - Boost threads - 处理线程中断的安全/保证方式

转载 作者:太空狗 更新时间:2023-10-29 21:16:33 28 4
gpt4 key购买 nike

我正在使用 pthreads 移动 C 程序转向 C++,并且为了使程序具有多平台、可移植性等特性,将不得不广泛使用 Boost 库。

过去使用线程时,我的代码通常具有以下形式:


void threadFunc(void* pUserData)
{
if ( !pUserData )
{
return;
}

myStruct* pData = (myStruct*)pUserData;
bool bRun;

lock(pUserData);
bRun = pUserData->bRun;
unlock(pUserData);

while ( bRun )
{
// Do some stuff
// ...
// ...
lock(pUserData);
bRun = pUserData->bRun;
unlock(pUserData);
}

/* Done execution of thread; External thread must have set pUserData->bRun
* to FALSE.
*/
}

这如我所料。当我想要线程关闭时,我只是锁定 struct 的互斥体。它正在访问(通常是 struct 的成员),切换 bRun标志,和join()在线程上。对于 boost 线程,我注意到我的代码中有 timed_join() 的情况。执行非阻塞操作时超时,similar to this SO question .这让我怀疑我没有正确使用 boost 线程。

对于问题...

首先,两种通用线程结构中的哪一种对于让线程正确捕获 thread_interrupted 是正确的异常?

案例A


void threadFunc( void* pUserData )
{
while ( 1 )
{
try
{
// Do some stuff
// ...
// ...
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
catch(boost::thread_interrupted const& )
{
// Thread interrupted; Clean up
}
}
}

案例B


void threadFunc( void* pUserData )
{
try
{
while ( 1 )
{
// Do some stuff
// ...
// ...
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
}
catch(boost::thread_interrupted const& )
{
// Thread interrupted; Clean up
}
}

其次,如果我希望线程有机会捕获中断调用,而不是捕获中断调用,那么调用什么适当的 boost 函数来代替 sleep sleep()还是放弃当前占有的剩余CPU时间片? From another SO question on this topic , 似乎 boost::this_thread::interruption_point() call 可能是我正在寻找的东西,但我不能 100% 确定它是否总是有效,根据我在 SO 问题中阅读的内容,我从中引用了它。


最后,据我了解,不调用任何方式的 boost sleep()我循环中的函数或一些类似的中断点函数将意味着 timed_join()总是会超时,我要么必须:

  • 强行终止线程。
  • 像在我原来的 C 代码中那样使用互斥锁保护标志。

这个假设是否正确?


谢谢。


引用资料

  1. Boost Thread - How to acknowledge interrupt , <https://stackoverflow.com/questions/7316633/boost-thread-how-to-acknowledge-interrupt> , 于 2016 年 1 月 18 日访问。
  2. boost::this_thread::interruption_point() doesn't throw boost::thread_interrupted& exception , <https://stackoverflow.com/questions/26103648/boostthis-threadinterruption-point-doesnt-throw-boostthread-interrupted> , 于 2016 年 1 月 18 日访问。

最佳答案

您的代码不是异常安全的。因此,在等待加入时很容易出现死锁。

如果您在持有互斥量时收到异常,您的代码将永远不会解锁互斥量,可能会导致等待线程中出现死锁。

要检查的事情:

  1. 首先,您的代码不会在任何地方显示您如何创建您试图中断的线程。

    需要boost::thread 管理的线程实例上运行它。这应该是显而易见的,因为如果没有这样的对象,将很难调用 boost::thread::interrupt()。尽管如此,我还是想仔细检查一下这是按顺序进行的,因为线程函数的函数签名强烈建议使用 native POSIX pthread_create

    A good idea is to check the return value of interrupt_request() (which will be false for native threads):

    The boost::this_thread interrupt related functions behave in a degraded mode when called from a thread created using the native interface, i.e. boost::this_thread::interruption_enabled() returns false. As consequence the use of boost::this_thread::disable_interruption and boost::this_thread::restore_interruption will do nothing and calls to boost::this_thread::interruption_point() will be just ignored.

  2. 锁定/解锁功能是如何实现的?这里同样如此。如果您混合使用 POSIX/非 boost API,那么您可能会错过信号。

    当你这样做的时候,开始使用 lock_guardunique_lock 这样你的代码就异常安全了。

  3. 关于您的Case ACase B 问题,Case B 最简单。

    案例 A 没有显示如何计划退出 while 循环。您当然可以使用 breakreturn(甚至 goto),但正如您展示的那样,这将是一个无限循环。

  4. 除了 sleep() 之外,还有 yield() 明确放弃时间片的剩余部分,同时作为中断点。

    我知道你说过你不想放弃时间片,但我不确定你是否意识到如果没有工作要做,这将导致线程消耗 CPU:

    让我觉得非常矛盾的是,您希望不放弃剩余的时间片(表示低延迟、高带宽应用程序和无锁上下文)并且您在谈论同时使用 mutex。显然,后者比仅 yield 部分时间片要昂贵得多,因为它是无锁并发的对立面

因此,让我以两个良好风格的演示作为结束,一个是有锁的,一个是无锁的(或保守锁,取决于您的线程同步逻辑的其余部分)。

锁定演示

这使用“旧”线程控制,因为它在锁定时没问题,IMO:

Live On Coliru

#include <boost/thread.hpp>

void trace(std::string const& msg);

struct myStruct {
bool keepRunning = true;

using mutex_t = boost::mutex;
using lock_t = boost::unique_lock<mutex_t>;

lock_t lock() const { return lock_t(mx); }

private:
mutex_t mutable mx;
};

void threadFunc(myStruct &userData) {
trace("threadFunc enter");

do
{
{
auto lock = userData.lock();
if (!userData.keepRunning)
break;
} // destructor of unique_lock unlocks, exception safe


// Do some stuff
// ...
// ...
trace("(work)");
boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
} while (true);

trace("threadFunc exit");
}

int main() {
trace("Starting main");

boost::thread_group threads;
constexpr int N = 4;
std::vector<myStruct> data(N);

for (int i=0; i < N; ++i)
threads.create_thread(boost::bind(threadFunc, boost::ref(data[i])));

boost::this_thread::sleep_for(boost::chrono::seconds(1));

trace("Main signaling shutdown");

for (auto& d : data) {
auto lock = d.lock();
d.keepRunning = false;
}

threads.join_all();
trace("Bye");
}

void trace(std::string const& msg) {
static boost::mutex mx;
boost::lock_guard<boost::mutex> lk(mx);

static int thread_id_gen = 0;
thread_local int thread_id = thread_id_gen++;

std::cout << "Thread #" << thread_id << ": " << msg << "\n";
}

输出例如

Thread #0: Starting main
Thread #1: threadFunc enter
Thread #1: (work)
Thread #2: threadFunc enter
Thread #2: (work)
Thread #3: threadFunc enter
Thread #3: (work)
Thread #4: threadFunc enter
Thread #4: (work)
Thread #3: (work)
Thread #1: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #2: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #2: (work)
Thread #1: (work)
Thread #4: (work)
Thread #3: (work)
Thread #2: (work)
Thread #4: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #4: (work)
Thread #2: (work)
Thread #1: (work)
Thread #3: (work)
Thread #0: Main signaling shutdown
Thread #4: threadFunc exit
Thread #2: threadFunc exit
Thread #1: threadFunc exit
Thread #3: threadFunc exit
Thread #0: Bye

无锁演示

  1. 前面lockful例子的直译:

    Live On Coliru

  2. 但是,现在拥有一个共享的关闭标志可能更有意义:

    Live On Coliru

    #include <boost/thread.hpp>

    void trace(std::string const& msg);

    struct myStruct {
    int value;
    };

    void threadFunc(myStruct userData, boost::atomic_bool& keepRunning) {
    std::string valuestr = std::to_string(userData.value);
    trace("threadFunc enter(" + valuestr + ")");

    do
    {
    if (!keepRunning)
    break;

    // Do some stuff
    // ...
    // ...
    trace("(work" + valuestr + ")");
    boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
    } while (true);

    trace("threadFunc " + valuestr + " exit");
    }

    int main() {
    boost::atomic_bool running { true };
    trace("Starting main");

    boost::thread_group threads;
    constexpr int N = 4;

    for (int i=0; i < N; ++i) {
    threads.create_thread(
    boost::bind(threadFunc, myStruct{i}, boost::ref(running)
    ));
    }

    boost::this_thread::sleep_for(boost::chrono::seconds(1));

    trace("Main signaling shutdown");

    running = false;

    threads.join_all();
    trace("Bye");
    }

    void trace(std::string const& msg) {
    static boost::mutex mx;
    boost::lock_guard<boost::mutex> lk(mx);

    static int thread_id_gen = 0;
    thread_local int thread_id = thread_id_gen++;

    std::cout << "Thread #" << thread_id << ": " << msg << "\n";
    }

    请注意,我们可以使用 boost::bind¹

  3. threadFunc 的方式更“自然的 C++” >
  4. 使用interruption_point()

    请注意 interrupt() 的文档状态:

    Effects: If *this refers to a thread of execution, request that the thread will be interrupted the next time it enters one of the predefined interruption points with interruption enabled, or if it is currently blocked in a call to one of the predefined interruption points with interruption enabled [emphasis mine]

    值得注意的是,可以暂时禁用中断。 Boost 只为此提供支持 RAII 的类,因此默认情况下是异常安全的。如果您的代码使用 class disable_interruption or class restore_interruption您不必为此担心。

    Live On Coliru

    #include <boost/thread.hpp>

    void trace(std::string const& msg);

    struct myStruct {
    int value;
    };

    void threadFunc(myStruct userData) {
    std::string valuestr = std::to_string(userData.value);
    trace("threadFunc enter(" + valuestr + ")");

    // using `Case B` form from your question
    try {
    do
    {
    boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); // Avoids huge output for demo

    trace("(work" + valuestr + ")");
    boost::this_thread::interruption_point();
    } while (true);
    } catch(boost::thread_interrupted const& tie) {
    trace("threadFunc " + valuestr + " interrupted");
    }

    trace("threadFunc " + valuestr + " exit");
    }

    int main() {
    trace("Starting main");

    boost::thread_group threads;
    constexpr int N = 4;

    for (int i=0; i < N; ++i) {
    threads.create_thread(boost::bind(threadFunc, myStruct{i}));
    }

    boost::this_thread::sleep_for(boost::chrono::seconds(1));

    trace("Main signaling shutdown");
    threads.interrupt_all();

    threads.join_all();
    trace("Bye");
    }

    void trace(std::string const& msg) {
    static boost::mutex mx;
    boost::lock_guard<boost::mutex> lk(mx);

    static int thread_id_gen = 0;
    thread_local int thread_id = thread_id_gen++;

    std::cout << "Thread #" << thread_id << ": " << msg << "\n";
    }

¹ 或 lambda,如果您愿意的话

关于c++ - Boost threads - 处理线程中断的安全/保证方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34859533/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com