- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想创建一个实现一个线程一个连接模型的应用程序。但是每个连接都必须是可停止的。我试过this boost.asio example它实现了我想要的阻塞版本。但经过一番询问后,我发现没有可靠的方法来停止该示例的 session 。所以我试图实现我自己的。我不得不使用异步函数。由于我想让一个线程只管理一个连接,并且无法控制哪个异步作业用于哪个线程,所以我决定为每个连接/套接字/线程使用 io_service
。
那么这是一个好的方法吗,你知道更好的方法吗?
我的代码在这里,您可以检查和查看它:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>
namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;
const short PORT = 11235;
class Server;
// A connection has its own io_service and socket
class Connection {
protected:
ba::io_service service;
socket_type sock;
b::thread *thread;
ba::streambuf stream_buffer; // for reading etc
Server *server;
void AsyncReadString() {
ba::async_read_until(
sock,
stream_buffer,
'\0', // null-char is a delimiter
b::bind(&Connection::ReadHandler, this,
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
void AsyncWriteString(const std::string &s) {
std::string newstr = s + '\0'; // add a null char
ba::async_write(
sock,
ba::buffer(newstr.c_str(), newstr.size()),
b::bind(&Connection::WriteHandler, this,
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
virtual void Session() {
AsyncReadString();
service.run(); // run at last
}
std::string ExtractString() {
std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '\0');
return s;
}
virtual void ReadHandler(
const bs::error_code &ec,
std::size_t bytes_transferred) {
if (!ec) {
std::cout << (ExtractString() + "\n");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}
virtual void WriteHandler(
const bs::error_code &ec,
std::size_t bytes_transferred) {
}
public:
Connection(Server *s) :
service(),
sock(service),
server(s),
thread(NULL)
{ }
socket_type& Socket() {
return sock;
}
void Start() {
if (thread) delete thread;
thread = new b::thread(
b::bind(&Connection::Session, this));
}
void Join() {
if (thread) thread->join();
}
void Stop() {
service.stop();
}
void KillMe();
virtual ~Connection() {
}
};
// a server also has its own io_service but it's only used for accepting
class Server {
public:
std::list<Connection*> Connections;
protected:
ba::io_service service;
acceptor_type acc;
b::thread *thread;
virtual void AcceptHandler(const bs::error_code &ec) {
if (!ec) {
Connections.back()->Start();
Connections.push_back(new Connection(this));
acc.async_accept(
Connections.back()->Socket(),
b::bind(&Server::AcceptHandler,
this,
ba::placeholders::error));
}
else {
// do nothing
// since the new session will be deleted
// automatically by the destructor
}
}
virtual void ThreadFunc() {
Connections.push_back(new Connection(this));
acc.async_accept(
Connections.back()->Socket(),
b::bind(&Server::AcceptHandler,
this,
ba::placeholders::error));
service.run();
}
public:
Server():
service(),
acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
thread(NULL)
{ }
void Start() {
if (thread) delete thread;
thread = new b::thread(
b::bind(&Server::ThreadFunc, this));
}
void Stop() {
service.stop();
}
void Join() {
if (thread) thread->join();
}
void StopAllConnections() {
for (auto c : Connections) {
c->Stop();
}
}
void JoinAllConnections() {
for (auto c : Connections) {
c->Join();
}
}
void KillAllConnections() {
for (auto c : Connections) {
delete c;
}
Connections.clear();
}
void KillConnection(Connection *c) {
Connections.remove(c);
delete c;
}
virtual ~Server() {
delete thread;
// connection should be deleted by the user (?)
}
};
void Connection::KillMe() {
server->KillConnection(this);
}
int main() {
try {
Server s;
s.Start();
std::cin.get(); // wait for enter
s.Stop(); // stop listening first
s.StopAllConnections(); // interrupt ongoing connections
s.Join(); // wait for server, should return immediately
s.JoinAllConnections(); // wait for ongoing connections
s.KillAllConnections(); // destroy connection objects
// at the end of scope, Server will be destroyed
}
catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}
return 0;
}
最佳答案
没有。每个连接都使用一个 io_service
对象绝对是一种味道。特别是因为您还在专用线程上运行每个连接。
此时您必须问问自己,异步给您带来了什么?您可以让所有代码同步并拥有完全相同数量的线程等。
显然,您希望将连接多路复用到数量少得多的服务上。在实践中有一些合理的模型,例如
具有单个服务线程的单个 io_service
(这通常很好)。在服务上排队的任何任务都不会阻塞很长时间,否则延迟会受到影响
单个 io_service
具有多个执行处理程序的线程。池中的线程数应该足以服务最大值。支持的同时 CPU 密集型任务的数量(或者,延迟将开始上升)
每个线程一个 io_service,通常每个逻辑核心一个线程,并具有线程关联性,以便它“粘附”到该核心。这可能是缓存位置的理想选择
这是一个演示,使用上面的选项 1 显示惯用风格:
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>
namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;
const short PORT = 11235;
// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
typedef boost::shared_ptr<Connection> Ptr;
protected:
socket_type sock;
ba::streambuf stream_buffer; // for reading etc
std::string message;
void AsyncReadString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
ba::async_read_until(
sock,
stream_buffer,
'\0', // null-char is a delimiter
b::bind(&Connection::ReadHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
void AsyncWriteString(const std::string &s) {
std::cout << __PRETTY_FUNCTION__ << "\n";
message = s;
ba::async_write(
sock,
ba::buffer(message.c_str(), message.size()+1),
b::bind(&Connection::WriteHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
std::string ExtractString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '\0');
return s;
}
void ReadHandler(
const bs::error_code &ec,
std::size_t bytes_transferred)
{
std::cout << __PRETTY_FUNCTION__ << "\n";
if (!ec) {
std::cout << (ExtractString() + "\n");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}
void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
public:
Connection(ba::io_service& svc) : sock(svc) { }
virtual ~Connection() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
socket_type& Socket() { return sock; }
void Session() { AsyncReadString(); }
void Stop() { sock.cancel(); }
};
// a server also has its own io_service but it's only used for accepting
class Server {
public:
std::list<boost::weak_ptr<Connection> > m_connections;
protected:
ba::io_service _service;
boost::optional<ba::io_service::work> _work;
acceptor_type _acc;
b::thread thread;
void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
if (!ec) {
accepted->Session();
DoAccept();
}
else {
// do nothing the new session will be deleted automatically by the
// destructor
}
}
void DoAccept() {
auto newaccept = boost::make_shared<Connection>(_service);
_acc.async_accept(
newaccept->Socket(),
b::bind(&Server::AcceptHandler,
this,
ba::placeholders::error,
newaccept
));
}
public:
Server():
_service(),
_work(ba::io_service::work(_service)),
_acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
thread(b::bind(&ba::io_service::run, &_service))
{ }
~Server() {
std::cout << __PRETTY_FUNCTION__ << "\n";
Stop();
_work.reset();
if (thread.joinable()) thread.join();
}
void Start() {
std::cout << __PRETTY_FUNCTION__ << "\n";
DoAccept();
}
void Stop() {
std::cout << __PRETTY_FUNCTION__ << "\n";
_acc.cancel();
}
void StopAllConnections() {
std::cout << __PRETTY_FUNCTION__ << "\n";
for (auto c : m_connections) {
if (auto p = c.lock())
p->Stop();
}
}
};
int main() {
try {
Server s;
s.Start();
std::cerr << "Shutdown in 2 seconds...\n";
b::this_thread::sleep_for(b::chrono::seconds(2));
std::cerr << "Stop accepting...\n";
s.Stop();
std::cerr << "Shutdown...\n";
s.StopAllConnections(); // interrupt ongoing connections
} // destructor of Server will join the service thread
catch (std::exception &e) {
std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}
std::cerr << "Byebye\n";
}
我修改了 main()
以在没有用户干预的情况下运行 2 秒。这样我就可以演示了 Live On Coliru (当然,它受限于客户端进程的数量)。
如果您在很多(很多)客户端上运行它,使用例如
$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)
你会发现两个第二个窗口都处理了它们:
$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 28214
2 hello world 4554
2 hello world 6216
2 hello world 7864
2 hello world 9966
2 void Server::Stop()
1000 std::string Connection::ExtractString()
1001 virtual Connection::~Connection()
2000 void Connection::AsyncReadString()
2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
如果你真的发狂并 boost 1000
到例如100000
那里,你会得到类似的东西:
sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 5483
2 hello world 579
2 hello world 5865
2 hello world 938
2 void Server::Stop()
3 hello world 9613
1741 std::string Connection::ExtractString()
1742 virtual Connection::~Connection()
3482 void Connection::AsyncReadString()
3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
服务器重复运行 2 秒。
关于c++ - boost .Asio : Is it a good thing to use a `io_service` per connection/socket?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44269836/
我正在使用 pytesseract(tesseract 版本 3.05)对以数字方式创建的打印 PDF 帐单进行 OCR(光学字符识别)。我对其进行预处理以去除任何颜色并将其设置为纯黑白和 600 D
以下是我尝试运行的代码,输出是Good。那么,我们可以使用类实现的接口(interface)的变量吗? interface IDummyInterface { public String TY
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在编写一个 Java 客户端,它通过 HTTP/XML 与远程服务器通信。 服务器以 XML 格式向我的客户端发送命令,如下所示: C1 ..... 大约有 10 个或更多不同的命令(C1
我在 android 应用程序上有一个奇怪的问题,我没有这样的表异常,但我确定数据库存在。对于某些将我重定向到此处其他帖子的人,我想补充一点,我正在自己的手机上尝试该应用程序而不是模拟器,这是因为我在
我在谷歌上搜索 RNGCryptoServiceProvider,其中包含有关如何限制最大值和最小值之间的范围并仍然获得均匀分布的示例。在我使用模运算符之前,但有时我会得到奇怪的值(高于最大值)...
这个问题在这里已经有了答案: calling constructor of a class member in constructor (5 个答案) 关闭 4 年前。 我有一个非指针类成员需要在构
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭10
作为hadoop配置什么好.. 大量小型机器,每台具有 512 MB Ram 或少量大型机器(大约 2Gb 或 4GB Ram) 我可以选择两者中的任何一个,因为我的节点是虚拟机.. 请分享您的想法.
伙计们,我是vue新手所以不知道如何实现以下情况我如何获取当前选定行的数据这是代码
我经常访问一个名为 GOOD 的网站我特别喜欢一种审美风格;导航栏如何在网站背景中扩展其颜色。如果您访问该网站,就会明白我的意思。 在 CSS 中,我怎样才能以最简单的方式复制它?我已经用 z-ind
我有一个存在主义假设,例如: H : exists (a : A) (b : B) (c : C), P a b c 我想分解为: a : A b : B c : C H0 : P a b c 战术d
在 Github 上,我注意到一些拉取请求说它们“适合合并”,因为它们通过了 Travis构建通过。我已经使用了一点 Travis,它如何与 Github 上的拉取请求集成? 这是我正在谈论的内容的屏
刚刚在 Apple docs: 中找到 Note: Although good for occasional communication between threads, you should not
我正处于一个应用程序的概念阶段,该应用程序将有大量音频/视频输入和输出。我想用Java来做;但不知何故我还没有完全相信。你怎么认为?到底能有多糟糕?有什么建议吗? 为什么我想到 Java: 这是我最熟
我正在用 Java 重新实现 .Net API,该 API 指定了一大堆事件,但 java 并不隐式支持。 我将使用观察者模式,但由于事件的数量,我真的不想让界面变得困惑。 我想知道声明一个“Even
我的类有很多方法,其中一些方法是用注释标记的。每个方法可以有很多注释,例如 @StepAnnotation(name="Action1" ) @SequenceAnnotation(name="tra
启用GD后(通过调用enableSecureCommunication或对应用程序进行身份验证),将自动保护NSURLConnection。它是如何做到的? 最佳答案 我的猜测是,Good Dynam
我是 Vue 的新手并且陷入了困境,如果有人建议我如何做到这一点,我不知道该怎么做,让我先展示我的代码 save 并在脚本中 data(){ return{
考虑以下代码: private List types; if(!getTest().contains(type)) { return Color.LIGHT_GRAY;
我是一名优秀的程序员,十分优秀!