gpt4 book ai didi

c++ - 在 boost asio 中为异步操作的函数处理程序分配线程的困难

转载 作者:太空宇宙 更新时间:2023-11-04 12:56:19 25 4
gpt4 key购买 nike

我知道运行 io_service.run() 的线程负责执行异步操作的函数处理程序,但我在为在父异步操作的回调函数中触发的异步操作分配线程时遇到问题。例如考虑下面的程序:

#ifdef WIN32
#define _WIN32_WINNT 0x0501
#include <stdio.h>
#endif

#include <fstream> // for writting to file
#include <iostream> // for writting to file
#include <stdlib.h> // atoi (string to integer)
#include <chrono>
#include <boost/thread.hpp> // for multi threading
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <signal.h> // For Interrupt Handling (Signal Handling Event)
#include <vector>


#define max_length 46
#define server_ip1 "127.0.0.1"
//#define server_ip2 "127.0.0.1"
#define server_port 4000

#define MEM_FN(x) boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z)

void talk1();
using namespace boost::asio;

io_service service, service2;
std::chrono::time_point<std::chrono::high_resolution_clock> t_start;

ip::udp::socket sock1(service);
ip::udp::endpoint ep1( ip::address::from_string(server_ip1), 4000);
//ip::udp::socket sock2(service);
//ip::udp::endpoint ep2( ip::address::from_string(server_ip2), 4000);

std::chrono::time_point<std::chrono::high_resolution_clock> tc;
int OnCon[2];

// One request/response exchange over the global UDP socket `sock1`:
// the message is sent to the global endpoint `ep1`, then a single datagram
// is received and printed together with three timestamps relative to `t_start`.
//
// Instances are created through the static start() factory. The handlers
// built by MEM_FN2 bind shared_from_this(), so the pending asynchronous
// operations hold a shared_ptr to the object.
class talk_to_svr1 : public boost::enable_shared_from_this<talk_to_svr1>, boost::noncopyable {
    typedef talk_to_svr1 self_type;

    // The endpoint argument is kept for interface compatibility; it is not
    // used because all traffic goes through the globals sock1/ep1.
    talk_to_svr1(const std::string & message, ip::udp::endpoint /*ep*/)
        : indx(0), started_(true), message_(message) {}

    // Kicks off the exchange by sending the stored message.
    void start(ip::udp::endpoint /*ep*/) {
        do_write(message_);
    }
public:
    typedef boost::system::error_code error_code;
    typedef boost::shared_ptr<talk_to_svr1> ptr;

    // Creates an exchange for `message` and immediately starts sending it.
    // Returns the shared pointer that owns the new object.
    static ptr start(ip::udp::endpoint ep, const std::string & message) {
        ptr new_(new talk_to_svr1(message, ep));
        new_->start(ep);
        return new_;
    }
    bool started() const { return started_; }

private:
    // Completion handler of the receive: prints the timings and the payload.
    void on_read(const error_code & err, size_t bytes) {
        this->t2 = std::chrono::high_resolution_clock::now(); // Time of finished reading
        if ( !err) {
            auto t0_rel = 1.e-9*std::chrono::duration_cast<std::chrono::nanoseconds>(t0-t_start).count();
            auto t1_rel = 1.e-9*std::chrono::duration_cast<std::chrono::nanoseconds>(t1-t_start).count();
            auto t2_rel = 1.e-9*std::chrono::duration_cast<std::chrono::nanoseconds>(t2-t_start).count();
            std::cout << "Sock1: " << t0_rel << ", " << t1_rel << ", " << t2_rel << std::endl;
            std::string msg(read_buffer_, bytes);
            std::cout << msg << std::endl;
        }
        else {
            std::cout << "Error occured in reading data from server (Sock1)" << std::endl;
        }
    }

