c++ - Boost.Asio: Is it a good thing to use a `io_service` per connection/socket?

Reposted. Author: 塔克拉玛干. Updated: 2023-11-03 01:52:27

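I want to create an application that implements the one-thread-per-connection model, but every connection must be stoppable. I tried this boost.asio example, which implements a blocking version of what I want. After asking around, however, I found out that there is no reliable way to stop a session in that example, so I tried to implement my own. I had to use the asynchronous functions. Since I want each thread to manage exactly one connection, and there is no way to control which asynchronous job runs on which thread, I decided to use one io_service per connection/socket/thread.

So is this a good approach? Do you know a better one?

My code is here, so you can examine and review it: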

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer; // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '\0', // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '\0'; // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run(); // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString(); // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    { }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    { }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop(); // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join(); // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

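Best answer

No. Using an io_service object per connection is definitely a code smell, especially since you are also running each connection on a dedicated thread.

At this point you have to ask yourself what asynchrony buys you. You could make all the code synchronous and have exactly the same number of threads.

Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models:

  1. A single io_service with a single service thread (this is usually good). No task queued on the service may block for a significant time, or latency will suffer.

  2. A single io_service with several threads executing handlers. The number of threads in the pool should be enough to service the maximum number of simultaneous CPU-intensive tasks you support (otherwise, again, latency will start to go up).

  3. One io_service per thread, usually one thread per logical core, with thread affinity so that each thread "sticks" to its core. This can be ideal for cache locality.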
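To make the difference between options 1 and 2 concrete, here is a minimal sketch that uses only the standard library. It is a toy model, not Boost.Asio: `HandlerQueue` is a hypothetical stand-in for an io_service (its `post()`, `run()` and `finish()` only loosely mirror posting handlers, calling `run()`, and releasing the work guard), and `count_handlers_run` is an illustrative helper. The point is only that one queue can be drained by one thread or by a small pool, independent of the number of connections.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Toy stand-in for an io_service: a queue of handlers drained by run().
// HandlerQueue is a hypothetical name, not part of Boost.Asio.
class HandlerQueue {
public:
    // Queue a handler; any thread currently inside run() may execute it.
    void post(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            handlers_.push_back(std::move(handler));
        }
        ready_.notify_one();
    }

    // Let run() return once the queue has been drained.
    void finish() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        ready_.notify_all();
    }

    // Execute handlers until finish() was called and nothing is left.
    void run() {
        for (;;) {
            std::function<void()> handler;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return finished_ || !handlers_.empty(); });
                if (handlers_.empty()) return; // finished and drained
                handler = std::move(handlers_.front());
                handlers_.pop_front();
            }
            handler(); // run outside the lock so other threads can dequeue
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::function<void()>> handlers_;
    bool finished_ = false;
};

// Serve `connections` simulated connections with `threads` service threads
// and return how many handlers ran (illustrative helper).
std::size_t count_handlers_run(std::size_t connections, std::size_t threads) {
    assert(threads > 0);
    HandlerQueue queue;
    std::atomic<std::size_t> handled{0};

    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < threads; ++i)
        pool.emplace_back([&queue] { queue.run(); });

    // One handler per simulated connection, all multiplexed onto one queue.
    for (std::size_t i = 0; i < connections; ++i)
        queue.post([&handled] { ++handled; });

    queue.finish();
    for (auto &t : pool) t.join();
    return handled.load();
}
```

Calling `count_handlers_run(1000, 1)` corresponds to option 1 and `count_handlers_run(1000, 4)` to option 2; in both cases the thread count is chosen independently of the connection count. Option 3 would give each thread its own queue and assign every new connection to one of them.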

UPDATE: Demo

Here is a demo that shows the idiomatic style, using option 1 from above:

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/chrono.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/weak_ptr.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;

// A connection owns its socket; the io_service is shared and owned by the Server
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type sock;
    ba::streambuf stream_buffer; // for reading etc
    std::string message;         // keeps outgoing data alive during async_write

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            sock,
            stream_buffer,
            '\0', // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1), // +1 sends the terminating null
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred)
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString(); // read again
        }
        else {
            // do nothing: no new operation is started, so the last
            // shared_ptr goes away with this handler and destroys "this"
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    socket_type& Socket() { return sock; }
    void Session() { AsyncReadString(); }
    void Stop() { sock.cancel(); }
};

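// The server owns the single io_service; it accepts connections and runs
// all of their handlers on one service thread
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            // drop entries whose connection is gone, then track the new one
            m_connections.remove_if(
                [](const boost::weak_ptr<Connection> &c) { return c.expired(); });
            m_connections.push_back(accepted);

            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing: the pending connection is destroyed automatically
            // when the last shared_ptr to it goes away
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

    // runs on the service thread, so it cannot race with AcceptHandler
    void DoStopAllConnections() {
        for (auto &c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    { }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        // cancel on the service thread; the acceptor is not safe to touch
        // from two threads at once
        _service.post([this] { bs::error_code ignored; _acc.cancel(ignored); });
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        // hand over to the service thread, which owns m_connections and the sockets
        _service.post(b::bind(&Server::DoStopAllConnections, this));
    }
};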

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...\n";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...\n";
        s.Stop();

        std::cerr << "Shutdown...\n";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye\n";
}

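I modified main() to run for 2 seconds without user intervention, so that I can demo it Live On Coliru (where, of course, the number of client processes is limited).

If you run it with lots (and lots) of clients, using e.g.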

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)

you will find that the two-second window handles them all:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 28214
2 hello world 4554
2 hello world 6216
2 hello world 7864
2 hello world 9966
2 void Server::Stop()
1000 std::string Connection::ExtractString()
1001 virtual Connection::~Connection()
2000 void Connection::AsyncReadString()
2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

If you really go berserk and raise the 1000 to, say, 100000 there, you will get something like:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 5483
2 hello world 579
2 hello world 5865
2 hello world 938
2 void Server::Stop()
3 hello world 9613
1741 std::string Connection::ExtractString()
1742 virtual Connection::~Connection()
3482 void Connection::AsyncReadString()
3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

This is what repeated 2-second runs of the server produce.
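The demo relies on a lifetime idiom that is easy to miss: each pending handler holds a shared_ptr to its Connection (via shared_from_this()), while the server's list holds only weak_ptrs. The following minimal sketch isolates that idiom with the standard-library smart pointers and no real I/O. `Session`, `Registry` and `lifetime_demo` are hypothetical names used for illustration only, and a plain `std::function` plays the role of the pending asynchronous operation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <memory>
#include <utility>

// Hypothetical stand-in for a connection; no real I/O happens here.
class Session : public std::enable_shared_from_this<Session> {
public:
    // Build the "pending handler". It owns a shared_ptr to this session,
    // much like binding shared_from_this() into a completion handler.
    std::function<void()> make_pending_handler() {
        auto self = shared_from_this();
        return [self] { self->handled_ = true; };
    }
    void stop() { stopped_ = true; }
    bool stopped() const { return stopped_; }

private:
    bool handled_ = false;
    bool stopped_ = false;
};

// Hypothetical registry: observes sessions without keeping them alive.
class Registry {
public:
    void track(const std::shared_ptr<Session> &s) { sessions_.push_back(s); }

    // Stop every session that is still alive; return how many were stopped.
    std::size_t stop_all() {
        std::size_t stopped = 0;
        for (auto &weak : sessions_) {
            if (auto s = weak.lock()) {
                s->stop();
                ++stopped;
            }
        }
        return stopped;
    }

    // Drop expired entries; return the number of entries that remain.
    std::size_t prune() {
        sessions_.remove_if(
            [](const std::weak_ptr<Session> &w) { return w.expired(); });
        return sessions_.size();
    }

private:
    std::list<std::weak_ptr<Session>> sessions_;
};

// Walk through the life of one session; returns true if each step
// behaves as described in the comments.
bool lifetime_demo() {
    Registry registry;
    std::function<void()> pending;
    {
        auto session = std::make_shared<Session>();
        registry.track(session);
        pending = session->make_pending_handler();
    } // the local shared_ptr is gone; only the pending handler owns the session
    assert(pending);

    bool alive_while_pending = (registry.stop_all() == 1);
    pending();         // the handler runs ...
    pending = nullptr; // ... and is destroyed, releasing the last owner
    bool gone_afterwards = (registry.stop_all() == 0) && (registry.prune() == 0);
    return alive_while_pending && gone_afterwards;
}
```

The design choice is that nobody has to delete a connection explicitly: it lives exactly as long as some operation on it is outstanding, and the registry can still reach live connections to cancel them.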

Regarding "c++ - Boost.Asio: Is it a good thing to use a `io_service` per connection/socket?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31486010/
