- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在从事一个涉及 boost::beast
websocket/http 混合服务器的项目,该服务器在 boost::asio
之上运行。我的项目很大程度上基于 advanced_server.cpp
示例来源。
它工作正常,但现在我正在尝试添加一项功能,该功能需要向所有 连接的客户端发送消息。
我对 boost::asio
不是很熟悉,但现在我看不出有什么方法可以产生类似“广播”事件的东西(如果这是正确的术语)。
我天真的方法是看看我是否可以让 websocket_session()
的构造附加事件监听器之类的东西,然后析构函数分离监听器。那时,我可以触发事件,并让所有当前有效的 websocket session (websocket_session()
的生命周期限定在其中)执行回调。
有https://stackoverflow.com/a/17029022/268006 ,它或多或少地通过 (ab) 使用 boost::asio::steady_timer
来完成我想要的,但这似乎是一种可怕的黑客来完成本应非常简单的事情。
基本上,给定一个有状态的 boost::asio
服务器,我如何对多个连接执行操作?
最佳答案
首先:您可以广播 UDP,但那不是连接的客户端。那只是……UDP。
其次,该链接显示了如何在 Asio 中拥有类似条件变量(事件)的界面。那只是你问题的一小部分。您忘记了大局:您需要以某种方式了解一组打开的连接:
weak_ptr
) 的容器选项 1. 非常适合性能,选项 2. 更适合灵 active (将事件源与订阅者解耦,使异构订阅者成为可能,例如,不来自连接)。
因为我认为选项 1. 对于线程来说更简单,更好的是 w.r.t.效率(例如,您可以在不复制的情况下从一个缓冲区为所有客户端提供服务)并且您可能不需要双重解耦信号/插槽,让我引用一个答案,其中我已经展示了尽可能多的纯 Asio(没有 Beast):
它展示了“连接池”的概念 - 它本质上是 weak_ptr<connection>
的线程安全容器具有一些垃圾收集逻辑的对象。
在 chatting about things 之后我想花时间实际演示这两种方法,所以我在说什么完全清楚。
首先让我们展示一个简单的、普通的异步 TCP 服务器
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
private:
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
session->start();
if (!ec)
accept_loop();
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(3s);
s.stop(); // active connections will continue
th.join();
}
因此,让我们添加同时发送到所有事件连接的“广播消息”。我们添加两个:
模拟全局“服务器事件”的一个,就像您在问题中描述的那样)。它从 main 中触发:
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
请注意我们是如何通过向每个已接受的连接注册一个弱指针并对每个连接进行操作来做到这一点的:
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
broadcast
也直接从main
中使用并且很简单:
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
using-asio-post
branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
return for_each_active([msg](connection& c) { c.send(msg, true); });
}
private:
using connptr = std::shared_ptr<connection>;
using weakptr = std::weak_ptr<connection>;
std::mutex _mx;
std::vector<weakptr> _registered;
size_t reg_connection(weakptr wp) {
std::lock_guard<std::mutex> lk(_mx);
_registered.push_back(wp);
return _registered.size();
}
template <typename F>
size_t for_each_active(F f) {
std::vector<connptr> active;
{
std::lock_guard<std::mutex> lk(_mx);
for (auto& w : _registered)
if (auto c = w.lock())
active.push_back(c);
}
for (auto& c : active) {
std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
f(*c);
}
return active.size();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active connections\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
信号方法是 Dependency Inversion 的一个很好的例子.
最重要的注意事项:
scoped_connection
当 connection
时,订阅会被*自动 删除吗?被破坏The difference is key to understanding the added flexibility: the signal owner/invoker does not know anything about the subscribers. That's the decoupling/dependency inversion we're talking about
using-signals2
branch on github
#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;
static bool s_verbose = false;
struct connection : std::enable_shared_from_this<connection> {
connection(ba::io_context& ioc) : _s(ioc) {}
void start() { read_loop(); }
void send(std::string msg, bool at_front = false) {
post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
if (enqueue(std::move(msg), at_front))
write_loop();
});
}
private:
void do_echo() {
std::string line;
if (getline(std::istream(&_rx), line)) {
send(std::move(line) + '\n');
}
}
bool enqueue(std::string msg, bool at_front)
{ // returns true if need to start write loop
at_front &= !_tx.empty(); // no difference
if (at_front)
_tx.insert(std::next(begin(_tx)), std::move(msg));
else
_tx.push_back(std::move(msg));
return (_tx.size() == 1);
}
bool dequeue()
{ // returns true if more messages pending after dequeue
assert(!_tx.empty());
_tx.pop_front();
return !_tx.empty();
}
void write_loop() {
ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec && dequeue()) write_loop();
});
}
void read_loop() {
ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
do_echo();
if (!ec)
read_loop();
});
}
friend struct server;
ba::streambuf _rx;
std::list<std::string> _tx;
tcp::socket _s;
boost::signals2::scoped_connection _subscription;
};
struct server {
server(ba::io_context& ioc) : _ioc(ioc) {
_acc.bind({{}, 6767});
_acc.set_option(tcp::acceptor::reuse_address());
_acc.listen();
accept_loop();
}
void stop() {
_ioc.post([=] {
_acc.cancel();
_acc.close();
});
}
size_t broadcast(std::string const& msg) {
_broadcast_event(msg);
return _broadcast_event.num_slots();
}
private:
boost::signals2::signal<void(std::string const& msg)> _broadcast_event;
size_t reg_connection(connection& c) {
c._subscription = _broadcast_event.connect(
[&c](std::string msg){ c.send(msg, true); }
);
return _broadcast_event.num_slots();
}
void accept_loop() {
auto session = std::make_shared<connection>(_acc.get_io_context());
_acc.async_accept(session->_s, [this,session](error_code ec) {
auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;
if (!ec) {
auto n = reg_connection(*session);
session->start();
accept_loop();
broadcast("player #" + std::to_string(n) + " has entered the game\n");
}
});
}
ba::io_context& _ioc;
tcp::acceptor _acc{_ioc, tcp::v4()};
};
int main(int argc, char** argv) {
s_verbose = argc>1 && argv[1] == "-v"s;
ba::io_context ioc;
server s(ioc);
std::thread th([&ioc] { ioc.run(); }); // todo exception handling
std::this_thread::sleep_for(1s);
auto n = s.broadcast("random global event broadcast\n");
std::cout << "Global event broadcast reached " << n << " active subscribers\n";
std::this_thread::sleep_for(2s);
s.stop(); // active connections will continue
th.join();
}
See the diff between Approach 1. and 2.: Compare View on github
针对 3 个并发客户端运行时的输出示例:
(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)
关于c++ - boost ASIO : Send message to all connected clients,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49394277/
我很难理解为什么这段代码无法编译: use std::cell::{Ref, RefCell}; struct St { data: RefCell } impl St { pub f
我从Richard Blum的一本书《 C#网络编程》中读到有关套接字的信息。以下摘录指出,不保证Send()方法可以发送所有传递给它的数据。 byte[] data = new byte[1024]
我有以下程序,它必须一次读取 1MB 的文件,将其发送到服务器(每次总是 1MB)并返回哈希码: #include #include #include #include #include #
代码在底部。 第 207 行的 send() 命令本身可以正常工作。但是,当我在第 218 行添加 send() 命令时,第一个命令失败 - 给出错误“地址错误”。我已经确认第二个 send() 命令
标记包含 !Send 的类型背后的原因是什么?字段(如 Rc 或 NonNull )与 Send特征?例如,标准库的 LinkedList 以这种方式工作:它包含 Option>字段并实现 Send特
我是新手,我正在尝试学习 goroutines 中信号函数的一些基本用法。我在 go 中有一个无限循环。通过这个 for 循环,我通过 channel 将值传递给 goroutine。 我也有一个阈值
如果数据是从另一台计算机(首先)“发送”的,我如何设置我的套接字例程以“发送”(首先)或(切换)“接收”? 谢谢 通用代码: -(void) TcpClient{ char buffer[12
这个问题已经有答案了: Java multiple file transfer over socket (3 个回答) 已关闭 4 年前。 我正在使用 Java Socket 将文件发送到服务器,然后
根据以下示例中的类型,Go编译器似乎将执行两个完全不同的语义操作: chanA <-chanB 如果chanA是类型(chan chan <-字符串),则此操作会将本身类型chanB的类型(chan
我正在尝试在 VBA 中使用 WinSock2 从本地主机 TCP 流发送(以及稍后接收)数据。 目前,我主要尝试从此处复制客户端示例,https://msdn.microsoft.com/en-us
我在我的 Mac OS X Yosemite 控制台中看到了这个: AppleEvents: Send port for process has no send right, port=( port:
我知道Clojure的“代理”是ref,带有“操作”的添加工作队列。 Action 是使用ref的值在第一个位置调用的函数,可以将其传递给其他参数。操作将返回ref的新值。因此,“代理”是一种计算re
我无法将任何对象或数组传递给 IPCRenderer。 通过 ipcs 传递对象或数组时出现错误,我什至尝试通过使用 JSON.stringify 转换为字符串来发送,但它会将其转换为空对象字符串。
我正在使用unix scoket进行数据传输(SOCK_STREAM模式) 我需要发送超过100k个字符的字符串。首先,我发送一个字符串的长度-它是sizeof(int)个字节。 length = s
Clojure API 将这两个函数描述为: (send a f & args) - Dispatch an action to an agent. Returns the agent immedia
def send_Button(): try: myMsg = "ME: " + text.get() msg = text.get() con
Ruby 对象都有一个“发送”方法,但是,我正在尝试使用一个 Java 库 ( netty-tools ),它的一个接口(interface)上有一个“发送”方法。 用法应该是 java_obj.se
Feb 8, 2011 11:56:49 AM com.sun.xml.internal.messaging.saaj.client.p2p.HttpSOAPC onnection post SEVE
来自 man 2 send: MSG_MORE (since Linux 2.4.4) (…) Since Linux 2.6, this flag is also supported for UDP
我的网页中可以有一个按钮,用于将预填充的消息发送到特定号码吗? 我正在尝试 intent://send/+391234567890#Intent;scheme=smsto;package=com.wh
我是一名优秀的程序员,十分优秀!