gpt4 book ai didi

c++ - 我应该如何在 boost::asio 的客户端应用程序中同时使用 async_read_until 和 async_write?

转载 作者:行者123 更新时间:2023-12-04 08:10:50 26 4
gpt4 key购买 nike

我的动机。
我正在尝试构建一个简单的信使。目前我已经编写了支持“邮件功能”的客户端和服务器应用程序,也就是说,它们缺乏您在每个即时通讯工具中的聊天交互。
这是我使用的模型。
服务器:每个连接的客户端的服务器都有一个专用的 Service提供实际服务的类。 Service 的一个实例类有一个 id。
客户 :在特定时刻同时开始从关联的 Service 读取消息和向其写入消息实例。
追踪器 : 通过保存用户登录和 Service 来记录用户的当前 session map 中的 id。还通过保存键值对(聊天参与者 ID 1,聊天参与者 ID 2)来记录打开的聊天。我交替使用用户的登录名和 ID,因为我有一个数据库。
下面是一个典型的使用场景。

  • 用户正在尝试登录。服务器将 Service 专用于该用户。 id 为 1 的实例。然后用户标识为 Bob。
  • 鲍勃打开与安的聊天。 Tracker Bob 使用的记录 Service 1 并且 Bob 打开了与 Ann 的聊天。
  • 用户正在尝试登录。服务器将 Service 专用于该用户。 id 为 2 的实例。然后用户标识为 Ann。
  • 安与鲍勃开始聊天。 Tracker Ann 使用的记录 Service 2 那个安打开了与鲍勃的聊天。
  • 安给鲍勃写了一条消息。 Service 2 收到消息询问Service如果 Bob 已打开与 Ann 的聊天,则 1 将消息发送到 Bob 的聊天。为此,我使用 Tracker .在我们的例子中,Bob 在聊天中,所以 Bob 的客户端应用程序应该读取来自 Service 的消息1. 否则 Service 2 只将新消息存储在数据库中。

  • 当用户打开与某人的聊天时,客户端应用程序同时开始向同事读取和写入消息 Service实例。
    问题
  • 鲍勃打开与安的聊天。安与鲍勃开始聊天。
  • 安发送消息。它们显示在 Bobs 聊天中。
  • 鲍勃发送一条消息。它不会显示在 Ann 的聊天中。此外,更多 Ann 的消息不再显示在 Bob 的聊天中。

  • 这是我的服务器代码的一部分。我添加了一些上下文,但您可能想查看 Service::onMessageReceived , Service::receive_message , Service::send_to_chat
    /// Struct to track active sessions of clients
    struct Tracker {
    static std::mutex current_sessions_guard; ///< mutex to lock the map of current sessions between threads
    static std::map<long, long> current_sessions;
    static std::map<long, int> client_to_service_id;
    };
    在客户服务模型中提供实际服务的类
    class Service {
    public:
    void send_to_chat(const std::string& new_message) {
    asio::async_write(*m_sock.get(), asio::buffer(new_message),
    [this]() {
    onAnotherPartyMessageSent();
    });
    }

    private:
    void onReceivedReady();
    void receive_message() {
    /// Server loop for reading messages from the client
    spdlog::info("[{}] in receive_message", service_id);

    asio::async_read_until(*m_sock.get(), *(m_message.get()), '\n',
    [this]() {
    onMessageReceived();
    });
    }
    void onMessageReceived();

    private:
    std::shared_ptr<asio::ip::tcp::socket> m_sock; ///< Pointer to an active socket that is used to communicate
    ///< with the client
    int service_id;
    long dialog_id = -1, client_id = -1, another_party_id = -1;
    std::shared_ptr<asio::streambuf> m_message;
    };
    方法的定义

    void Service::onMessageReceived() {
    /// Updates the database with the new message and asks Service instance of another participant
    /// to send the message if they opened this chat.

    std::istream istrm(m_message.get());
    std::string new_message;
    std::getline(istrm, new_message);
    m_message.reset(new asio::streambuf);

    std::unique_lock<std::mutex> tracker_lock(Tracker::current_sessions_guard);

    if (Tracker::current_sessions.find(another_party_id) != Tracker::current_sessions.end()) {
    if (Tracker::current_sessions[another_party_id] == client_id) {
    int another_party_service_id = Tracker::client_to_service_id[another_party_id];
    std::string formatted_msg = _form_message_str(login, new_message);

    spdlog::info("[{}] sends to chat '{}'", another_party_service_id, new_message);

    Server::launched_services[another_party_service_id]->send_to_chat(formatted_msg);
    }
    }
    tracker_lock.unlock();
    receive_message();
    }
    这是我的客户端代码的一部分。我添加了一些上下文,但您可能想查看 AsyncTCPClient::onSentReady , AsyncTCPClient::message_send_loop , AsyncTCPClient::message_wait_loop .
    /// Struct that stores a session with the given server
    struct Session {
    asio::ip::tcp::socket m_sock; //!< The socket for the client application to connect to the server
    asio::ip::tcp::endpoint m_ep; //!< The server's endpoint
    std::string current_chat;

    std::shared_ptr<asio::streambuf> m_chat_buf;
    std::shared_ptr<asio::streambuf> m_received_message;
    };

    /// Class that implements an asynchronous TCP client to interact with Service class
    class AsyncTCPClient: public asio::noncopyable {

    void onSentReady(std::shared_ptr<Session> session) {

    msg_wait_thread.reset(new std::thread([this, session] {
    asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n",
    [this, session] () {
    message_wait_loop(session);
    });
    }));
    msg_wait_thread->detach();

    msg_thread.reset(new std::thread([this, session] {
    message_send_loop(session);
    }));

    msg_thread->detach();
    }


    void message_send_loop(std::shared_ptr<Session> session) {
    /// Starts loop in the current chat enabling the client to keep sending messages to another party
    logger->info("'{}' in message_send_loop", session->login);

    clear_console();
    m_console.write(session->current_chat);
    m_console.write("Write your message: ");

    std::string new_message;

    // We use a do/while loop to prevent empty messages either because of the client input
    // or \n's that were not read before

    do {
    new_message = m_console.read();
    } while (new_message.empty());


    std::unique_lock<std::mutex> lock_std_out(std_out_guard);
    session->current_chat.append(_form_message_str(session->login, new_message));
    lock_std_out.unlock();

    asio::async_write(session->m_sock, asio::buffer(new_message + "\n"),
    [this, session] () {
    message_send_loop(session);
    });
    }

    void message_wait_loop(std::shared_ptr<Session> session) {
    /// Starts loop in the current chat enabling the client to keep reading messages from another party

    logger->info("'{}' in message_wait_loop", session->login);

    std::istream istrm(session->m_received_message.get());
    std::string received_message;
    std::getline(istrm, received_message);

    session->m_received_message.reset(new asio::streambuf);

    std::unique_lock<std::mutex> lock_std_out(std_out_wait_guard);
    session->current_chat.append(received_message + "\n");
    lock_std_out.unlock();

    clear_console();
    m_console.write(session->current_chat);
    m_console.write("Write your message: ");

    asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n",
    [this, session] (std::size_t) {
    message_wait_loop(session);
    });
    }

    private:
    asio::io_context m_ios;
    };
    所以,当我描述这个问题时,我没有 "'{}' in message_wait_loop"在第 3 点为两个客户端记录日志)。但是我在 2) 点为 Bob 的客户提供了这些日志。
    我也使用来自 answer here 的控制台.它通过互斥体去除回声并控制标准输入/输出资源。然而它并没有解决我的问题。
    任何帮助,将不胜感激。

    最佳答案

    代码太多,太少。这个问题太多了,而实际上提出改进建议的却太少。我看到过度使用 shared_ptr,线程,特别是在他们自己的线程上运行异步操作非常奇怪。更别说分离了:

    msg_wait_thread.reset(new std::thread([this, session] {
    asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n",
    [this, session] () {
    message_wait_loop(session);
    });
    }));
    msg_wait_thread->detach();
    整个事情最好被完全等效(但更安全)取代
      asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
    [this, session] () {
    message_wait_loop(session);
    });
    我想读循环在一个线程上,这样输入就不会阻塞。但是,如果您将主线程视为“UI 线程”(它是),并接受控制台 IO 在那里阻塞,而不是将结果请求发布到单个 IO 线程以进行所有非阻塞操作,那么事情会变得容易得多。
    如果你分享一个 repo 或其他东西的链接,我很乐意多看它。
    更新
    在评论中,我查看了 github repo 中的代码并发布了 PR: https://github.com/cepessh/mymsg/pull/1

    This is a very raw proof-of-concept. I have included many changes thataren't actually related to the suggested concurrency fix, but theyhappened:

    • to allow me to run
    • during review (you will probably want to look at a number of those changes and keep them anyways)
    • fixes that were apparently missing from main branch (e.g. the default value for Message.read_by_recipient database column)

    You should be able to work out what changes were made and why by thecommit messages.

    Only the last two commits actually focus on the idea discussed inchat.

    关于c++ - 我应该如何在 boost::asio 的客户端应用程序中同时使用 async_read_until 和 async_write?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65969564/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com