    // Completion handler of the send: on success, waits for the reply.
    void on_write(const error_code & err, size_t bytes) {
        this->t1 = std::chrono::high_resolution_clock::now(); // Time of finished writing
        if (err) {
            // Do not claim success or wait for a reply to a message that was never sent.
            std::cout << "Error occurred in sending data to server (Sock1): " << err.message() << std::endl;
            return;
        }
        std::cout << "Sock1 successfully sent " << bytes << " bytes of data" << std::endl;
        do_read();
    }

    void do_read() {
        // The sender's address is written into a member; passing the global
        // ep1 here would let a received datagram overwrite the destination
        // used by later sends.
        sock1.async_receive_from(buffer(read_buffer_), sender_, MEM_FN2(on_read,_1,_2));
    }

    void do_write(const std::string & msg) {
        if ( !started() ) return;
        // std::string::copy never writes more than the given count, so a
        // message longer than the buffer is truncated instead of overflowing it.
        const std::size_t len = msg.copy(write_buffer_, sizeof(write_buffer_));

        this->t0 = std::chrono::high_resolution_clock::now(); // Time of starting to write
        sock1.async_send_to( buffer(write_buffer_, len), ep1, MEM_FN2(on_write,_1,_2) );
    }

public:
    std::chrono::time_point<std::chrono::high_resolution_clock> t0; // Time of starting to write
    std::chrono::time_point<std::chrono::high_resolution_clock> t1; // Time of finished writing
    std::chrono::time_point<std::chrono::high_resolution_clock> t2; // Time of finished reading

private:
    int indx;                        // not used by this class; zero-initialized
    char read_buffer_[max_length];
    char write_buffer_[max_length];
    bool started_;
    std::string message_;
    ip::udp::endpoint sender_;       // filled in by async_receive_from
};

void wait_s(int seconds)
{
boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}

// Puts the calling thread to sleep for the given number of milliseconds.
// Uses sleep_for with a boost::chrono duration, like wait_s, instead of the
// deprecated this_thread::sleep(posix_time) overload.
void wait_ms(int msecs) {
    boost::this_thread::sleep_for(boost::chrono::milliseconds{msecs});
}

// Thread entry point: runs the event loop of the global `service`.
void async_thread() {
    // The handler count returned by run() is intentionally discarded.
    static_cast<void>(service.run());
}

// Thread entry point: runs the event loop of the global `service2`.
void async_thread2() {
    // The handler count returned by run() is intentionally discarded.
    static_cast<void>(service2.run());
}
// Runs the message sequence of channel `indx` (0 or 1); any other value only
// prints a diagnostic.
// NOTE: wait_s() blocks the calling thread. on_connect() calls this function,
// so the waits happen inside a completion handler.
void GoOperational(int indx) {
    switch (indx) {
    case 0:
        talk_to_svr1::start(ep1, "Message01");
        wait_s(1);
        talk_to_svr1::start(ep1, "Message02");
        wait_s(2);
        break;

    case 1:
        //talk_to_svr2::start(ep2, "Masoud");
        wait_s(1);
        //talk_to_svr2::start(ep2, "Ahmad");
        wait_s(2);
        break;

    default:
        std::cout << "Wrong index!." << std::endl;
        break;
    }
}

// Completion handler of async_connect for socket number `ii`.
// On success it marks the socket as connected in OnCon, prints the connect
// time relative to t_start and, once socket 0 is connected, starts the
// message sequence through GoOperational(0).
// On failure it only reports the error.
void on_connect(const boost::system::error_code & err, int ii) {
    if (err) {
        // Report the failure before anything else: the socket is NOT connected.
        std::cout << "Socket " << ii << " had a problem for connecting to server: "
                  << err.message() << std::endl;
        return;
    }

    // Guard the write into the fixed-size OnCon array.
    const int slots = static_cast<int>(sizeof(OnCon) / sizeof(OnCon[0]));
    if (ii < 0 || ii >= slots) {
        std::cout << "Socket index " << ii << " is out of range." << std::endl;
        return;
    }

    std::cout << "Socket "<< ii << " is connected."<< std::endl;
    OnCon[ii] = 1;

    tc = std::chrono::high_resolution_clock::now();
    auto ty = 1.e-9*std::chrono::duration_cast<std::chrono::nanoseconds>(tc-t_start).count();
    std::cout << "Sock " << ii << " connected at time: " << ty << " seconds" << std::endl;

    if ( (OnCon[0] /*+ OnCon[1]*/ ) == 1) {
        // NOTE: GoOperational() sleeps via wait_s(), so this handler does not
        // return until those waits are over.
        GoOperational(0);
        //GoOperational(1);
    }
}

