gpt4 book ai didi

c++ - 如何设计正确释放 boost::asio 套接字或其包装器

转载 作者:可可西里 更新时间:2023-11-01 02:32:00 24 4
gpt4 key购买 nike

在多年未接触 boost::asio 之后,我正在尝试使用 boost::asio 制作我自己的简单异步 TCP 服务器。

我能找到的最新示例 list 是: http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

这个示例列表的问题是(我觉得)它作弊而且作弊很大,通过将 tcp_connection 设为 shared_ptr,这样它就不用担心每个连接的生命周期管理。 (我认为)他们这样做是为了简洁,因为这是一个小教程,但该解决方案不是真实世界。

如果您想在计时器或类似的东西上向每个客户端发送消息怎么办?在任何真实世界的非平凡服务器中都需要一组客户端连接。

我担心每个连接的生命周期管理。我认为自然而然的做法是在 tcp_server 中保留一些 tcp_connection 对象或指向它们的指针。从 OnConnect 回调中添加到该集合并从该集合中删除 OnDisconnect。

请注意,OnDisconnect 很可能会从实际的 Disconnect 方法中调用,如果出现错误,该方法又会从 OnReceive 回调或 OnSend 回调中调用。

嗯,这就是问题所在。

假设我们有一个看起来像这样的调用堆栈:

tcp_connection::~tcp_connection
tcp_server::OnDisconnect
tcp_connection::OnDisconnect
tcp_connection::Disconnect
tcp_connection::OnReceive

这会导致错误,因为调用堆栈展开并且我们正在一个已调用其析构函数的对象中执行代码......我想,对吧?

我想每个从事服务器编程的人都以某种方式遇到过这种情况。处理它的策略是什么?

我希望解释足够好,可以遵循。如果不告诉我,我会创建自己的源列表,但它会非常大。


编辑:相关

) Memory management in asynchronous C++ code

IMO 不是一个可接受的答案,它依赖于在接听电话时使用 shared_ptr 进行欺骗,仅此而已,并且不是真实世界。如果服务器想每 5 分钟向所有客户端说“嗨”怎么办。某种收藏是必要的。如果您在多个线程上调用 io_service.run 怎么办?

我也在 boost 邮件列表上询问: http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

最佳答案

正如我所说,我看不出使用智能指针是如何“作弊,而且作弊很大”。我也不认为您关于“他们这样做是为了简洁”的评估站得住脚。


这是我们代码库中经过轻微编辑的摘录¹,它举例说明了使用 shared_ptrs 如何不排除跟踪连接。

它仅显示事物的服务器端,带有

  • connection.hpp 中一个非常简单的connection 对象;这使用了 enable_shared_from_this

  • 只是固定大小的 connection_pool(我们也有动态调整池的大小,因此有锁定原语)。请注意我们如何对所有事件连接执行操作。

    所以你会很简单地写这样的东西来写给所有的客户,比如在计时器上:

    _pool.for_each_active([] (auto const& conn) {
    send_message(conn, hello_world_packet);
    });
  • 一个示例 listener,显示它如何与 connection_pool 联系(它有一个关闭所有连接的示例方法)

