- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
所以我不知道为什么,但我无法全神贯注于 boost Beast websocket 服务器以及您可以(或应该)如何与它交互。
我制作的基本程序看起来像这样,跨越 2 个类(WebSocketListener
和 WebSocketSession
) https://www.boost.org/doc/libs/develop/libs/beast/example/websocket/server/async/websocket_server_async.cpp
一切正常,我可以连接,并且可以回显消息。我们将永远只有 1 个事件 session ,我正在努力了解如何从其类之外与该 session 进行交互,例如在我的 int main() 中或可能负责发出读/写的另一个类中。我们将使用一个简单的命令设计模式,命令异步进入缓冲区,针对硬件进行处理,然后 async_write 返回结果。读取和排队很简单,将在 WebsocketSession
中完成,但我看到的所有写入都只是直接在 session 中读取/写入,而不是获取外部输入。
我见过使用 boost::asio::async_write(socket, buffer, ...)
之类的例子,但我很难理解我是如何在以下情况下获得对所述套接字的引用的 session 由监听器本身创建。
最佳答案
我不会依赖 session 外部的套接字,而是依赖您的程序逻辑来实现 session 。
这是因为 session (连接)将控制其自身的生命周期,自发到达并可能自发断开连接。您的硬件很可能没有。
因此,借用“依赖注入(inject)”的概念告诉您的听众您的应用程序逻辑,然后从 session 中调用它。 (监听器会将依赖项“注入(inject)”到每个新创建的 session 中)。
让我们从链接示例的简化/现代化版本开始。
现在,在我们准备响应的地方,您希望注入(inject)您自己的逻辑,所以让我们按照我们的想象来编写它:
void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");
// Process the message
response_ = logic_->Process(beast::buffers_to_string(buffer_));
ws_.async_write(
net::buffer(response_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}
这里我们声明成员并从构造函数中初始化它们:
std::string response_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}
现在,我们需要将逻辑注入(inject)监听器,以便我们可以传递它:
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(ex)
, logic_(logic) {
这样我们就可以传递它:
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
fail(ec, "accept");
} else {
std::make_shared<session>(std::move(socket), logic_)->run();
}
// Accept another connection
do_accept();
}
现在在 main 中创建真正的逻辑:
auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");
try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);
std::make_shared<listener>(ioc.get_executor(),
tcp::endpoint{address, port}, logic)
->run();
ioc.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
为了好玩,让我们实现一些随机逻辑,将来可能会在硬件中实现:
namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}
std::string Process(std::string request) {
std::cout << "Processing: " << std::quoted(request) << std::endl;
std::string result;
auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};
auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};
using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];
phrase_parse(begin(request), end(request), add | mul | err, blank);
return banner + result;
}
};
} // namespace AppDomain
现在您可以看到它的实际效果: Full Listing
如果一个请求需要多个响应怎么办?
你需要一个队列。我通常将那些 发件箱
称为 searching for outbox_
, _outbox
etc会给出很多例子。
这些示例还将展示如何处理可以“从外部发起写入”的其他情况,以及如何安全地将这些写入队列。也许这里有一个非常吸引人的例子 How to batch send unsent messages in asio
万一将来链接失效:
#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <filesystem>
#include <functional>
#include <iostream>
static std::string g_app_name = "app-logic-service";
#include <boost/core/demangle.hpp> // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric> // idem
namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}
std::string Process(std::string request) {
std::string result;
auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};
auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};
using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];
phrase_parse(begin(request), end(request), add | mul | err, blank);
return banner + result;
}
};
} // namespace AppDomain
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// Report a failure
void fail(beast::error_code ec, char const* what) {
std::cerr << what << ": " << ec.message() << "\n";
}
class session : public std::enable_shared_from_this<session> {
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::string response_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}
void run() {
// Get on the correct executor
// strand for thread safety
dispatch(
ws_.get_executor(),
beast::bind_front_handler(&session::on_run, shared_from_this()));
}
private:
void on_run() {
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(
beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " " +
g_app_name);
}));
// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(&session::on_accept, shared_from_this()));
}
void on_accept(beast::error_code ec) {
if (ec)
return fail(ec, "accept");
do_read();
}
void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");
// Process the message
auto request = boost::algorithm::trim_copy(
beast::buffers_to_string(buffer_.data()));
std::cout << "Processing: " << std::quoted(request) << " from "
<< beast::get_lowest_layer(ws_).socket().remote_endpoint()
<< std::endl;
response_ = logic_->Process(request);
ws_.async_write(
net::buffer(response_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
// Clear the buffer
buffer_.consume(buffer_.size());
// Do another read
do_read();
}
};
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(ex)
, logic_(logic) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(tcp::acceptor::max_listen_connections);
}
// Start accepting incoming connections
void run() { do_accept(); }
private:
void do_accept() {
// The new connection gets its own strand
acceptor_.async_accept(make_strand(ex_),
beast::bind_front_handler(&listener::on_accept,
shared_from_this()));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
fail(ec, "accept");
} else {
std::make_shared<session>(std::move(socket), logic_)->run();
}
// Accept another connection
do_accept();
}
};
int main(int argc, char* argv[]) {
g_app_name = std::filesystem::path(argv[0]).filename();
if (argc != 4) {
std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
<< "Example:\n"
<< " " << g_app_name << " 0.0.0.0 8080 1\n";
return 1;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<uint16_t>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));
auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");
try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);
std::make_shared<listener>(ioc.get_executor(),
tcp::endpoint{address, port}, logic)
->run();
ioc.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
}
为了回应评论,我再次具体化了发件箱模式。注意代码中的一些注释。
#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <deque>
#include <filesystem>
#include <functional>
#include <iostream>
#include <list>
static std::string g_app_name = "app-logic-service";
#include <boost/core/demangle.hpp> // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric> // idem
namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}
std::string Process(std::string request) {
std::string result;
auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};
auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};
using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];
phrase_parse(begin(request), end(request), add | mul | err, blank);
return banner + result;
}
};
} // namespace AppDomain
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// Report a failure
void fail(beast::error_code ec, char const* what) {
std::cerr << what << ": " << ec.message() << "\n";
}
class session : public std::enable_shared_from_this<session> {
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}
void run() {
// Get on the correct executor
// strand for thread safety
dispatch(
ws_.get_executor(),
beast::bind_front_handler(&session::on_run, shared_from_this()));
}
void post_message(std::string msg) {
post(ws_.get_executor(),
[self = shared_from_this(), this, msg = std::move(msg)] {
do_post_message(std::move(msg));
});
}
private:
void on_run() {
// on the strand
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(
beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " " +
g_app_name);
}));
// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(&session::on_accept, shared_from_this()));
}
void on_accept(beast::error_code ec) {
// on the strand
if (ec)
return fail(ec, "accept");
do_read();
}
void do_read() {
// on the strand
buffer_.clear();
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
// on the strand
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");
// Process the message
auto request = boost::algorithm::trim_copy(
beast::buffers_to_string(buffer_.data()));
std::cout << "Processing: " << std::quoted(request) << " from "
<< beast::get_lowest_layer(ws_).socket().remote_endpoint()
<< std::endl;
do_post_message(logic_->Process(request)); // already on the strand
do_read();
}
std::deque<std::string> _outbox;
void do_post_message(std::string msg) {
// on the strand
_outbox.push_back(std::move(msg));
if (_outbox.size() == 1)
do_write_loop();
}
void do_write_loop() {
// on the strand
if (_outbox.empty())
return;
ws_.async_write( //
net::buffer(_outbox.front()),
[self = shared_from_this(), this] //
(beast::error_code ec, size_t bytes_transferred) {
// on the strand
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
_outbox.pop_front();
do_write_loop();
});
}
};
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(make_strand(ex)) // NOTE to guard sessions_
, logic_(logic) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(tcp::acceptor::max_listen_connections);
}
// Start accepting incoming connections
void run() { do_accept(); }
void broadcast(std::string msg) {
post(acceptor_.get_executor(),
beast::bind_front_handler(&listener::do_broadcast,
shared_from_this(), std::move(msg)));
}
private:
using handle_t = std::weak_ptr<session>;
std::list<handle_t> sessions_;
void do_broadcast(std::string const& msg) {
for (auto handle : sessions_)
if (auto sess = handle.lock())
sess->post_message(msg);
}
void do_accept() {
// The new connection gets its own strand
acceptor_.async_accept(make_strand(ex_),
beast::bind_front_handler(&listener::on_accept,
shared_from_this()));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
// on the strand
if (ec) {
fail(ec, "accept");
} else {
auto sess = std::make_shared<session>(std::move(socket), logic_);
sessions_.emplace_back(sess);
// optionally:
sessions_.remove_if(std::mem_fn(&handle_t::expired));
sess->run();
}
// Accept another connection
do_accept();
}
};
static void emulate_hardware_stuff(std::shared_ptr<listener> srv) {
using std::this_thread::sleep_for;
using namespace std::chrono_literals;
// Extremely simplistic. Instead I'd recommend `steady_timer` with
// `_async_wait` here, but since I'm just making a sketch...
unsigned i = 0;
while (true) {
sleep_for(1s);
srv->broadcast("Hardware thing #" + std::to_string(++i));
}
}
int main(int argc, char* argv[]) {
g_app_name = std::filesystem::path(argv[0]).filename();
if (argc != 4) {
std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
<< "Example:\n"
<< " " << g_app_name << " 0.0.0.0 8080 1\n";
return 1;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<uint16_t>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));
auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");
try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);
auto srv = std::make_shared<listener>( //
ioc.get_executor(), //
tcp::endpoint{address, port}, //
logic);
srv->run();
std::thread something_hardware(emulate_hardware_stuff, srv);
ioc.join();
something_hardware.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
}
现场演示:
关于c++ - Boost Beast Async Websocket Server 如何与 session 交互?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71638897/
本周我将在 Windows Server 2008 上设置一个专用的 SQL Server 2005 机器,并希望将其精简为尽可能简单,同时仍能发挥全部功能。 为此,“服务器核心”选项听起来很有吸引力
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 这个问题似乎与 help center 中定义的范围内的编程无关。 . 已关闭 8 年前。 Improve
我获取了 2014 版本数据库的备份,并尝试在另一台服务器中将其恢复到具有相同名称和登录名的数据库中。此 SQL Server 版本是 2016。 恢复备份文件时,出现此错误: TITLE: Micr
我获取了 2014 版本数据库的备份,并尝试在另一台服务器中将其恢复到具有相同名称和登录名的数据库中。此 SQL Server 版本是 2016。 恢复备份文件时,出现此错误: TITLE: Micr
TFS 是否提供任何增强的方法来存储对 sql server 数据库所做的更改,而不是使用它来对在数据库上执行的 sql 语句的文本文件进行版本控制? 或者我正在寻找的功能是否仅在第 3 方工具(如
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 9 年前。 Improve this ques
我即将将我的 SQL Server 2012 实例升级到 SQL Server 2014。 我已经克隆了主机 Windows VM 并将其重命名为 foo-2012至 foo-2014 . 重新启动时
我想为 SQL Server 登录授予对数据库的访问权限。我知道 sp_grantdbaccess,但它已被弃用。我可以改用什么以及如何检查登录名是否还没有访问数据库的权限? 场景:UserA 创建数
客户别无选择,只能在接下来的几天内从 sql server 2000 迁移到 2008。测试显示 2005 年的重要功能出现了 Not Acceptable 性能下降,但 2008 年却没有。好消息是
我有一个测试数据库,我需要将其导出到我们客户的测试环境中。 这将是一次性的工作。 我正在使用 SQL Server 2005(我的测试数据库是 SQL Server 2005 Express) 执行此
我需要将一个 CSV 文件导入到 mongoDB 不幸的是我遇到了以下错误: error connecting to host: could not connect to server: se
我以为 R2 是一个补丁/服务包。我一直在寻找下载,但没有看到。因此,我假设 R2 是一个新版本,并且我需要 sqlserver 2008 r2 的安装介质来进行升级? 另外,我需要为新许可证付费吗?
我无法使用 SQL Server Management Studio 连接到 SQL Server。 我有一个连接字符串: 我尝试通过在服务器名中输入 myIP、在登录名中输入 MyID、在密码中
我们希望使用 SQL Server 加密来加密数据库中的几个列。我们还需要在生产和测试环境之间传输数据。看来最好的解决方案是在生产和测试服务器上使用相同的主 key 、证书和对称 key ,以便我可以
有没有可以分析 SQL Server 数据库潜在问题的工具? 例如: a foreign key column that is not indexed 没有 FILL FACTOR 的 uniquei
我正在尝试从我的 SQL 2012 BI 版本建立复制,但我收到一条奇怪的错误消息! "You cannot create a publication from server 'X' because
如果您使用 SQL Server 身份验证 (2005),登录详细信息是否以明文形式通过网络发送? 最佳答案 如您所愿,安全无忧... 您可以相当轻松地配置 SSL,如果您没有受信任的证书,如果您强制
我想将数据从一个表复制到不同服务器之间的另一个表。 如果是在同一服务器和不同的数据库中,我使用了以下 SELECT * INTO DB1..TBL1 FROM DB2..TBL1 (to copy w
我希望得到一些帮助,因为我在这个问题上已经被困了 2 天了! 场景:我可以从我的开发计算机(和其他同事)连接到 SERVER\INSTANCE,但无法从另一个 SQL Server 连接。我得到的错误
我正在尝试从我的 SQL 2012 BI 版本建立复制,但我收到一条奇怪的错误消息! "You cannot create a publication from server 'X' because
我是一名优秀的程序员,十分优秀!