// Entry point of the question's program: resets the connection flags, prints
// the output header, records the start time, starts an asynchronous connect
// of sock1 to ep1 and then runs the global io_service on a separate thread,
// waiting for that thread to finish. argc/argv are not used.
int main(int argc, char* argv[]) {
OnCon[0] = 0;
OnCon[1] = 0;
ep1 = ep1; // self-assignment; has no effect
//ep2 = ep2;
std::cout.precision(9);
std::cout << "///////////////////////" << std::endl;
std::cout << "Socket Number, Time of starting to write, Time of finished writting, time of finished reading" << std::endl;
t_start = std::chrono::high_resolution_clock::now(); // reference point for all printed times
sock1.async_connect(ep1, boost::bind(on_connect, boost::asio::placeholders::error, 0)); // on_connect is called with index 0
//sock2.async_connect(ep2, boost::bind(on_connect, boost::asio::placeholders::error, 1));

boost::thread b{boost::bind(async_thread)}; // async_thread calls service.run()
b.join(); // block until the worker thread returns

}

在这个程序中,我有一个名为 sock1 的全局 udp 套接字,它将通过在 main 函数的第 9 行运行 sock1.async_connect() 来连接。在这个异步操作的回调函数中,我创建了两个 talk_to_svr1 类的实例,每个实例负责向服务器发送消息,然后异步接收来自服务器的响应。在发送第二条消息之前我需要等待 3 秒,这就是为什么我在创建 talk_to_svr1 的第二个实例之前调用 wait_s(1) 的原因。问题是,调用 wait_s(1) 除了暂停主线程之外,还会把异步发送操作也一并暂停,而这并不是我想要的。

如果有人可以更改上述代码,让另一个线程负责向服务器异步发送消息,以便调用 wait_s(1) 不会暂停发送操作,我将不胜感激。

最佳答案

Note: posted an alternative using coroutines as well

根据定义,异步编程不需要您“控制”线程。事实上,您根本不应该需要线程。当然,您不能在完成处理程序(completion handler)内部阻塞,因为这会妨碍其他操作继续进行。

您可以简单地使用一个计时器,在 3 秒后到期,async_wait 并在其完成处理程序中发送第二个请求。

这是对您的代码的一次大清理。请注意,我删除了所有对全局变量的使用。它们使代码非常容易出错,并导致大量重复(事实上 talk_to_svr1 硬编码了 ep1 和 sock1,所以它对您的第二个通道(channel)毫无用处,而那个通道的代码大部分都被注释掉了)。

改动的关键是让 message_operation 接受一个后续处理函数(continuation):

/// Sends `message` on socket `s`, waits for one reply and then invokes
/// `handler(error_code, std::string response)`.
///
/// The operation object is owned by a shared_ptr created here; do_write()
/// starts the asynchronous chain.
template <typename F_>
void async_message(udp::socket& s, std::string const& message, F_&& handler) {
    // When an lvalue handler is passed, F_ deduces to a reference type.
    // Decaying it makes the operation store its own copy of the handler
    // instead of a reference that may dangle once the caller returns.
    using Op = message_operation<std::decay_t<F_>>;
    boost::shared_ptr<Op> op(new Op(s, message, std::forward<F_>(handler)));
    op->do_write();
}

