gpt4 book ai didi

c++ - 使用单独的线程执行C++中的命令列表

转载 作者:行者123 更新时间:2023-12-02 10:29:38 25 4
gpt4 key购买 nike

我有一个DataClass类,其中有一些数据需要generatedcleaned
我不知道在程序执行期间的什么时候会有新数据,并且我试图使用多个线程来允许主要线程在处理数据时继续进行。
这是DataClass:

class DataClass
{
public:
unsigned int state{0};


void Generate()
{
using namespace std::chrono_literals;
std::this_thread::sleep_for(3s);
state = 1;
}

void Clean()
{
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
state = 2;
}
};
我将每个DataClass对象保存为两个 std::deque,一个包含需要生成的对象,另一个包含需要清除的对象。
std::deque<DataClass*> dataToGenerate;
std::deque<DataClass*> dataToClean;
我正在使用两个函数 CleanerFunctionGeneratorFunction,它们将处理两个列表的内容。 GeneratorFunction:
void GeneratorFunction()
{
while (!dataToGenerate.empty())
{
auto* c = dataToGenerate.front();
c->Generate();
dataToGenerate.pop_front();
dataToClean.push_back(c);
std::cout << "Generated one Data Piece." << std::endl;
}
}
(清洁工是相似的)。
在主函数中,我为两个函数启动了两个线程。
但是线程会立即停止,因为两个列表都为空,因此每次都需要创建一个新线程。我已经研究过condition_variables,但是由于在网上找到的所有示例都与此场景不同,因此我无法使它们正常工作。
据我了解,如果我使用while(!dataToGenerate.empty()),我将在没有任何实际原因的情况下始终执行该行,始终填充线程,因此我不想使用此方法。
有没有办法我可以暂停每个线程,直到列表不再为空,然后启动线程?

最佳答案

这个问题的XY ratio似乎很高。我会仔细阅读任务排队和锁定,并就此重新描述您的解决方案。
特别是,您会发现这接近于生产者/消费者模式。
队列
让我们从最小的通用锁定队列开始:

template <typename T>
struct Queue {
Queue(size_t max = 50) : _max(max) {}

size_t enqueue(T v) {
std::unique_lock lk(_mx);
_cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });

_storage.push_back(std::move(v));
_cond.notify_one();
return _storage.size(); // NOTE: very racy load indicator
}

template <typename Duration>
std::optional<T> dequeue(Duration d) {
std::unique_lock lk(_mx);

if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
auto top = std::move(_storage.front());

_storage.pop_front();
_cond.notify_one();

return top;
}

return std::nullopt;
}

private:
size_t _max;
mutable std::mutex _mx;
mutable std::condition_variable _cond;
std::deque<T> _storage;
};
这永远不会阻止出队(因此您可以检测和处理队列为空)。除非达到一定的限制,否则原则上不会阻塞队列。将限制设为0以拥有无限制的队列。
现在,您可以拥有任意数量的队列,并且可以拥有任意数量的生产者/消费者。例如。:
程序逻辑
struct DataClass {
int id;
unsigned int state{ 0 };

DataClass(int id) : id(id) {}

void Generate() { sleep_for(3s); state = 1; }
void Clean() { sleep_for(1s); state = 2; }
};
现在,让我们制作一个具有4个生成器线程和2个更清洁线程的程序,监视2个队列(genTasks和cleanTasks)。
struct Program {
Program() {
auto worker_id = 1;
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
}

size_t createWork(DataClass task) {
return genTasks.enqueue(std::move(task));
}

~Program() {
_shutdown = true;
for (auto& th: _workers)
if (th.joinable()) th.join();
}

private:
Queue<DataClass> genTasks, cleanTasks;
std::atomic_bool _shutdown { false };
std::list<std::thread> _workers;

void generate_worker(int worker_id) {
while (!_shutdown) {
while (auto task = genTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
task->Generate();
cleanTasks.enqueue(std::move(*task));
}
}
std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
}

void clean_worker(int worker_id) {
while (!_shutdown) {
while (auto task = cleanTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
task->Clean();
std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
}
}
std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
}
};
我添加了一个 _shutdown标志,虽然效果不是很强(它会等到 worker 空闲至少一秒钟( dequeue(1s))。如果要进行更多的侵入式关闭,请通过worker循环添加一些 if (_shutdown) break;语句,这是一种很好的措施。
完整演示
让我们来做一些工作:
int main() {
Program p;

for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
sleep_for((rand()%100) * 1ms);
p.createWork(i);
}

sleep_for(2.5s);
std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
sleep_for(2.5s);
std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;

sleep_for(4s);
std::cout << "Initiating shutdown\n";
// Program destructor performs shutdown
}
版画
Live On Coliru
Worker #2 Generate: 1
Worker #1 Generate: 2
Worker #3 Generate: 3
Worker #4 Generate: 4
Worker #2 Generate: 5
Worker #5 Clean: 1
Load at createWork(42) is ~6
Worker #1 Generate: 6
Worker #6 Clean: 2
Worker #3 Generate: 7
Worker #4 Generate: 8
Worker #5 Done: 1
Worker #5 Clean: 3
Worker #6 Done: 2
Worker #6 Clean: 4
Worker #5 Done: 3
Worker #6 Done: 4
Load at createWork(43) is ~4
Worker #2 Generate: 9
Worker #5 Clean: 5
Worker #1 Generate: 10
Worker #6 Clean: 6
Worker #3 Generate: 42
Worker #4 Generate: 43
Worker #5 Done: 5
Worker #5 Clean: 7
Worker #6 Done: 6
Worker #6 Clean: 8
Worker #5 Done: 7
Worker #6 Done: 8
Worker #5 Clean: 9
Worker #6 Clean: 10
Initiating shutdown
Worker #2 Exit generate_worker
Worker #5 Done: 9
Worker #5 Clean: 42
Worker #1 Exit generate_worker
Worker #6 Done: 10
Worker #6 Clean: 43
Worker #3 Exit generate_worker
Worker #4 Exit generate_worker
Worker #5 Done: 42
Worker #6 Done: 43
Worker #5 Exit clean_worker
Worker #6 Exit clean_worker
Unfinished generate/clean tasks: 0/0
完整 list
Live On Coliru
#include <mutex>
#include <condition_variable>
#include <deque>
#include <optional>

