gpt4 book ai didi

c++ - Boost Beast Async Websocket Server 如何与 session 交互?

转载 作者:行者123 更新时间:2023-12-02 01:40:35 26 4
gpt4 key购买 nike

所以我不知道为什么,但我无法全神贯注于 boost Beast websocket 服务器以及您可以(或应该)如何与它交互。

我制作的基本程序看起来像这样,跨越 2 个类(WebSocketListenerWebSocketSession) https://www.boost.org/doc/libs/develop/libs/beast/example/websocket/server/async/websocket_server_async.cpp

一切正常,我可以连接,并且可以回显消息。我们将永远只有 1 个事件 session ,我正在努力了解如何从其类之外与该 session 进行交互,例如在我的 int main() 中或可能负责发出读/写的另一个类中。我们将使用一个简单的命令设计模式,命令异步进入缓冲区,针对硬件进行处理,然后 async_write 返回结果。读取和排队很简单,将在 WebsocketSession 中完成,但我看到的所有写入都只是直接在 session 中读取/写入,而不是获取外部输入。

我见过使用 boost::asio::async_write(socket, buffer, ...) 之类的例子,但我很难理解我是如何在以下情况下获得对所述套接字的引用的 session 由监听器本身创建。

最佳答案

我不会依赖 session 外部的套接字,而是依赖您的程序逻辑来实现 session 。

这是因为 session (连接)将控制其自身的生命周期,自发到达并可能自发断开连接。您的硬件很可能没有。

因此,借用“依赖注入(inject)”的概念告诉您的听众您的应用程序逻辑,然后从 session 中调用它。 (监听器会将依赖项“注入(inject)”到每个新创建的 session 中)。

让我们从链接示例的简化/现代化版本开始。

现在,在我们准备响应的地方,您希望注入(inject)您自己的逻辑,所以让我们按照我们的想象来编写它:

void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");

// Process the message
response_ = logic_->Process(beast::buffers_to_string(buffer_));

ws_.async_write(
net::buffer(response_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}

这里我们声明成员并从构造函数中初始化它们:

    std::string                          response_;
std::shared_ptr<AppDomain::Logic> logic_;

public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}

现在,我们需要将逻辑注入(inject)监听器,以便我们可以传递它:

class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;

public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(ex)
, logic_(logic) {

这样我们就可以传递它:

void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
fail(ec, "accept");
} else {
std::make_shared<session>(std::move(socket), logic_)->run();
}

// Accept another connection
do_accept();
}

现在在 main 中创建真正的逻辑:

auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");

try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);

std::make_shared<listener>(ioc.get_executor(),
tcp::endpoint{address, port}, logic)
->run();

ioc.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}

演示逻辑

为了好玩,让我们实现一些随机逻辑,将来可能会在硬件中实现:

namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}

std::string Process(std::string request) {
std::cout << "Processing: " << std::quoted(request) << std::endl;

std::string result;

auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};

auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};

using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];

phrase_parse(begin(request), end(request), add | mul | err, blank);

return banner + result;
}
};
} // namespace AppDomain

现在您可以看到它的实际效果: Full Listing

enter image description here

从这里去哪里

如果一个请求需要多个响应怎么办?

你需要一个队列。我通常将那些 发件箱 称为 searching for outbox_, _outbox etc会给出很多例子。

这些示例还将展示如何处理可以“从外部发起写入”的其他情况,以及如何安全地将这些写入队列。也许这里有一个非常吸引人的例子 How to batch send unsent messages in asio

列表供引用

万一将来链接失效:

#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <filesystem>
#include <functional>
#include <iostream>

static std::string g_app_name = "app-logic-service";

#include <boost/core/demangle.hpp> // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric> // idem

namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}

std::string Process(std::string request) {
std::string result;

auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};

auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};

using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];

phrase_parse(begin(request), end(request), add | mul | err, blank);

return banner + result;
}
};
} // namespace AppDomain

namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

// Report a failure
void fail(beast::error_code ec, char const* what) {
std::cerr << what << ": " << ec.message() << "\n";
}

class session : public std::enable_shared_from_this<session> {
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::string response_;
std::shared_ptr<AppDomain::Logic> logic_;

public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}

void run() {
// Get on the correct executor
// strand for thread safety
dispatch(
ws_.get_executor(),
beast::bind_front_handler(&session::on_run, shared_from_this()));
}

private:
void on_run() {
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(
beast::role_type::server));

// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " " +
g_app_name);
}));

// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(&session::on_accept, shared_from_this()));
}

void on_accept(beast::error_code ec) {
if (ec)
return fail(ec, "accept");

do_read();
}

void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}

void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");

// Process the message
auto request = boost::algorithm::trim_copy(
beast::buffers_to_string(buffer_.data()));

std::cout << "Processing: " << std::quoted(request) << " from "
<< beast::get_lowest_layer(ws_).socket().remote_endpoint()
<< std::endl;

response_ = logic_->Process(request);

ws_.async_write(
net::buffer(response_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}

void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);

if (ec)
return fail(ec, "write");

// Clear the buffer
buffer_.consume(buffer_.size());

// Do another read
do_read();
}
};

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;

public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(ex)
, logic_(logic) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(tcp::acceptor::max_listen_connections);
}

// Start accepting incoming connections
void run() { do_accept(); }

private:
void do_accept() {
// The new connection gets its own strand
acceptor_.async_accept(make_strand(ex_),
beast::bind_front_handler(&listener::on_accept,
shared_from_this()));
}

