gpt4 book ai didi

c++ - 使用队列 boost asio 异步读取和写入套接字

转载 作者:行者123 更新时间:2023-11-28 01:48:40 27 4
gpt4 key购买 nike

我正在开发一个简单的 TCP 服务器,该服务器读取消息并将其写入线程安全队列。然后应用程序可以使用这些队列安全地从不同的线程读取和写入套接字。

我面临的问题是我无法async_read。我的队列有 pop 操作,它返回下一个要处理的元素,但如果没有可用元素,它就会阻塞。因此,一旦我调用 pop,async_read 回调当然不会再被触发。有什么方法可以将这样的队列集成到 boost asio 中,还是我必须完全重写?

下面是我用来说明我遇到的问题的一个简短示例。建立 TCP 连接后,我将创建一个新线程,该线程将在该 tcp_connection 下运行应用程序。之后我想启动 async_readasync_write。几个小时以来,我一直在为此苦思冥想,我真的不知道如何解决这个问题。

class tcp_connection : public std::enable_shared_from_this<tcp_connection>
{
public:
static std::shared_ptr<tcp_connection> create(boost::asio::io_service &io_service) {
return std::shared_ptr<tcp_connection>(new tcp_connection(io_service));
}

boost::asio::ip::tcp::socket& get_socket()
{
return this->socket;
}

void app_start()
{
while(1)
{
// Pop is a blocking call.
auto inbound_message = this->inbound_messages.pop();
std::cout << "Got message in app thread: " << inbound_message << ". Sending it back to client." << std::endl;
this->outbound_messages.push(inbound_message);
}
}

void start() {
this->app_thread = std::thread(&tcp_connection::app_start, shared_from_this());

boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));

// Start async writing here. The message to send are in the outbound_message queue. But a Pop operation blocks
// empty() is also available to check whether the queue is empty.
// So how can I async write without blocking the read.
// block...
auto message = this->outbound_messages.pop();
boost::asio::async_write(this->socket, boost::asio::buffer(message),
strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}

void handle_read(const boost::system::error_code& e, size_t bytes_read)
{
std::cout << "handle_read called" << std::endl;
if (e)
{
std::cout << "Error handle_read: " << e.message() << std::endl;
return;
}
if (bytes_read != 0)
{
std::istream istream(&this->input_stream);
std::string message;
message.resize(bytes_read);
istream.read(&message[0], bytes_read);
std::cout << "Got message: " << message << std::endl;
this->inbound_messages.push(message);
}
boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}

void handle_write(const boost::system::error_code& e, size_t /*bytes_transferred*/)
{
if (e)
{
std::cout << "Error handle_write: " << e.message() << std::endl;
return;
}

// block...
auto message = this->outbound_messages.pop();
boost::asio::async_write(this->socket, boost::asio::buffer(message),
strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}



private:
tcp_connection(boost::asio::io_service& io_service) : socket(io_service), strand(io_service)
{
}

boost::asio::ip::tcp::socket socket;
boost::asio::strand strand;
boost::asio::streambuf input_stream;

std::thread app_thread;

concurrent_queue<std::string> inbound_messages;
concurrent_queue<std::string> outbound_messages;
};

class tcp_server
{
public:
tcp_server(boost::asio::io_service& io_service)
: acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9001))
{
start_accept();
}

private:
void start_accept()
{
std::shared_ptr<tcp_connection> new_connection =
tcp_connection::create(acceptor.get_io_service());

acceptor.async_accept(new_connection->get_socket(),
boost::bind(&tlcp_tcp_server::handle_accept, this, new_connection, boost::asio::placeholders::error));
}

void handle_accept(std::shared_ptr<tcp_connection> new_connection,
const boost::system::error_code& error)
{
if (!error)
{
new_connection->start();
}

start_accept();
}

boost::asio::ip::tcp::acceptor acceptor;
};

最佳答案

在我看来,您似乎想要一个采用错误消息占位符和回调处理程序的 async_pop 方法。当您收到消息时,检查是否有未完成的处理程序,如果有,则弹出消息,注销处理程序并调用它。类似地,在注册 async_pop 时,如果已经有消息在等待,弹出消息并发布对处理程序的调用而不注册它。

您可能希望从 pop_operation 或类似类型的多态基类派生 async_pop 类。

关于c++ - 使用队列 boost asio 异步读取和写入套接字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43830917/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com