代码 list

  • 连接.hpp

    #pragma once

    #include "xxx/net/rpc/protocol.hpp"
    #include "log.hpp"
    #include "stats_filer.hpp"
    #include <memory>

    namespace xxx { namespace net { namespace rpc {

    struct connection : std::enable_shared_from_this<connection>, protected LogSource {
    typedef std::shared_ptr<connection> ptr;

    private:
    friend struct io;
    friend struct listener;

    boost::asio::io_service& _svc;
    protocol::socket _socket;
    protocol::endpoint _ep;
    protocol::endpoint _peer;
    public:

    connection(boost::asio::io_service& svc, protocol::endpoint ep)
    : LogSource("rpc::connection"),
    _svc(svc),
    _socket(svc),
    _ep(ep)
    {}

    void init() {
    _socket.set_option(protocol::no_delay(true));
    _peer = _socket.remote_endpoint();
    g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_accepted");
    debug() << "New connection from " << _peer;
    }

    protocol::endpoint endpoint() const { return _ep; }
    protocol::endpoint peer() const { return _peer; }
    protocol::socket& socket() { return _socket; }

    // TODO encapsulation
    int handle() {
    return _socket.native_handle();
    }

    bool valid() const { return _socket.is_open(); }

    void cancel() {
    _svc.post([this] { _socket.cancel(); });
    }

    using shutdown_type = boost::asio::ip::tcp::socket::shutdown_type;
    void shutdown(shutdown_type what = shutdown_type::shutdown_both) {
    _svc.post([=] { _socket.shutdown(what); });
    }

    ~connection() {
    g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_disconnected");
    }
    };

    } } }
  • 连接池.hpp

    #pragma once

    #include <mutex>
    #include "xxx/threads/null_mutex.hpp"
    #include "xxx/net/rpc/connection.hpp"
    #include "stats_filer.hpp"
    #include "log.hpp"

    namespace xxx { namespace net { namespace rpc {

    // not thread-safe by default, but pass e.g. std::mutex for `Mutex` if you need it
    template <typename Ptr = xxx::net::rpc::connection::ptr, typename Mutex = xxx::threads::null_mutex>
    struct basic_connection_pool : LogSource {
    using WeakPtr = std::weak_ptr<typename Ptr::element_type>;

    basic_connection_pool(std::string name = "connection_pool", size_t size)
    : LogSource(std::move(name)), _pool(size)
    { }

    bool try_insert(Ptr const& conn) {
    std::lock_guard<Mutex> lk(_mx);

    auto slot = std::find_if(_pool.begin(), _pool.end(), std::mem_fn(&WeakPtr::expired));

    if (slot == _pool.end()) {
    g_stats_filer_p->inc_value("asio." + conn->endpoint().address().to_string() + ".connections_dropped");
    error() << "dropping connection from " << conn->peer() << ": connection pool (" << _pool.size() << ") saturated";
    return false;
    }

    *slot = conn;
    return true;
    }

    template <typename F>
    void for_each_active(F action) {
    auto locked = [=] {
    using namespace std;
    lock_guard<Mutex> lk(_mx);
    vector<Ptr> locked(_pool.size());
    transform(_pool.begin(), _pool.end(), locked.begin(), mem_fn(&WeakPtr::lock));
    return locked;
    }();

    for (auto const& p : locked)
    if (p) action(p);
    }

    constexpr static bool synchronizing() {
    return not std::is_same<xxx::threads::null_mutex, Mutex>();
    }

    private:
    void dump_stats(LogSource::LogTx tx) const {
    // lock is assumed!
    size_t empty = 0, busy = 0, idle = 0;

    for (auto& p : _pool) {
    switch (p.use_count()) {
    case 0: empty++; break;
    case 1: idle++; break;
    default: busy++; break;
    }
    }

    tx << "usage empty:" << empty << " busy:" << busy << " idle:" << idle;
    }

    Mutex _mx;
    std::vector<WeakPtr> _pool;
    };

    // TODO FIXME use null_mutex once growing is no longer required AND if
    // en-pooling still only happens from the single IO thread (XXX-2535)
    using server_connection_pool = basic_connection_pool<xxx::net::rpc::connection::ptr, std::mutex>;

    } } }
  • 监听器.hpp

    #pragma once

    #include "xxx/threads/null_mutex.hpp"
    #include <mutex>
    #include "xxx/net/rpc/connection_pool.hpp"
    #include "xxx/net/rpc/io_operations.hpp"

    namespace xxx { namespace net { namespace rpc {

    struct listener : std::enable_shared_from_this<listener>, LogSource {
    typedef std::shared_ptr<listener> ptr;

    protocol::acceptor _acceptor;
    protocol::endpoint _ep;

    listener(boost::asio::io_service& svc, protocol::endpoint ep, server_connection_pool& pool)
    : LogSource("rpc::listener"), _acceptor(svc), _ep(ep), _pool(pool)
    {
    _acceptor.open(ep.protocol());

    _acceptor.set_option(protocol::acceptor::reuse_address(true));
    _acceptor.set_option(protocol::no_delay(true));
    ::fcntl(_acceptor.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
    _acceptor.bind(ep);

    _acceptor.listen(32);
    }

    void accept_loop(std::function<void(connection::ptr conn)> on_accept) {

    auto self = shared_from_this();
    auto conn = std::make_shared<xxx::net::rpc::connection>(_acceptor.get_io_service(), _ep);

    _acceptor.async_accept(conn->_socket, [this,self,conn,on_accept](boost::system::error_code ec) {
    if (ec) {
    auto tx = ec == boost::asio::error::operation_aborted? debug() : warn();
    tx << "failed accept " << ec.message();
    } else {
    ::fcntl(conn->_socket.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?

    if (_pool.try_insert(conn)) {
    on_accept(conn);
    }

    self->accept_loop(on_accept);
    }
    });
    }

    void close() {
    _acceptor.cancel();
    _acceptor.close();

    _acceptor.get_io_service().post([=] {
    _pool.for_each_active([] (auto const& sp) {
    sp->shutdown(connection::shutdown_type::shutdown_both);
    sp->cancel();
    });
    });

    debug() << "shutdown";
    }

    ~listener() {
    }

    private:
    server_connection_pool& _pool;
    };

    } } }

¹ 下载为要点 https://gist.github.com/sehe/979af25b8ac4fd77e73cdf1da37ab4c2

关于c++ - 如何设计正确释放 boost::asio 套接字或其包装器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43239208/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com