当消息/响应完成时,调用handler。现在,我们可以实现应用程序协议(protocol)(基本上是您试图在 on_connect/GoOperational 中捕获的内容):

////////////////////////////////////////////////////
// basic protocol (2 messages, 1 delay)
/// Drives one channel: connect, send message 1 and wait for its reply, pause
/// for `delay`, then send message 2 and wait for its reply.
///
/// Every handler is bound to `this`, so the object has to stay alive until
/// the operations started by go() have completed.
struct ApplicationProtocol {
    ApplicationProtocol(ba::io_service& service, udp::endpoint ep, std::string m1, std::string m2, std::chrono::seconds delay = 3s)
        : _service(service),
          _endpoint(ep),
          message1(std::move(m1)), message2(std::move(m2)),
          delay(delay), timer(service)
    { }

    /// Starts the chain by asynchronously connecting the socket to the endpoint.
    void go() {
        _socket.async_connect(_endpoint, boost::bind(&ApplicationProtocol::on_connect, this, _1));
    }

  private:
    ba::io_service& _service;
    udp::socket _socket{_service};
    udp::endpoint _endpoint;

    std::string message1, message2;
    std::chrono::seconds delay;
    ba::high_resolution_timer timer;

    // Step 1: connected (or not) -> send the first message.
    void on_connect(error_code ec) {
        if (ec) {
            // Check the result first so that a failed connect is never
            // announced as "connected".
            std::cout << "Socket had a problem for connecting to server: " << ec.message() << "\n";
            return;
        }

        std::cout << _endpoint << " connected at " << relatime() << " ms\n";
        async_message(_socket, message1, boost::bind(&ApplicationProtocol::on_message1_sent, this, _1, _2));
    }

    // Step 2: first reply received -> arm the timer instead of blocking.
    void on_message1_sent(error_code ec, std::string response) {
        if (ec)
            std::cout << "Message 1 failed: " << ec.message() << "\n";
        else {
            std::cout << "Message 1 returned: '" << response << "'\n";

            timer.expires_from_now(delay);
            timer.async_wait(boost::bind(&ApplicationProtocol::on_delay_complete, this, _1));
        }
    }

    // Step 3: delay elapsed -> send the second message.
    void on_delay_complete(error_code ec) {
        if (ec)
            std::cout << "Delay failed: " << ec.message() << "\n";
        else {
            std::cout << "Delay completed\n";

            async_message(_socket, message2, boost::bind(&ApplicationProtocol::on_message2_sent, this, _1, _2));
        }
    }

    // Step 4: second reply received -> the protocol is finished.
    void on_message2_sent(error_code ec, std::string response) {
        if (ec)
            std::cout << "Message 2 failed: " << ec.message() << "\n";
        else {
            std::cout << "Message 2 returned: '" << response << "'\n";
        }
    }
};

请注意使用它变得多么简单:

int main() {
ba::io_service service;

std::cout.precision(2);
std::cout << std::fixed;

ApplicationProtocol
channel1(service, {{}, 4000}, "Message01\n", "Message02\n", 3s),
channel2(service, {{}, 4001}, "Masoud\n", "Ahmad\n", 2s);

channel1.go();
channel2.go();

service.run();
}

像这样运行两个 udp 服务时:

yes first|nl|netcat -ulp 4000& yes second|nl|netcat -ulp 4001& time wait

我们得到以下输出: Live On Coliru

0.0.0.0:4000 connected at 1.87 ms
0.0.0.0:4001 connected at 1.99 ms
127.0.0.1:4000 successfully sent 10 bytes of data
127.0.0.1:4001 successfully sent 7 bytes of data
127.0.0.1:4000: start 1.91, written 2.03, finished 2.25 ms
Message 1 returned: ' 1 first
2 first
3 first
4 '
127.0.0.1:4001: start 2.00, written 2.06, finished 2.34 ms
Message 1 returned: ' 1 second
2 second
3 second
'
Delay completed
127.0.0.1:4001 successfully sent 6 bytes of data
127.0.0.1:4001: start 2002.46, written 2002.49, finished 2002.53 ms
Message 2 returned: '47 second
148 second
149 second
150 s'
Delay completed
127.0.0.1:4000 successfully sent 10 bytes of data
127.0.0.1:4000: start 3002.36, written 3002.39, finished 3002.41 ms
Message 2 returned: 'first
159 first
160 first
161 first
'