template <typename T>
struct Queue {
Queue(size_t max = 50) : _max(max) {}

size_t enqueue(T v) {
std::unique_lock lk(_mx);
_cond.wait(lk, [this] { return (_max == 0) || (_storage.size() < _max); });

_storage.push_back(std::move(v));
_cond.notify_one();
return _storage.size(); // NOTE: very racy load indicator
}

template <typename Duration>
std::optional<T> dequeue(Duration d) {
std::unique_lock lk(_mx);

if (_cond.wait_for(lk, d, [this] { return !_storage.empty(); })) {
auto top = std::move(_storage.front());

_storage.pop_front();
_cond.notify_one();

return top;
}

return std::nullopt;
}

size_t size() const { // racy in multi-thread situations
std::unique_lock lk(_mx);
return _storage.size();
}

private:
size_t _max;
mutable std::mutex _mx;
mutable std::condition_variable _cond;
std::deque<T> _storage;
};

#include <chrono>
#include <thread>
#include <iostream>
#include <list>
#include <atomic>
using namespace std::chrono_literals;
static inline auto sleep_for = [](auto d) { std::this_thread::sleep_for(d); };

struct DataClass {
int id;
unsigned int state{ 0 };

DataClass(int id) : id(id) {}
//DataClass(DataClass&&) = default;
//DataClass& operator=(DataClass&&) = default;
//DataClass(DataClass const&) = delete;

void Generate() { sleep_for(3s); state = 1; }
void Clean() { sleep_for(1s); state = 2; }
};

struct Program {
Program() {
auto worker_id = 1;
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { generate_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
_workers.emplace_back([this, id=worker_id++] { clean_worker(id); });
}

size_t createWork(DataClass task) {
return genTasks.enqueue(std::move(task));
}

~Program() {
_shutdown = true;
for (auto& th: _workers)
if (th.joinable()) th.join();
std::cout << "Unfinished generate/clean tasks: " << genTasks.size() << "/" << cleanTasks.size() << "\n";
}

private:
Queue<DataClass> genTasks, cleanTasks;
std::atomic_bool _shutdown { false };
std::list<std::thread> _workers;

void generate_worker(int worker_id) {
while (!_shutdown) {
while (auto task = genTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Generate: " << task->id << std::endl;
task->Generate();
cleanTasks.enqueue(std::move(*task));
}
}
std::cout << "Worker #" << worker_id << " Exit generate_worker" << std::endl;
}

void clean_worker(int worker_id) {
while (!_shutdown) {
while (auto task = cleanTasks.dequeue(1s)) {
std::cout << "Worker #" << worker_id << " Clean: " << task->id << std::endl;
task->Clean();
std::cout << "Worker #" << worker_id << " Done: " << task->id << std::endl;
}
}
std::cout << "Worker #" << worker_id << " Exit clean_worker" << std::endl;
}
};

int main() {
Program p;

for (auto i : {1,2,3,4,5,6,7,8,9,10}) {
sleep_for((rand()%100) * 1ms);
p.createWork(i);
}

sleep_for(2.5s);
std::cout << "Load at createWork(42) is ~" << p.createWork(42) << std::endl;
sleep_for(2.5s);
std::cout << "Load at createWork(43) is ~" << p.createWork(43) << std::endl;

sleep_for(4s);
std::cout << "Initiating shutdown\n";
// Program destructor performs shutdown
}

关于c++ - 使用单独的线程执行C++中的命令列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62854388/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com