gpt4 book ai didi

c++ - std::vector 的并行写入器和读取器

转载 作者:太空狗 更新时间:2023-10-29 23:06:29 29 4
gpt4 key购买 nike

我有一个类同时被 2 个线程使用:一个线程将结果(一个接一个)添加到任务的 results 中,第二个线程处理那些 已经存在的结果

// all members are copy-able
struct task {
command cmd;
vector<result> results;
};

class generator {
public:
generator(executor* e); // store the ptr
void run();
...
};

class executor {
public:
void run();
void add_result(int command_id, result r);
task& find_task(int command_id);
...
private:
vector<task> tasks_;
condition_variable_any update_condition_;
};

启动

// In main, we have instances of generator and executor,
// we launch 2 threads and wait for them.
std::thread gen_th( std::bind( &generator::run, gen_instance_) );
std::thread exe_th( std::bind( &executor::run, exe_instance_) );

生成器线程

void generator::run() {
while(is_running) {
sleep_for_random_seconds();
executor_->add_result( SOME_ID, new_result() );
}
}

执行线程

void executor::add_result( int command_id, result r ) {
std::unique_lock<std::recursive_mutex> l(mutex_);
task& t = this->find_task(command_id);
t.results.push_back(r);
update_condition_.notify_all();
}

void executor::run() {
while(is_running) {
update_condition_.wait(...);
task& t = this->find_task(SOME_ID);
for(result r: t.results) {
// no live updates are visible here
}
}
}
  1. 生成器线程每隔几秒添加一个结果。
  2. 执行器线程本身就是一个执行器。它通过 run 方法运行,该方法等待更新,当更新发生时,它会处理结果。

注意事项:

  1. 任务 vector 可能很大;结果永远不会被丢弃;
  2. 执行器中的for-each 循环获取它正在处理的任务,然后遍历结果,检查其中哪些是新的并处理它们。一旦处理,它们将被标记并且不会被再次处理。此处理可能需要一些时间。

Executor Thread 在添加另一个结果之前没有完成 for 循环时会出现问题 - 结果对象在 for 循环中不可见。由于 Executor Thread 正在工作,它不会注意到更新条件更新,不会刷新 vector 等。当它完成时(在alread-not-actual view 上工作 tasks_) 它再次卡在 update_condition_.. 刚刚被触发。

我需要让代码知道,它应该在完成循环后再次运行循环for-each循环中可见的任务进行更改。这个问题的最佳解决方案是什么?

最佳答案

您只需要检查您的 vector 是否为空,阻塞 CV 之前。类似的东西:

while (running) {
std::unique_lock<std::mutex> lock(mutex);
while (tasks_.empty()) // <-- this is important
update_condition_.wait(lock);
// handle tasks_
}

如果您的体系结构允许(即,如果您在处理任务时不需要持有锁),您可能还希望在处理任务之前尽快解锁互斥量,以便生产者可以推送更多任务而无需阻塞。也许用一个临时 vector 交换你的 tasks_ vector ,然后解锁互斥量,然后才开始处理临时 vector 中的任务:

while (running) {
std::unique_lock<std::mutex> lock(mutex);
while (tasks_.empty())
update_condition_.wait(lock);
std::vector<task> localTasks;
localTasks.swap(tasks_);
lock.unlock(); // <-- release the lock early
// handle localTasks
}

编辑:啊,现在我意识到这并不适合你的情况,因为你的消息不直接在 tasks_ 中,而是在 tasks_.results。虽然您了解我的总体思路,但使用它需要更改代码的结构(例如,扁平化您的任务/结果并始终有一个与单个结果关联的 cmd)。

关于c++ - std::vector 的并行写入器和读取器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15460403/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com