- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 C++ 11 同步功能实现主从模型进行练习。该模型使用一个 std::queue 对象以及一个条件变量和一些互斥锁。主线程将任务放入队列中,工作线程从队列中弹出一个任务并“处理”它们。
当我不终止工作线程时,我的代码可以正常工作(除非我错过了一些竞争条件)。但是,程序永远不会结束,直到您使用 Ctrl+C 手动终止它。我有一些代码可以在主线程完成后终止工作人员。不幸的是,这不能正常工作,因为它会跳过某些执行运行的最后一个任务。
所以我的问题是:是否可以在处理完所有任务后安全且正确地终止工作线程?
这只是一个概念证明,我是 C++ 11 功能的新手,所以我为我的风格道歉。我感谢任何建设性的批评。
编辑: nogard 友善地指出模型的这种实现使其变得相当复杂,并向我表明我所要求的是毫无意义的,因为好的实现不会有这个问题。线程池是正确实现这一点的方法。此外,我应该为 worker_done 使用 std::atomic 而不是普通 bool 值(感谢 Jarod42)。
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
//To sleep
#include <unistd.h>
struct Task
{
int taskID;
};
typedef struct Task task;
//cout mutex
std::mutex printstream_accessor;
//queue related objects
std::queue<task> taskList;
std::mutex queue_accessor;
std::condition_variable cv;
//worker flag
bool worker_done = false;
//It is acceptable to call this on a lock only if you poll - you will get an inaccurate answer otherwise
//Will return true if the queue is empty, false if not
bool task_delegation_eligible()
{
return taskList.empty();
}
//Thread safe cout function
void safe_cout(std::string input)
{
// Apply a stream lock and state the calling thread information then print the input
std::unique_lock<std::mutex> cout_lock(printstream_accessor);
std::cout << "Thread:" << std::this_thread::get_id() << " " << input << std::endl;
}//cout_lock destroyed, therefore printstream_accessor mutex is unlocked
void worker_thread()
{
safe_cout("worker_thread() initialized");
while (!worker_done)
{
task getTask;
{
std::unique_lock<std::mutex> q_lock(queue_accessor);
cv.wait(q_lock,
[]
{ //predicate that will check if available
//using a lambda function to apply the ! operator
if (worker_done)
return true;
return !task_delegation_eligible();
}
);
if (!worker_done)
{
//Remove task from the queue
getTask = taskList.front();
taskList.pop();
}
}
if (!worker_done)
{
//process task
std::string statement = "Processing TaskID:";
std::stringstream convert;
convert << getTask.taskID;
statement += convert.str();
//print task information
safe_cout(statement);
//"process" task
usleep(5000);
}
}
}
/**
* master_thread():
* This thread is responsible for creating task objects and pushing them onto the queue
* After this, it will notify all other threads who are waiting to consume data
*/
void master_thread()
{
safe_cout("master_thread() initialized");
for (int i = 0; i < 10; i++)
{
//Following 2 lines needed if you want to don't want this thread to bombard the queue with tasks before processing of a task can be done
while (!task_delegation_eligible() ) //task_eligible() is true IFF queue is empty
std::this_thread::yield(); //yield execution to other threads (if there are tasks on the queue)
//create a new task
task newTask;
newTask.taskID = (i+1);
//lock the queue then push
{
std::unique_lock<std::mutex> q_lock(queue_accessor);
taskList.push(newTask);
}//unique_lock destroyed here
cv.notify_one();
}
safe_cout("master_thread() complete");
}
int main(void)
{
std::thread MASTER_THREAD(master_thread); //create a thread object named MASTER_THREAD and have it run the function master_thread()
std::thread WORKER_THREAD_1(worker_thread);
std::thread WORKER_THREAD_2(worker_thread);
std::thread WORKER_THREAD_3(worker_thread);
MASTER_THREAD.join();
//wait for the queue tasks to finish
while (!task_delegation_eligible()); //wait if the queue is full
/**
* Following 2 lines
* Terminate worker threads => this doesn't work as expected.
* The model is fine as long as you don't try to stop the worker
* threads like this as it might skip a task, however this program
* will terminate
*/
worker_done = true;
cv.notify_all();
WORKER_THREAD_1.join();
WORKER_THREAD_2.join();
WORKER_THREAD_3.join();
return 0;
}
非常感谢
最佳答案
您的程序中存在可见性问题:工作线程可能无法观察到在一个线程中对 worker_done
标志所做的更改。为了保证第二个 Action 可以观察到一个 Action 的结果,那么您必须使用某种形式的同步来确保第二个线程看到第一个线程所做的事情。要解决此问题,您可以按照 Jarod42 的建议使用 atomic。
如果你做这个程序是为了练习它很好,但对于真正的应用程序,你可以从现有的 thread pool 中获益,这将大大简化您的代码。
关于c++ - 当工作线程完成时,我如何安全地终止它们?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24645645/
在我的 java 代码中,我做了类似的事情: int sleep = 0; sleep(sleep); sleep++; 被bos指出不好。它不能在 php 中正常工作。 在 java 中使用上述代码
我有一个程序使用第 3 方库进行一些计算,我在与主应用程序不同的线程上启动该程序。不幸的是,此计算可能需要很长时间,并且不提供进度更新和取消的接口(interface)。 为了拥有这样的界面,我想创建
C++ 是否有任何等效于 python 的函数 os.path.join?基本上,我正在寻找将文件路径的两个(或多个)部分组合在一起的东西,这样您就不必担心确保这两个部分完美地结合在一起。如果它在 Q
我正试图站起来(非商业)web application使用 neo4j Community 3.5.2 作为后端。 应用程序以两种方式与图形交互: 服务器端(安全的)用 flask 编写新的节点和关系
我正在开发一个将有许多外部用户的在线应用程序。至于现在,我的连接方法是为所有用户托管一个中央数据库,而他们从自己的服务器文件连接。 方法: PHP 连接文件(托管在他们的服务器上;文件由我提供) >>
我创建了一个将所有事件通知代码转换为字符串的函数。真的很简单。 我有一堆常量,比如 const _bstr_t DIRECTSHOW_MSG_EC_ACTIVATE("A video window i
我想将(附加)信息从过滤器传递到资源。我目前尝试这样做的方式是,在 Filter 中: getContext().getAttributes().put("additionalInformation"
我想计算转换系数。为此,我必须除以例如的最大值。 ushort 为 uchar 的最大值。 我想通过将参数传递给函数或类型名来动态地执行此操作。然后我想选择最大值并执行计算。 有两个问题: 如何动态选
我希望我的用户在用 Java 请求列表时能够编写自己的过滤器。 选项 1) 我正在考虑将 JavaScript 与 Rhino 结合使用。 我将用户的过滤器作为 javascript 字符串获取。然后
(安全地)提供来自不同域的图像是否符合 PCI 标准?我搜索了 PCI DSS 2.0 PDF,但没有找到任何引用资料。 最佳答案 图像不符合 PCI 合规性。 PCI DSS covers the
我们正在将 spring 和 hibernate 用于 web 应用程序:该应用程序有一个购物车,用户可以在其中放置商品。为了保存不同登录名之间要查看的项目,购物车中的项目值存储在表中。提交购物车时,
我正在为多个客户创建一个具有电子商务元素的 Rails 应用程序 - 我希望这些客户能够在管理区域中指定计算运费的公式;因为方法可能不同。 让我们假装一下,我允许他们输入 ruby 代码,然后我稍
我正在 Eclipse 中开发一个 Java 项目,使用 Maven 构建和管理依赖项。该项目分布在 5 个 Eclipse 项目中,其中一个是父 POM。我正在研究基于另一个团队实现的更复杂服务器的
我想在 ADO.NET 数据服务中存储每线程数据。在线程特定的静态变量上使用 ThreadStatic 属性是否安全,或者我会遇到问题吗?我担心的是,我的 ThreadStatic 变量在请求完成并且
Stackoverflow 上至少有一篇与此主题相关的帖子:Generate password in python 你会发现这个主题甚至在 PEP 中也受到了一些批评。这里提到:https://www
对于我工作中的一个项目,我需要创建一个独立的 Python 安装(来自源代码)。然而,完整的目录占用大约 90MB 的磁盘空间,虽然不多,但太多了,无法一遍又一遍地复制。 我可以从自定义 python
例如,我有一张学生表,我有一本 Python 字典 mydict = {"fname" : "samwise", "lname" : "gamgee", "age" : 13} 我怎样才能安全地生成一
我经常在代码中遇到使用 memset 手动零初始化的 POD 结构,如下所示: struct foo; memset(&foo, 0, sizeof(foo)); 我检查了 C++11 标准,它说:“
我是一名优秀的程序员,十分优秀!