gpt4 book ai didi

c++ - 当工作线程完成时,我如何安全地终止它们?

转载 作者:行者123 更新时间:2023-11-28 06:47:48 25 4
gpt4 key购买 nike

我正在尝试使用 C++ 11 同步功能实现主从模型进行练习。该模型使用一个 std::queue 对象以及一个条件变量和一些互斥锁。主线程将任务放入队列中,工作线程从队列中弹出一个任务并“处理”它们。

当我不终止工作线程时,我的代码可以正常工作(除非我错过了一些竞争条件)。但是,程序永远不会结束,直到您使用 Ctrl+C 手动终止它。我有一些代码可以在主线程完成后终止工作人员。不幸的是,这不能正常工作,因为它会跳过某些执行运行的最后一个任务。

所以我的问题是:是否可以在处理完所有任务后安全且正确地终止工作线程?

这只是一个概念证明,我是 C++ 11 功能的新手,所以我为我的风格道歉。我感谢任何建设性的批评。

编辑: nogard 友善地指出模型的这种实现使其变得相当复杂,并向我表明我所要求的是毫无意义的,因为好的实现不会有这个问题。线程池是正确实现这一点的方法。此外,我应该为 worker_done 使用 std::atomic 而不是普通 bool 值(感谢 Jarod42)。

#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>

//To sleep
#include <unistd.h>

struct Task
{
int taskID;
};
typedef struct Task task;


//cout mutex
std::mutex printstream_accessor;

//queue related objects
std::queue<task> taskList;
std::mutex queue_accessor;
std::condition_variable cv;

//worker flag
bool worker_done = false;

//It is acceptable to call this on a lock only if you poll - you will get an inaccurate answer otherwise
//Will return true if the queue is empty, false if not
bool task_delegation_eligible()
{
return taskList.empty();
}

//Thread safe cout function
void safe_cout(std::string input)
{
// Apply a stream lock and state the calling thread information then print the input
std::unique_lock<std::mutex> cout_lock(printstream_accessor);
std::cout << "Thread:" << std::this_thread::get_id() << " " << input << std::endl;
}//cout_lock destroyed, therefore printstream_accessor mutex is unlocked

void worker_thread()
{
safe_cout("worker_thread() initialized");
while (!worker_done)
{
task getTask;
{
std::unique_lock<std::mutex> q_lock(queue_accessor);
cv.wait(q_lock,
[]
{ //predicate that will check if available
//using a lambda function to apply the ! operator
if (worker_done)
return true;

return !task_delegation_eligible();
}
);

if (!worker_done)
{
//Remove task from the queue
getTask = taskList.front();
taskList.pop();
}
}

if (!worker_done)
{
//process task
std::string statement = "Processing TaskID:";
std::stringstream convert;
convert << getTask.taskID;
statement += convert.str();

//print task information
safe_cout(statement);

//"process" task
usleep(5000);
}
}
}

/**
* master_thread():
* This thread is responsible for creating task objects and pushing them onto the queue
* After this, it will notify all other threads who are waiting to consume data
*/
void master_thread()
{
safe_cout("master_thread() initialized");

for (int i = 0; i < 10; i++)
{
//Following 2 lines needed if you want to don't want this thread to bombard the queue with tasks before processing of a task can be done
while (!task_delegation_eligible() ) //task_eligible() is true IFF queue is empty
std::this_thread::yield(); //yield execution to other threads (if there are tasks on the queue)

//create a new task
task newTask;
newTask.taskID = (i+1);

//lock the queue then push
{
std::unique_lock<std::mutex> q_lock(queue_accessor);
taskList.push(newTask);
}//unique_lock destroyed here

cv.notify_one();
}

safe_cout("master_thread() complete");
}

int main(void)
{
std::thread MASTER_THREAD(master_thread); //create a thread object named MASTER_THREAD and have it run the function master_thread()
std::thread WORKER_THREAD_1(worker_thread);
std::thread WORKER_THREAD_2(worker_thread);
std::thread WORKER_THREAD_3(worker_thread);

MASTER_THREAD.join();

//wait for the queue tasks to finish
while (!task_delegation_eligible()); //wait if the queue is full

/**
* Following 2 lines
* Terminate worker threads => this doesn't work as expected.
* The model is fine as long as you don't try to stop the worker
* threads like this as it might skip a task, however this program
* will terminate
*/
worker_done = true;
cv.notify_all();

WORKER_THREAD_1.join();
WORKER_THREAD_2.join();
WORKER_THREAD_3.join();

return 0;
}

非常感谢

最佳答案

您的程序中存在可见性问题:工作线程可能无法观察到在一个线程中对 worker_done 标志所做的更改。为了保证第二个 Action 可以观察到一个 Action 的结果,那么您必须使用某种形式的同步来确保第二个线程看到第一个线程所做的事情。要解决此问题,您可以按照 Jarod42 的建议使用 atomic。

如果你做这个程序是为了练习它很好,但对于真正的应用程序,你可以从现有的 thread pool 中获益,这将大大简化您的代码。

关于c++ - 当工作线程完成时,我如何安全地终止它们?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24645645/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com