gpt4 book ai didi

c++ - 使用 std::async() 和 std::move() 将数据异步传递给另一个线程

转载 作者:行者123 更新时间:2023-11-30 03:58:51 24 4
gpt4 key购买 nike

我真的很想看看这两个线程函数 RunThread1() & RunThread1() 是如何并行运行的。 RunThread2() 每次执行 RunThread1() 时都会被阻塞,反之亦然。

我不想等到 future 完成,因此我正在使用 std::asyncstd::move我正在使用 scoped_lock,但我认为这不是问题。

我正在设计一个异步响应处理引擎,一个线程插入数据,另一个线程从另一端读取数据。

有什么问题可能出在哪里的建议吗?对整体设计的任何建议。

#include <windows.h>

#include <string>
#include <iostream>
#include <vector>
#include <deque>
#include <chrono>
#include <thread>
#include <future>

#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>

using namespace std;
using namespace boost;

template<typename R>
bool Is_future_ready(std::future<R> const& f)
{
return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

std::vector<std::future<void>> pending_futures;

class A
{
private:
boost::thread* myFunc1Thread;
boost::thread* myFunc2Thread;

public:
A()
{
myFunc1Thread = nullptr;
myFunc2Thread = nullptr;
}

void RunThreads();
void RunThread1();
void RunThread2();

void PopulateResponses(vector<string> responses);
void PopulateResponse(string response);

struct Record
{
char response[128];

Record(const char* response)
{
memset(this->response,0,sizeof(this->response));
strcpy(this->response, response);
}

~Record()
{
}

Record& operator= (const Record& cmd)
{
if(this == &cmd) // Same object?
{
return *this;
}

memset(this->response,0,sizeof(this->response));
strcpy(this->response, cmd.response);
return *this;
}
};

typedef deque<Record> RecordsQueue;
};

boost::mutex ResponseMutex;

A::RecordsQueue Records;

void A::RunThreads()
{
myFunc1Thread = new boost::thread(boost::bind(&A::RunThread1, this));
HANDLE threadHandle1 = myFunc1Thread->native_handle();
SetThreadPriority(threadHandle1, THREAD_PRIORITY_NORMAL);

myFunc2Thread = new boost::thread(boost::bind(&A::RunThread2, this));
HANDLE threadHandle2 = myFunc2Thread->native_handle();
SetThreadPriority(threadHandle2, THREAD_PRIORITY_NORMAL);

myFunc1Thread->join();
myFunc2Thread->join();
}

void A::PopulateResponse(string response)
{
Records.push_back(Record(response.c_str()));
}

void A::PopulateResponses(vector<string> responses)
{
boost::mutex::scoped_lock lock(ResponseMutex);
std::for_each(responses.begin(), responses.end(), bind1st(mem_fun(&A::PopulateResponse), this));
}

void A::RunThread1()
{
int i = 0;

while(true)
{
vector<string> responses;
responses.push_back(to_string(i));
cout<< "Added: " << to_string(i) << endl;
i++;

pending_futures.erase(std::remove_if( pending_futures.begin(), pending_futures.end(), Is_future_ready<void>), pending_futures.end());
auto f = std::async (std::launch::async, &A::PopulateResponses, this, responses);
pending_futures.push_back(std::move(f));
}
}

void A::RunThread2()
{
while(true)
{
boost::mutex::scoped_lock lock(ResponseMutex);
if(!Records.empty())
{
Record res = Records.front();
cout<< "Processed: " << res.response << endl;

//some lengthy processing...., let's use sleep() to depict that
boost::this_thread::sleep(boost::posix_time::seconds(1));

Records.pop_front();
}
}
}

int main()
{
A a;
a.RunThreads();
}

最佳答案

你在一个紧密的循环中添加 future :

void RunThread1() {
while(true)
{
// ...
auto f = std::async (std::launch::async, &A::PopulateResponses, this, responses);
pending_futures.push_back(std::move(f));
}
}

难怪没有什么能跟得上它。其他线程正在执行所有锁定(线程 1 没有阻塞操作,尽管 Is_future_ready 可能会强制让出线程,我不确定)。

在循环中的某处添加一个 sleep ,您会发现事情按预期进行。

boost::this_thread::sleep_for(boost::chrono::seconds(1));

请记住,这仍然很脆弱:它取决于时间是否正确。为了更普遍地健壮,使用适当的消息/任务队列并在队列已满时阻止推送端。

关于c++ - 使用 std::async() 和 std::move() 将数据异步传递给另一个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27223008/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com