- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
所以我不知道为什么,但我无法全神贯注于 boost Beast websocket 服务器以及您可以(或应该)如何与它交互。
我制作的基本程序看起来像这样,跨越 2 个类(WebSocketListener
和 WebSocketSession
) https://www.boost.org/doc/libs/develop/libs/beast/example/websocket/server/async/websocket_server_async.cpp
一切正常,我可以连接,并且可以回显消息。我们将永远只有 1 个事件 session ,我正在努力了解如何从其类之外与该 session 进行交互,例如在我的 int main() 中或可能负责发出读/写的另一个类中。我们将使用一个简单的命令设计模式,命令异步进入缓冲区,针对硬件进行处理,然后 async_write 返回结果。读取和排队很简单,将在 WebsocketSession
中完成,但我看到的所有写入都只是直接在 session 中读取/写入,而不是获取外部输入。
我见过使用 boost::asio::async_write(socket, buffer, ...)
之类的例子,但我很难理解我是如何在以下情况下获得对所述套接字的引用的 session 由监听器本身创建。
最佳答案
我不会依赖 session 外部的套接字,而是依赖您的程序逻辑来实现 session 。
这是因为 session (连接)将控制其自身的生命周期,自发到达并可能自发断开连接。您的硬件很可能没有。
因此,借用“依赖注入(inject)”的概念告诉您的听众您的应用程序逻辑,然后从 session 中调用它。 (监听器会将依赖项“注入(inject)”到每个新创建的 session 中)。
让我们从链接示例的简化/现代化版本开始。
现在,在我们准备响应的地方,您希望注入(inject)您自己的逻辑,所以让我们按照我们的想象来编写它:
void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");
// Process the message
response_ = logic_->Process(beast::buffers_to_string(buffer_));
ws_.async_write(
net::buffer(response_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}
这里我们声明成员并从构造函数中初始化它们:
std::string response_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}
现在,我们需要将逻辑注入(inject)监听器,以便我们可以传递它:
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(ex)
, logic_(logic) {
这样我们就可以传递它:
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
fail(ec, "accept");
} else {
std::make_shared<session>(std::move(socket), logic_)->run();
}
// Accept another connection
do_accept();
}
现在在 main 中创建真正的逻辑:
auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");
try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);
std::make_shared<listener>(ioc.get_executor(),
tcp::endpoint{address, port}, logic)
->run();
ioc.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
为了好玩,让我们实现一些随机逻辑,将来可能会在硬件中实现:
namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}
std::string Process(std::string request) {
std::cout << "Processing: " << std::quoted(request) << std::endl;
std::string result;
auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};
auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};
using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];
phrase_parse(begin(request), end(request), add | mul | err, blank);
return banner + result;
}
};
} // namespace AppDomain
现在您可以看到它的实际效果: Full Listing
如果一个请求需要多个响应怎么办?
你需要一个队列。我通常将那些 发件箱
称为 searching for outbox_
, _outbox
etc会给出很多例子。
这些示例还将展示如何处理可以“从外部发起写入”的其他情况,以及如何安全地将这些写入队列。也许这里有一个非常吸引人的例子 How to batch send unsent messages in asio
万一将来链接失效:
#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <filesystem>
#include <functional>
#include <iostream>
static std::string g_app_name = "app-logic-service";
#include <boost/core/demangle.hpp> // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric> // idem
namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}
std::string Process(std::string request) {
std::string result;
auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};
auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};
using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];
phrase_parse(begin(request), end(request), add | mul | err, blank);
return banner + result;
}
};
} // namespace AppDomain
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// Report a failure
void fail(beast::error_code ec, char const* what) {
std::cerr << what << ": " << ec.message() << "\n";
}
class session : public std::enable_shared_from_this<session> {
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::string response_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}
void run() {
// Get on the correct executor
// strand for thread safety
dispatch(
ws_.get_executor(),
beast::bind_front_handler(&session::on_run, shared_from_this()));
}
private:
void on_run() {
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(
beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " " +
g_app_name);
}));
// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(&session::on_accept, shared_from_this()));
}
void on_accept(beast::error_code ec) {
if (ec)
return fail(ec, "accept");
do_read();
}
void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");
// Process the message
auto request = boost::algorithm::trim_copy(
beast::buffers_to_string(buffer_.data()));
std::cout << "Processing: " << std::quoted(request) << " from "
<< beast::get_lowest_layer(ws_).socket().remote_endpoint()
<< std::endl;
response_ = logic_->Process(request);
ws_.async_write(
net::buffer(response_),
beast::bind_front_handler(&session::on_write, shared_from_this()));
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
// Clear the buffer
buffer_.consume(buffer_.size());
// Do another read
do_read();
}
};
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(ex)
, logic_(logic) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(tcp::acceptor::max_listen_connections);
}
// Start accepting incoming connections
void run() { do_accept(); }
private:
void do_accept() {
// The new connection gets its own strand
acceptor_.async_accept(make_strand(ex_),
beast::bind_front_handler(&listener::on_accept,
shared_from_this()));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
fail(ec, "accept");
} else {
std::make_shared<session>(std::move(socket), logic_)->run();
}
// Accept another connection
do_accept();
}
};
int main(int argc, char* argv[]) {
g_app_name = std::filesystem::path(argv[0]).filename();
if (argc != 4) {
std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
<< "Example:\n"
<< " " << g_app_name << " 0.0.0.0 8080 1\n";
return 1;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<uint16_t>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));
auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");
try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);
std::make_shared<listener>(ioc.get_executor(),
tcp::endpoint{address, port}, logic)
->run();
ioc.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
}
为了回应评论,我再次具体化了发件箱模式。注意代码中的一些注释。
#include <boost/algorithm/string/trim.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <deque>
#include <filesystem>
#include <functional>
#include <iostream>
#include <list>
static std::string g_app_name = "app-logic-service";
#include <boost/core/demangle.hpp> // just for our demo logic
#include <boost/spirit/home/x3.hpp> // idem
#include <numeric> // idem
namespace AppDomain {
struct Logic {
std::string banner;
Logic(std::string msg) : banner(std::move(msg)) {}
std::string Process(std::string request) {
std::string result;
auto fold = [&result](auto op, double initial) {
return [=, &result](auto& ctx) {
auto& args = _attr(ctx);
auto v = accumulate(args.begin(), args.end(), initial, op);
result = "Fold:" + std::to_string(v);
};
};
auto invalid = [&result](auto& ctx) {
result = "Invalid Command: " + _attr(ctx);
};
using namespace boost::spirit::x3;
auto args = rule<void, std::vector<double>>{} = '(' >> double_ % ',' >> ')';
auto add = "adding" >> args[fold(std::plus<>{}, 0)];
auto mul = "multiplying" >> args[fold(std::multiplies<>{}, 1)];
auto err = lexeme[+char_][invalid];
phrase_parse(begin(request), end(request), add | mul | err, blank);
return banner + result;
}
};
} // namespace AppDomain
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// Report a failure
void fail(beast::error_code ec, char const* what) {
std::cerr << what << ": " << ec.message() << "\n";
}
class session : public std::enable_shared_from_this<session> {
websocket::stream<beast::tcp_stream> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
explicit session(tcp::socket&& socket,
std::shared_ptr<AppDomain::Logic> logic)
: ws_(std::move(socket))
, logic_(logic) {}
void run() {
// Get on the correct executor
// strand for thread safety
dispatch(
ws_.get_executor(),
beast::bind_front_handler(&session::on_run, shared_from_this()));
}
void post_message(std::string msg) {
post(ws_.get_executor(),
[self = shared_from_this(), this, msg = std::move(msg)] {
do_post_message(std::move(msg));
});
}
private:
void on_run() {
// on the strand
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(
beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " " +
g_app_name);
}));
// Accept the websocket handshake
ws_.async_accept(
beast::bind_front_handler(&session::on_accept, shared_from_this()));
}
void on_accept(beast::error_code ec) {
// on the strand
if (ec)
return fail(ec, "accept");
do_read();
}
void do_read() {
// on the strand
buffer_.clear();
ws_.async_read(
buffer_,
beast::bind_front_handler(&session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) {
// on the strand
if (ec == websocket::error::closed) return;
if (ec.failed()) return fail(ec, "read");
// Process the message
auto request = boost::algorithm::trim_copy(
beast::buffers_to_string(buffer_.data()));
std::cout << "Processing: " << std::quoted(request) << " from "
<< beast::get_lowest_layer(ws_).socket().remote_endpoint()
<< std::endl;
do_post_message(logic_->Process(request)); // already on the strand
do_read();
}
std::deque<std::string> _outbox;
void do_post_message(std::string msg) {
// on the strand
_outbox.push_back(std::move(msg));
if (_outbox.size() == 1)
do_write_loop();
}
void do_write_loop() {
// on the strand
if (_outbox.empty())
return;
ws_.async_write( //
net::buffer(_outbox.front()),
[self = shared_from_this(), this] //
(beast::error_code ec, size_t bytes_transferred) {
// on the strand
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
_outbox.pop_front();
do_write_loop();
});
}
};
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener> {
net::any_io_executor ex_;
tcp::acceptor acceptor_;
std::shared_ptr<AppDomain::Logic> logic_;
public:
listener(net::any_io_executor ex, tcp::endpoint endpoint,
std::shared_ptr<AppDomain::Logic> logic)
: ex_(ex)
, acceptor_(make_strand(ex)) // NOTE to guard sessions_
, logic_(logic) {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen(tcp::acceptor::max_listen_connections);
}
// Start accepting incoming connections
void run() { do_accept(); }
void broadcast(std::string msg) {
post(acceptor_.get_executor(),
beast::bind_front_handler(&listener::do_broadcast,
shared_from_this(), std::move(msg)));
}
private:
using handle_t = std::weak_ptr<session>;
std::list<handle_t> sessions_;
void do_broadcast(std::string const& msg) {
for (auto handle : sessions_)
if (auto sess = handle.lock())
sess->post_message(msg);
}
void do_accept() {
// The new connection gets its own strand
acceptor_.async_accept(make_strand(ex_),
beast::bind_front_handler(&listener::on_accept,
shared_from_this()));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
// on the strand
if (ec) {
fail(ec, "accept");
} else {
auto sess = std::make_shared<session>(std::move(socket), logic_);
sessions_.emplace_back(sess);
// optionally:
sessions_.remove_if(std::mem_fn(&handle_t::expired));
sess->run();
}
// Accept another connection
do_accept();
}
};
static void emulate_hardware_stuff(std::shared_ptr<listener> srv) {
using std::this_thread::sleep_for;
using namespace std::chrono_literals;
// Extremely simplistic. Instead I'd recommend `steady_timer` with
// `_async_wait` here, but since I'm just making a sketch...
unsigned i = 0;
while (true) {
sleep_for(1s);
srv->broadcast("Hardware thing #" + std::to_string(++i));
}
}
int main(int argc, char* argv[]) {
g_app_name = std::filesystem::path(argv[0]).filename();
if (argc != 4) {
std::cerr << "Usage: " << g_app_name << " <address> <port> <threads>\n"
<< "Example:\n"
<< " " << g_app_name << " 0.0.0.0 8080 1\n";
return 1;
}
auto const address = net::ip::make_address(argv[1]);
auto const port = static_cast<uint16_t>(std::atoi(argv[2]));
auto const threads = std::max<int>(1, std::atoi(argv[3]));
auto logic = std::make_shared<AppDomain::Logic>("StackOverflow Demo/");
try {
// The io_context is required for all I/O
net::thread_pool ioc(threads);
auto srv = std::make_shared<listener>( //
ioc.get_executor(), //
tcp::endpoint{address, port}, //
logic);
srv->run();
std::thread something_hardware(emulate_hardware_stuff, srv);
ioc.join();
something_hardware.join();
} catch (beast::system_error const& se) {
fail(se.code(), "listener");
}
}
现场演示:
关于c++ - Boost Beast Async Websocket Server 如何与 session 交互?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71638897/
在 python 交互中,有没有办法在每次输入命令后自动从 python 文件执行方法? 例如:如果我有一个打印文件信息的方法,但我不想不断调用该方法,我怎样才能让它在 python 交互中的每个命令
当你使用Edge等浏览器或系统软件播放媒体时,Windows控制中心就会出现相应的媒体信息以及控制播放的功能,如图。 SMTC (SystemMedia
我在主菜单上使用标准的剪切,复制,粘贴操作。它们具有快捷键Ctrl-X,Ctrl-C和Ctrl-V。 当我打开模态表单时FindFilesForm.ShowModal,然后所有快捷方式都可以从表单中使
这是我想要实现的目标:打开一个 shell(korn 或 bash,没关系),从那个 shell,我想打开一个 ssh 连接(ssh user@host)。在某些时候,可能会提示我输入密码,或者可能会
我正在测试在C / C++程序中嵌入Python,但是我缺乏理解。 测试程序很简单: 初始化解释器; 从启动Timer的文件中执行python脚本(每0.1秒增加一个变量); 等待5秒(C++); 从
我正在尝试用java创建Excel文件。现在,我正在使用 Apache POI 库创建文件并将其保存到本地驱动器。有没有办法启动 Excel 并填充数据而不将其保存到硬盘驱动器? 最佳答案 考虑 Do
我有一个黑盒函数,它接受大约 10 个整数输入。该函数返回一个 pandas 数据框,我想捕获输出窗口(通过使用 bbwidget.children)并显示在布局中的其他地方。到目前为止,交互/交互似
我正在体验新的 QQuickWidget。我如何在 QQuickWidget 和 C++ 之间进行交互? C++ QQuickWidget *view = new QQuickWidget(); vi
我正在尝试设置一个使用 TWAIN 的 C# 应用程序 example from code project 除了我需要将 Form 转换为 IMessageFilter 和调用 IMessageFil
我想在使用 redis 的 python 中编写应用程序。我用谷歌搜索,但找不到我的问题的任何结果。通常,我这样做: import redis rs = redis.Redis('localhost'
最近做一个小项目,网页中嵌入google maps,输入经纬度坐标可以定位地图位置并加注标记,点击标记获取远端摄像头数据并在视频窗口实现播放。在实际操作过程中,由于经纬度数据和视频登录的用户名密码数
我需要在这里澄清一些事情: 我有一个网站,每次在浏览器中重新加载网站时都会更新两个变量的值。这个页面显然是一个 HTML 页面,但变量是由 javascript 函数更新的。此页面在我的服务器上运行。
我注意到,auto忽略双条件。这是一个简化的示例: Parameter A B : Prop. Parameter A_iff_B : A B. Theorem foo1: A -> B. Proo
使用 interactive使用多个小部件相当简单,例如: interactive(foo, w1=widget1, w2=widget2, ...) 但是,我想使用 VBox 和 HBox 的组合以
我们提供类似于 imagemagick 的浏览器页面 JavaScript,可帮助人们将图像转换为不同大小和格式。但是,它需要网页交互。 是否可以让人们自动进行这种交互——无需将图像发送到我们的服务器
大家好,我正在尝试制作一个具有大量动画和效果的交互式 UI。 但我不知道是否: 核心图形可以支持用户交互(触摸、拖动等) 核心图形支持对象旋转 核心图形可以以任何方式与 UIKit 和核心动画交互 谢
这是获取维基百科上一篇关于高盛的文章的介绍的链接。 http://en.wikipedia.org/w/api.php?action=query&prop=extracts&titles=Goldma
我正在尝试编写一个 AppleScript 来查询 iCal 并在任何日历中查找给定日期的所有事件。 我首先编写了一个简单的脚本,它对给定日历中的每个事件执行一些简单的操作: tell applica
我在我的 hudson 服务器上使用 jira 插件。将代码提交到 svn 时,我的提交注释包含在我的 jira 问题中,但有什么办法可以将注释归因于执行提交的实际人员,而不是让一个全局 jira 用
我正在播放一段视频来装饰我的用户界面。我隐藏了 AV 播放器控件,但用户仍然可以控制视频。例如,他们可以使用滑动手势快进或快退。 这让我特别惊讶,因为 AVPlayerView 上面有一个覆盖 Vie
我是一名优秀的程序员,十分优秀!