- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我想创建一个实现一个线程一个连接模型的应用程序。但是每个连接都必须是可停止的。我试过this boost.asio example它实现了我想要的阻塞版本。但经过一番询问后,我发现没有可靠的方法来停止该示例的 session 。所以我试图实现我自己的。我不得不使用异步函数。由于我想让一个线程只管理一个连接,并且无法控制哪个异步作业用于哪个线程,所以我决定为每个连接/套接字/线程使用 io_service
。
那么这是一个好的方法吗,你知道更好的方法吗?
我的代码在这里,您可以检查和查看它:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>
namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;
const short PORT = 11235;
class Server;
// A connection has its own io_service and socket
class Connection {
protected:
ba::io_service service;
socket_type sock;
b::thread *thread;
ba::streambuf stream_buffer; // for reading etc
Server *server;
void AsyncReadString() {
ba::async_read_until(
sock,
stream_buffer,
'\0', // null-char is a delimiter
b::bind(&Connection::ReadHandler, this,
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
void AsyncWriteString(const std::string &s) {
std::string newstr = s + '\0'; // add a null char
ba::async_write(
sock,
ba::buffer(newstr.c_str(), newstr.size()),
b::bind(&Connection::WriteHandler, this,
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
virtual void Session() {
AsyncReadString();
service.run(); // run at last
}
std::string ExtractString() {
std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '\0');
return s;
}
virtual void ReadHandler(
const bs::error_code &ec,
std::size_t bytes_transferred) {
if (!ec) {
std::cout << (ExtractString() + "\n");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}
virtual void WriteHandler(
const bs::error_code &ec,
std::size_t bytes_transferred) {
}
public:
Connection(Server *s) :
service(),
sock(service),
server(s),
thread(NULL)
{ }
socket_type& Socket() {
return sock;
}
void Start() {
if (thread) delete thread;
thread = new b::thread(
b::bind(&Connection::Session, this));
}
void Join() {
if (thread) thread->join();
}
void Stop() {
service.stop();
}
void KillMe();
virtual ~Connection() {
}
};
// a server also has its own io_service but it's only used for accepting
class Server {
public:
std::list<Connection*> Connections;
protected:
ba::io_service service;
acceptor_type acc;
b::thread *thread;
virtual void AcceptHandler(const bs::error_code &ec) {
if (!ec) {
Connections.back()->Start();
Connections.push_back(new Connection(this));
acc.async_accept(
Connections.back()->Socket(),
b::bind(&Server::AcceptHandler,
this,
ba::placeholders::error));
}
else {
// do nothing
// since the new session will be deleted
// automatically by the destructor
}
}
virtual void ThreadFunc() {
Connections.push_back(new Connection(this));
acc.async_accept(
Connections.back()->Socket(),
b::bind(&Server::AcceptHandler,
this,
ba::placeholders::error));
service.run();
}
public:
Server():
service(),
acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
thread(NULL)
{ }
void Start() {
if (thread) delete thread;
thread = new b::thread(
b::bind(&Server::ThreadFunc, this));
}
void Stop() {
service.stop();
}
void Join() {
if (thread) thread->join();
}
void StopAllConnections() {
for (auto c : Connections) {
c->Stop();
}
}
void JoinAllConnections() {
for (auto c : Connections) {
c->Join();
}
}
void KillAllConnections() {
for (auto c : Connections) {
delete c;
}
Connections.clear();
}
void KillConnection(Connection *c) {
Connections.remove(c);
delete c;
}
virtual ~Server() {
delete thread;
// connection should be deleted by the user (?)
}
};
void Connection::KillMe() {
server->KillConnection(this);
}
int main() {
try {
Server s;
s.Start();
std::cin.get(); // wait for enter
s.Stop(); // stop listening first
s.StopAllConnections(); // interrupt ongoing connections
s.Join(); // wait for server, should return immediately
s.JoinAllConnections(); // wait for ongoing connections
s.KillAllConnections(); // destroy connection objects
// at the end of scope, Server will be destroyed
}
catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}
return 0;
}
最佳答案
没有。每个连接都使用一个 io_service
对象绝对是一种味道。特别是因为您还在专用线程上运行每个连接。
此时您必须问问自己,异步给您带来了什么?您可以让所有代码同步并拥有完全相同数量的线程等。
显然,您希望将连接多路复用到数量少得多的服务上。在实践中有一些合理的模型,例如
具有单个服务线程的单个 io_service
(这通常很好)。在服务上排队的任何任务都不会阻塞很长时间,否则延迟会受到影响
单个 io_service
具有多个执行处理程序的线程。池中的线程数应该足以服务最大值。支持的同时 CPU 密集型任务的数量(或者,延迟将开始上升)
每个线程一个 io_service,通常每个逻辑核心一个线程,并具有线程关联性,以便它“粘附”到该核心。这可能是缓存位置的理想选择
这是一个演示,使用上面的选项 1 显示惯用风格:
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>
namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;
const short PORT = 11235;
// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
typedef boost::shared_ptr<Connection> Ptr;
protected:
socket_type sock;
ba::streambuf stream_buffer; // for reading etc
std::string message;
void AsyncReadString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
ba::async_read_until(
sock,
stream_buffer,
'\0', // null-char is a delimiter
b::bind(&Connection::ReadHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
void AsyncWriteString(const std::string &s) {
std::cout << __PRETTY_FUNCTION__ << "\n";
message = s;
ba::async_write(
sock,
ba::buffer(message.c_str(), message.size()+1),
b::bind(&Connection::WriteHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
std::string ExtractString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '\0');
return s;
}
void ReadHandler(
const bs::error_code &ec,
std::size_t bytes_transferred)
{
std::cout << __PRETTY_FUNCTION__ << "\n";
if (!ec) {
std::cout << (ExtractString() + "\n");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}
void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
public:
Connection(ba::io_service& svc) : sock(svc) { }
virtual ~Connection() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
socket_type& Socket() { return sock; }
void Session() { AsyncReadString(); }
void Stop() { sock.cancel(); }
};
// a server also has its own io_service but it's only used for accepting
class Server {
public:
std::list<boost::weak_ptr<Connection> > m_connections;
protected:
ba::io_service _service;
boost::optional<ba::io_service::work> _work;
acceptor_type _acc;
b::thread thread;
void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
if (!ec) {
accepted->Session();
DoAccept();
}
else {
// do nothing the new session will be deleted automatically by the
// destructor
}
}
void DoAccept() {
auto newaccept = boost::make_shared<Connection>(_service);
_acc.async_accept(
newaccept->Socket(),
b::bind(&Server::AcceptHandler,
this,
ba::placeholders::error,
newaccept
));
}
public:
Server():
_service(),
_work(ba::io_service::work(_service)),
_acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
thread(b::bind(&ba::io_service::run, &_service))
{ }
~Server() {
std::cout << __PRETTY_FUNCTION__ << "\n";
Stop();
_work.reset();
if (thread.joinable()) thread.join();
}
void Start() {
std::cout << __PRETTY_FUNCTION__ << "\n";
DoAccept();
}
void Stop() {
std::cout << __PRETTY_FUNCTION__ << "\n";
_acc.cancel();
}
void StopAllConnections() {
std::cout << __PRETTY_FUNCTION__ << "\n";
for (auto c : m_connections) {
if (auto p = c.lock())
p->Stop();
}
}
};
int main() {
try {
Server s;
s.Start();
std::cerr << "Shutdown in 2 seconds...\n";
b::this_thread::sleep_for(b::chrono::seconds(2));
std::cerr << "Stop accepting...\n";
s.Stop();
std::cerr << "Shutdown...\n";
s.StopAllConnections(); // interrupt ongoing connections
} // destructor of Server will join the service thread
catch (std::exception &e) {
std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}
std::cerr << "Byebye\n";
}
我修改了 main()
以在没有用户干预的情况下运行 2 秒。这样我就可以演示了 Live On Coliru (当然,它受限于客户端进程的数量)。
如果您在很多(很多)客户端上运行它,使用例如
$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)
你会发现两个第二个窗口都处理了它们:
$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 28214
2 hello world 4554
2 hello world 6216
2 hello world 7864
2 hello world 9966
2 void Server::Stop()
1000 std::string Connection::ExtractString()
1001 virtual Connection::~Connection()
2000 void Connection::AsyncReadString()
2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
如果你真的发狂并 boost 1000
到例如100000
那里,你会得到类似的东西:
sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 5483
2 hello world 579
2 hello world 5865
2 hello world 938
2 void Server::Stop()
3 hello world 9613
1741 std::string Connection::ExtractString()
1742 virtual Connection::~Connection()
3482 void Connection::AsyncReadString()
3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
服务器重复运行 2 秒。
关于c++ - boost .Asio : Is it a good thing to use a `io_service` per connection/socket?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31486010/
我不明白以下两个语句之间的区别: Thing thing; Thing thing = Thing(); 两者都创建了一个Thing对象,并把它放在变量thing中,对吧?如果是,两个问题: 1- 两
我将 RPi3 与 Android Things 0.5.1 一起使用。虽然我在我的 AndroidManifest.xml许可: 我得到了异常(exception): java.lang.Secu
我在使用 Microsoft Access 2003 时遇到问题,它提示以下声明: select cardnr from change where year(date)<2009 group by
如何使用预览版 2 更新我的 Android Things 预览版 1?是否有自动 OTA 可用,或者现在不可能,所以我必须刷新 SD 卡(来自我的 Raspberry Pi)? 最佳答案 编辑 1:
任何人都可以解释为什么这是真的 $a = Array('b' = > 'okokokok'); if ( isset( $a['b']['ok'] ) ) { echo $a['b
无法找到如何从相机拍摄单张照片的方法。 在使用图像阅读器并使用 TEMPLATE_STILL_CAPTURE 发送捕获请求时,应用程序只是停在这个位置,没有其他任何事情发生。 尝试使用我的代码,认为问
我知道错误的含义,但我无法修复它。我正在使用 mockers 来测试我的工作,但在尝试验证提供给模拟特征函数的结构参数时遇到了困难。简化代码: #[cfg(test)] extern crate mo
在官方 schema.org 文档中,我可以看到每个类都继承了 Thing 的属性。类,例如 Book类还有name , image等等(来自 Thing 的属性)。 我的问题是,我可以获得例如 im
假设 $thing 可以是任何类型,这两个语句的执行是否相同? if (!empty($thing)) { // do stuff } if ($thing) { // do stuf
代码: static const void *const uniquePtr = &uniquePtr; …将在编译单元中提供一个唯一的void指针。以这种方式为喜欢将名称作为 void* 的 API
我在github中有这段代码 https://github.com/neuberfran/sample-button-master 我有问题: java.lang.IncompatibleClassC
试图从 gpio 读取模拟数据,但只看到一种将信号强度返回为 LOW 或 HIGH( bool 值)的方式。 在文档和代码引用中找不到任何解决方案。 现在甚至可能吗? 最佳答案 树莓派 3 没有 an
在使用 Android Things 控制台创建另一个 Android Things 产品时,我再次想知道要选择什么 oem 分区大小。此设置的信息是“必须在 32 到 512 MB 之间”,这是一个
我在Raspberry Pi上安装了一个小屏幕,并根据屏幕的documentation: Write the image to a TF card and append the following l
我计划将 Raspberry PI 用于 Android Things 项目,我可以在其上闪烁默认图像 (androidthings_rpi3_devpreview_4_1.zip)。我已经安装了我的
我知道 Android Things 支持 NDK。我找不到以一种或另一种方式说明 Android Things 是否支持 RenderScript Compute 的引用。我的假设是否定的(因为并非
昨天我开始了一个 Android Things 项目,我正在使用 Android Studio 测试该应用程序。为了使用 adb.exe 连接到设备,我使用了我在默认操作系统的应用程序(起始屏幕)上看
我正在解决的问题是在所有页面(问题、标签等)上调用 Stack Exchange API (1.1)。但事实上,这似乎也可能是一个普遍问题,所以我在这里发帖,而不是在 StackApps 上。 因此,
哪些因素决定哪种方法更合适? 最佳答案 我认为两者都有自己的位置。 你不应该简单地使用 DoSomethingToThing(Thing n)只是因为你认为“函数式编程很好”。同样,您不应该简单地使用
这个问题已经有答案了: using brackets with javascript import syntax (2 个回答) 已关闭 5 年前。 我正在观看 Pluralsight 上的一些培训视
我是一名优秀的程序员,十分优秀!