void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
fail(ec, "accept");
} else {
std::make_shared<session>(std::move(socket), logic_)->run();
}

// Accept another connection
do_accept();
}
};

int main(int argc, char* argv[]) {
g_app_name = std::filesystem::path(argv[0]).filename();

if (argc != 4) {
std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
<< "Example:\n"
<< " " << g_app_name << " 0.0.0.0 8080 1\n";
return 1;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<uint16_t>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));

auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");

try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);

std::make_shared<listener>(ioc.get_executor(),
tcp::endpoint{address, port}, logic)
->run();

ioc.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
}

更新

为了回应评论,我再次具体化了发件箱模式。注意代码中的一些注释。

Compiler Explorer

#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <deque>
#include <filesystem>
#include <functional>
#include <iostream>
#include <list>

static std::string g_app_name = "app-logic-service";

#include <boost/core/demangle.hpp> // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric> // idem

namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}

std::string Process(std::string request) {
std::string result;

auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};

auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};

using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];

phrase_parse(begin(request), end(request), add | mul | err, blank);

return banner + result;
}
};
} // namespace AppDomain

namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

// Report a failure
void fail(beast::error_code ec, char const* what) {
std::cerr << what << ": " << ec.message() << "\n";
}

class session : public std::enable_shared_from_this<session> {
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<AppDomain::Logic> logic_;

public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}

void run() {
// Get on the correct executor
// strand for thread safety
dispatch(
ws_.get_executor(),
beast::bind_front_handler(&session::on_run, shared_from_this()));
}

void post_message(std::string msg) {
post(ws_.get_executor(),
[self = shared_from_this(), this, msg = std::move(msg)] {
do_post_message(std::move(msg));
});
}

private:
void on_run() {
// on the strand
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(
beast::role_type::server));

// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " " +
g_app_name);
}));

// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(&session::on_accept, shared_from_this()));
}

void on_accept(beast::error_code ec) {
// on the strand
if (ec)
return fail(ec, "accept");

do_read();
}

void do_read() {
// on the strand
buffer_.clear();

ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}

void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
// on the strand
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");

// Process the message
auto request = boost::algorithm::trim_copy(
beast::buffers_to_string(buffer_.data()));

std::cout << "Processing: " << std::quoted(request) << " from "
<< beast::get_lowest_layer(ws_).socket().remote_endpoint()
<< std::endl;

do_post_message(logic_->Process(request)); // already on the strand

do_read();
}

std::deque<std::string> _outbox;

void do_post_message(std::string msg) {
// on the strand
_outbox.push_back(std::move(msg));

if (_outbox.size() == 1)
do_write_loop();
}

void do_write_loop() {
// on the strand
if (_outbox.empty())
return;

ws_.async_write( //
net::buffer(_outbox.front()),
[self = shared_from_this(), this] //
(beast::error_code ec, size_t bytes_transferred) {
// on the strand
boost::ignore_unused(bytes_transferred);

if (ec)
return fail(ec, "write");

_outbox.pop_front();
do_write_loop();
});
}
};

// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;

public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(make_strand(ex)) // NOTE to guard sessions_
, logic_(logic) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(tcp::acceptor::max_listen_connections);
}

// Start accepting incoming connections
void run() { do_accept(); }

void broadcast(std::string msg) {
post(acceptor_.get_executor(),
beast::bind_front_handler(&listener::do_broadcast,
shared_from_this(), std::move(msg)));
}

private:
using handle_t = std::weak_ptr<session>;
std::list<handle_t> sessions_;

void do_broadcast(std::string const& msg) {
for (auto handle : sessions_)
if (auto sess = handle.lock())
sess->post_message(msg);
}

void do_accept() {
// The new connection gets its own strand
acceptor_.async_accept(make_strand(ex_),
beast::bind_front_handler(&listener::on_accept,
shared_from_this()));
}

void on_accept(beast::error_code ec, tcp::socket socket) {
// on the strand
if (ec) {
fail(ec, "accept");
} else {
auto sess = std::make_shared<session>(std::move(socket), logic_);
sessions_.emplace_back(sess);
// optionally:
sessions_.remove_if(std::mem_fn(&handle_t::expired));
sess->run();
}

// Accept another connection
do_accept();
}
};

static void emulate_hardware_stuff(std::shared_ptr<listener> srv) {
using std::this_thread::sleep_for;
using namespace std::chrono_literals;
// Extremely simplistic. Instead I'd recommend `steady_timer` with
// `_async_wait` here, but since I'm just making a sketch...
unsigned i = 0;

while (true) {
sleep_for(1s);
srv->broadcast("Hardware thing #" + std::to_string(++i));
}
}

int main(int argc, char* argv[]) {
g_app_name = std::filesystem::path(argv[0]).filename();

if (argc != 4) {
std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
<< "Example:\n"
<< " " << g_app_name << " 0.0.0.0 8080 1\n";
return 1;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<uint16_t>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));

auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");

try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);

auto srv = std::make_shared<listener>( //
ioc.get_executor(), //
tcp::endpoint{address, port}, //
logic);

srv->run();

std::thread something_hardware(emulate_hardware_stuff, srv);

ioc.join();
something_hardware.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
}

现场演示:

enter image description here

关于c++ - Boost Beast Async Websocket Server 如何与 session 交互?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71638897/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com