服务器端依次收到如下消息:

enter image description here

完整代码

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>

#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <chrono>
#include <iostream>
#include <string>
#include <type_traits>
#include <utility>

#define MEM_FN2(x, y, z) boost::bind(&self_type::x, shared_from_this(), y, z)

namespace ba = boost::asio;
using ba::ip::udp;
using boost::system::error_code;
using ba::asio_handler_invoke;

////////////////////////////////////////////////////
// timing stuff
using namespace std::chrono_literals;
using hrclock    = std::chrono::high_resolution_clock;
using time_point = hrclock::time_point;

/// Returns the time of `tp` in milliseconds, measured from the moment this
/// function was called for the first time. `tp` defaults to "now".
static double relatime(time_point tp = hrclock::now()) {
    // Reference point, fixed once on the first call.
    static const time_point t_start = hrclock::now();

    const auto elapsed = tp - t_start;
    return elapsed / 1.0ms; // duration / duration yields a plain number of milliseconds
}

////////////////////////////////////////////////////
// message operation - with F continuation
/// One request/response exchange on a UDP socket, finished by a continuation.
///
/// The message is sent to the socket's remote endpoint, one datagram is
/// received, and then `handler_(error_code, std::string)` is invoked: with the
/// received bytes on success, or with the error and an empty string when the
/// send or the receive fails.
///
/// Objects are created only by async_message(). The handlers built by MEM_FN2
/// bind shared_from_this(), so pending operations hold a shared_ptr to the
/// object.
template <typename F>
class message_operation : public boost::enable_shared_from_this<message_operation<F> >, boost::noncopyable {
    typedef message_operation self_type;

    template <typename F_>
    friend void async_message(udp::socket&, std::string const&, F_&&);

  private:
    // Captures the socket's remote endpoint at construction time, so the
    // socket has to be connected already.
    template <typename F_>
    message_operation(udp::socket& s, std::string message, F_&& handler)
        : _socket(s), _endpoint(s.remote_endpoint()), handler_(std::forward<F_>(handler)), message_(std::move(message)) {}

    using boost::enable_shared_from_this<message_operation>::shared_from_this;

    void do_write() {
        t0 = hrclock::now(); // Time of starting to write
        _socket.async_send_to(ba::buffer(message_), _endpoint, MEM_FN2(on_write, _1, _2));
    }

    void on_write(const error_code & err, size_t bytes) {
        t1 = hrclock::now(); // Time of finished writing
        if (err)
            handler_(err, "");
        else
        {
            std::cout << _endpoint << " successfully sent " << bytes << " bytes of data\n";
            do_read();
        }
    }

    void do_read() {
        _socket.async_receive_from(ba::buffer(read_buffer_), _sender, MEM_FN2(on_read, _1, _2));
    }

    void on_read(const error_code &err, size_t bytes) {
        t2 = hrclock::now(); // Time of finished reading

        if (!err) {
            std::cout << _endpoint
                      << ": start "    << relatime(t0)
                      << ", written "  << relatime(t1)
                      << ", finished " << relatime(t2)
                      << " ms\n";

            handler_(err, std::string(read_buffer_, bytes));
        } else {
            std::cout << "Error occured in reading data from server\n";
            // Always complete the operation: without this call the caller's
            // chain would stop silently on a receive error.
            handler_(err, "");
        }
    }

    time_point t0, t1, t2; // Time of starting to write, finished writing, finished reading

    // params
    udp::socket& _socket;
    udp::endpoint _endpoint;
    F handler_;

