gpt4 book ai didi

c++ - boost::asio async_receive_from UDP 端点在线程之间共享?

转载 作者:可可西里 更新时间:2023-11-01 16:40:33 25 4
gpt4 key购买 nike

Boost asio 专门允许多个线程调用 io_service 上的 run() 方法。这似乎是创建多线程 UDP 服务器的好方法。但是,我遇到了一个问题,我正在努力寻找答案。

查看典型的 async_receive_from 调用:

m_socket->async_receive_from(
boost::asio::buffer(m_recv_buffer),
m_remote_endpoint,
boost::bind(
&udp_server::handle_receive,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));

远程端点和消息缓冲区没有传递给处理程序,而是处于更高的范围级别(在我的示例中为成员变量)。在 UDP 消息到达时处理它的代码如下所示:

void dns_server::handle_receive(const boost::system::error_code &error, std::size_t size)
{
// process message
blah(m_recv_buffer, size);

// send something back
respond(m_remote_endpoint);
}

如果有多个线程在运行,同步是如何进行的?在线程之间共享一个端点和接收缓冲区意味着 asio 在消息同时到达的情况下等待处理程序在单个线程中完成,然后再在另一个线程中调用处理程序。这似乎否定了允许多个线程首先调用 run 的意义。

如果我想获得请求的并发服务,看起来我需要将工作数据包连同端点的拷贝交给一个单独的线程,允许处理程序方法立即返回,以便 asio 可以获取on 并将另一条消息并行传递给另一个调用 run() 的线程。

这似乎有点令人讨厌。我在这里缺少什么?

最佳答案

Having a single end point and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread

如果您的意思是“使用单线程运行服务时”,那么这是正确的。

否则,情况并非如此。相反,当您同时调用单个服务对象(即套接字,而不是 io_service)上的操作时,Asio 只是说行为是“未定义的”。

That seems to negate the point of allowing multiple threads to call run in the first place.

除非处理需要相当长的时间,否则不会。

第一段介绍Timer.5 sample 似乎很好地阐述了您的主题。

session

要分离特定于请求的数据(缓冲区和端点),您需要一些 session 概念。 Asio 中一个流行的机制是 bound shared_ptr s 或共享自此 session 类(boost 绑定(bind)支持直接绑定(bind)到 boost::shared_ptr 实例)。

避免并发、不同步地访问 m_socket 的成员您可以添加锁或使用 strand上面链接的 Timer.5 示例中记录的方法。

演示

供您欣赏的是 Daytime.6 异步 UDP 日间服务器,修改为与许多服务 IO 线程一起工作。

请注意,从逻辑上讲,仍然只有一个 IO 线程(strand),因此我们没有违反套接字类的文档化线程安全性。

然而,与官方示例不同,响应可能会乱序排队,这取决于 udp_session::handle_request 中实际处理所花费的时间。 .

注意

  • 一个udp_session用于保存每个请求的缓冲区和远程端点的类
  • 一个线程池,能够在多个内核上扩展实际处理(而非 IO)的负载。
#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using namespace boost;
using asio::ip::udp;
using system::error_code;

std::string make_daytime_string()
{
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
return ctime(&now);
}

class udp_server; // forward declaration

struct udp_session : enable_shared_from_this<udp_session> {

udp_session(udp_server* server) : server_(server) {}

void handle_request(const error_code& error);

void handle_sent(const error_code& ec, std::size_t) {
// here response has been sent
if (ec) {
std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "\n";
}
}

udp::endpoint remote_endpoint_;
array<char, 100> recv_buffer_;
std::string message;
udp_server* server_;
};

class udp_server
{
typedef shared_ptr<udp_session> shared_session;
public:
udp_server(asio::io_service& io_service)
: socket_(io_service, udp::endpoint(udp::v4(), 1313)),
strand_(io_service)
{
receive_session();
}

private:
void receive_session()
{
// our session to hold the buffer + endpoint
auto session = make_shared<udp_session>(this);

socket_.async_receive_from(
asio::buffer(session->recv_buffer_),
session->remote_endpoint_,
strand_.wrap(
bind(&udp_server::handle_receive, this,
session, // keep-alive of buffer/endpoint
asio::placeholders::error,
asio::placeholders::bytes_transferred)));
}

void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
// now, handle the current session on any available pool thread
socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));

// immediately accept new datagrams
receive_session();
}

void enqueue_response(shared_session const& session) {
socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
strand_.wrap(bind(&udp_session::handle_sent,
session, // keep-alive of buffer/endpoint
asio::placeholders::error,
asio::placeholders::bytes_transferred)));
}

udp::socket socket_;
asio::strand strand_;

friend struct udp_session;
};

void udp_session::handle_request(const error_code& error)
{
if (!error || error == asio::error::message_size)
{
message = make_daytime_string(); // let's assume this might be slow

// let the server coordinate actual IO
server_->enqueue_response(shared_from_this());
}
}

int main()
{
try {
asio::io_service io_service;
udp_server server(io_service);

thread_group group;
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
group.create_thread(bind(&asio::io_service::run, ref(io_service)));

group.join_all();
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
}
}

结束语

有趣的是,在大多数情况下,您会看到单线程版本的性能一样好,没有理由使设计复杂化。

或者,您可以使用单线程 io_service专用于 IO,如果这确实是 CPU 密集型部分,则使用老式的工作池来对请求进行后台处理。首先,这简化了设计,其次,这可能会 boost IO 任务的吞吐量,因为不再需要协调发布在链上的任务。

关于c++ - boost::asio async_receive_from UDP 端点在线程之间共享?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26703583/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com