gpt4 book ai didi

C++:Boost.Asio:在新线程中启动 SSL 服务器 session

转载 作者:太空狗 更新时间:2023-10-29 23:01:18 26 4
gpt4 key购买 nike

我写了一对基于this example for the server 的服务器/客户端程序我完成了所有的通信协议(protocol)。服务器应该从多个客户端的多个连接接收多个连接,所以我想将 session 彼此分开,我希望我可以用 std::thread 做到这一点。

这看起来很容易,但我完全不知道该怎么做。网上的所有例子似乎都展示了如何并行运行一个函数,但似乎并没有展示如何在新线程中创建一个对象。

我已经发表了一些评论来解释我对这种 session 机制的理解。

我想使用的代码如下:

class server
{
public:
server(boost::asio::io_service& io_service, unsigned short port)
: io_service_(io_service),
acceptor_(io_service,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
context_(io_service, boost::asio::ssl::context::sslv23)
{
//some code...


//notice the next lines here create the session object, and then recurs that to receive more connections
session* new_session = new session(io_service_, context_);

//this is called to accept more connections if available, the callback function is called with start() to start the session
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}

void handle_accept(session* new_session, const boost::system::error_code& error)
{
if (!error)
{
//so the session starts here, and another object is created waiting for another session
new_session->start();
new_session = new session(io_service_, context_);
//now this function is, again, a call back function to make use of new_session, the new object that's waiting for a connection
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
else
{
delete new_session;
}
}

private:
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ssl::context context_;
};

如何在新的 std::thread 中创建这些 session ?

如果您需要任何其他信息,请询问。谢谢。

最佳答案

我修改了 linked answer 的示例与您的示例代码混合。

它演示了相同的原理,但在硬件支持的线程数(即 thread::hardware_concurrency)上运行 io_service。

这里的问题是

  • (共享)对象生命周期
  • 线程安全

大多数 Asio 对象都不是线程安全的。因此,您需要同步访问它们。老式的互斥(std::mutex 等)在这种情况下效果不佳(因为您真的不想锁定每个完成处理程序并且您reeeeeeeally 不想在异步调用中持有锁 ¹。

对于这种情况,Boost Asio 有 strand 的概念:

我选择了最简单的解决方案来对“套接字”(ssl 流/连接/ session ,或者你会在逻辑上引用它)进行所有操作。

除此之外,我将所有对 acceptor_ 的访问都序列化在它自己的链上。

A hybrid solution might move all the connections on a io_service+pool and keep the listener (Server) on a separate io_service which could then be it's own implicit strand

注意:关于关机顺序:

  • 我显式地销毁了Server,这样我们就可以根据需要在其strand(!!) 上停止acceptor_
  • pool 线程只有在所有连接都关闭后才会完成。如果你想控制它,请再次查看链接的答案(它显示了如何保持弱指针跟踪连接)。或者,您可以为 session 中的所有异步操作设置超时,并检查 Server 的关闭信号。

演示代码

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace bs = boost::system;
namespace ba = boost::asio;
namespace bas = ba::ssl;

using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef bas::stream<tcp::socket> stream_type;

const short PORT = 26767;

class Session : public boost::enable_shared_from_this<Session>
{
public:
typedef boost::shared_ptr<Session> Ptr;

Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }

virtual ~Session() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}

stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); }
void start() { AsyncReadString(); }
void Stop() { stream.shutdown(); }

protected:
ba::io_service::strand strand_;
SslContext ctx_;
stream_type stream;
ba::streambuf stream_buffer;
std::string message;

void AsyncReadString() {
std::cout << __PRETTY_FUNCTION__ << "\n";

ba::async_read_until(
stream,
stream_buffer,
'\0', // null-char is a delimiter
strand_.wrap(
boost::bind(&Session::ReadHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred)));
}
void AsyncWriteString(const std::string &s) {
std::cout << __PRETTY_FUNCTION__ << "\n";

message = s;

ba::async_write(
stream,
ba::buffer(message.c_str(), message.size()+1),
strand_.wrap(
boost::bind(&Session::WriteHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred)));
}

std::string ExtractString() {
std::cout << __PRETTY_FUNCTION__ << "\n";

std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '\0');
return s;
}

void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/)
{
std::cout << __PRETTY_FUNCTION__ << "\n";

if (!ec) {
std::cout << (ExtractString() + "\n");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}

void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
};

class Server : public boost::enable_shared_from_this<Server>
{
public:
Server(ba::io_service& io_service, unsigned short port) :
strand_ (io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
{
//
}

void start_accept() {
auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);

acceptor_.async_accept(new_session->socket(),
strand_.wrap(boost::bind(&Server::handle_accept, this, new_session, ba::placeholders::error)));
}

void stop_accept() {
auto keep = shared_from_this();
strand_.post([keep] { keep->acceptor_.close(); });
}

void handle_accept(Session::Ptr new_session, const bs::error_code& error)
{
if (!error) {
new_session->start();
start_accept(); // uses `acceptor_` safely because of the strand_
}
}

~Server() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}

private:
ba::io_service::strand strand_;
tcp::acceptor acceptor_;
SslContext context_;
};

int main() {
ba::io_service svc;
boost::thread_group pool;

{
auto s = boost::make_shared<Server>(svc, PORT);
s->start_accept();

for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
pool.create_thread([&]{svc.run();});

std::cerr << "Shutdown in 10 seconds...\n";
boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s

std::cerr << "Shutdown...\n";
} // destructor of Server // TODO thread-safe

pool.join_all();
}

哪个打印

$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 6767)& done)

$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
1 Server::~Server()
1 void Session::AsyncReadString()virtual Session::~Session()
1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
3
4523 void Session::AsyncReadString()
4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
4526 virtual Session::~Session()

real 0m10.128s
user 0m0.430s
sys 0m0.262s

¹ 异步的全部意义在于避免阻塞可能需要“更长时间”的 IO 操作。锁定的想法是永远不要将锁持有“更长”的时间,否则它们会破坏可伸缩性

关于C++:Boost.Asio:在新线程中启动 SSL 服务器 session ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31503638/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com