    // sending
    std::string message_;

    // receiving
    udp::endpoint _sender;
    char read_buffer_[46];
};

/// Sends `message` on socket `s`, waits for one reply and then invokes
/// `handler(error_code, std::string response)`.
///
/// The operation object is owned by a shared_ptr created here; do_write()
/// starts the asynchronous chain.
template <typename F_>
void async_message(udp::socket& s, std::string const& message, F_&& handler) {
    // When an lvalue handler is passed, F_ deduces to a reference type.
    // Decaying it makes the operation store its own copy of the handler
    // instead of a reference that may dangle once the caller returns.
    using Op = message_operation<std::decay_t<F_>>;
    boost::shared_ptr<Op> op(new Op(s, message, std::forward<F_>(handler)));
    op->do_write();
}

////////////////////////////////////////////////////
// basic protocol (2 messages, 1 delay)
/// Drives one channel: connect, send message 1 and wait for its reply, pause
/// for `delay`, then send message 2 and wait for its reply.
///
/// Every handler is bound to `this`, so the object has to stay alive until
/// the operations started by go() have completed.
struct ApplicationProtocol {
    ApplicationProtocol(ba::io_service& service, udp::endpoint ep, std::string m1, std::string m2, std::chrono::seconds delay = 3s)
        : _service(service),
          _endpoint(ep),
          message1(std::move(m1)), message2(std::move(m2)),
          delay(delay), timer(service)
    { }

    /// Starts the chain by asynchronously connecting the socket to the endpoint.
    void go() {
        _socket.async_connect(_endpoint, boost::bind(&ApplicationProtocol::on_connect, this, _1));
    }

  private:
    ba::io_service& _service;
    udp::socket _socket{_service};
    udp::endpoint _endpoint;

    std::string message1, message2;
    std::chrono::seconds delay;
    ba::high_resolution_timer timer;

    // Step 1: connected (or not) -> send the first message.
    void on_connect(error_code ec) {
        if (ec) {
            // Check the result first so that a failed connect is never
            // announced as "connected".
            std::cout << "Socket had a problem for connecting to server: " << ec.message() << "\n";
            return;
        }

        std::cout << _endpoint << " connected at " << relatime() << " ms\n";
        async_message(_socket, message1, boost::bind(&ApplicationProtocol::on_message1_sent, this, _1, _2));
    }

    // Step 2: first reply received -> arm the timer instead of blocking.
    void on_message1_sent(error_code ec, std::string response) {
        if (ec)
            std::cout << "Message 1 failed: " << ec.message() << "\n";
        else {
            std::cout << "Message 1 returned: '" << response << "'\n";

            timer.expires_from_now(delay);
            timer.async_wait(boost::bind(&ApplicationProtocol::on_delay_complete, this, _1));
        }
    }

    // Step 3: delay elapsed -> send the second message.
    void on_delay_complete(error_code ec) {
        if (ec)
            std::cout << "Delay failed: " << ec.message() << "\n";
        else {
            std::cout << "Delay completed\n";

            async_message(_socket, message2, boost::bind(&ApplicationProtocol::on_message2_sent, this, _1, _2));
        }
    }

    // Step 4: second reply received -> the protocol is finished.
    void on_message2_sent(error_code ec, std::string response) {
        if (ec)
            std::cout << "Message 2 failed: " << ec.message() << "\n";
        else {
            std::cout << "Message 2 returned: '" << response << "'\n";
        }
    }
};

int main() {
ba::io_service service;
relatime(); // start the clock

std::cout.precision(2);
std::cout << std::fixed;

ApplicationProtocol
channel1(service, {{}, 4000}, "Message01\n", "Message02\n", 3s),
channel2(service, {{}, 4001}, "Masoud\n", "Ahmad\n", 2s);

channel1.go();
channel2.go();

service.run();
}

关于c++ - 在 boost asio 中为异步操作的函数处理程序分配线程的困难,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46495543/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com