- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
首先,我的母语不是英语,所以我可能会犯一些语法错误,抱歉......
我正在尝试使用 C++ 和 Boost 创建一个异步 TCP 服务器。我已经成功地接受了客户并收到了他们的消息,但我无法回复他们的消息。
我想要实现的是在 TCPServer 类上提供一个方法,用来回复所有已连接的客户端。我为此创建了一个方法,但是当我调用 TcpServer::write 时,TcpConnectionHandler::handle_write 的 error 参数收到了“错误的文件描述符”(Bad file descriptor)错误。
你能帮我弄清楚我做错了什么吗?
tcp_server.h
#ifndef TCP_SERVER_
#define TCP_SERVER_
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
namespace my_project
{
// Handles a single TCP connection: reads "\r\n"-delimited messages from the
// socket and sends queued outgoing messages one at a time.
// Inherits enable_shared_from_this so handlers can bind shared_from_this().
class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler>
{
public:
// Binds the socket and the strand to io_service and stores the log prefix
// and the callback invoked for each received message.
TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback);
// Returns the connection's socket by reference.
boost::asio::ip::tcp::socket& socket();
// Starts an asynchronous read up to the next "\r\n".
void start();
// Posts writeImpl(message) to the strand.
void write(const std::string& message);
private:
// Appends message to outbox_ and starts a write if it is the only queued entry.
void writeImpl(const std::string& message);
// Starts an async_write of the first entry of outbox_.
void write();
// Completion handler of the read started by start().
void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
// Completion handler of the write started by write().
void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
// Members are initialized in the order declared here (socket_ first),
// regardless of the order used in the constructor's initializer list.
boost::asio::ip::tcp::socket socket_;
// Receive buffer handed to async_read_until.
boost::asio::streambuf message_;
std::string log_prefix_;
boost::function<void(std::string&)> received_message_callback_;
// Pending outgoing messages; the front entry is the one being written.
std::deque<std::string> outbox_;
boost::asio::io_service& io_service_;
// Used to post writeImpl and to wrap the write completion handler.
boost::asio::io_service::strand strand_;
};
// Accepts TCP clients on a port and runs the io_service on a secondary thread.
class TcpServer
{
public:
// Opens the acceptor on `port`, starts the first asynchronous accept and
// launches the thread that runs the io_service.
TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback);
// Stops the io_service, then interrupts, joins and deletes the thread.
~TcpServer();
// Blocks until handle_accept has flagged that a client connected.
void start();
// Forwards content to the handler currently held in connection_.
void write(std::string content);
private:
// Creates a new handler and starts an asynchronous accept into its socket.
void start_accept();
// Accept completion handler: starts the connection on success, then accepts again.
void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error);
// NOTE(review): members are destroyed in reverse declaration order, so
// io_service_ is destroyed before connection_, whose handler still holds a
// socket and a strand created from that io_service.
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::io_service io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
std::string log_prefix_;
boost::function<void(std::string&)> received_message_callback_;
// connection_cond_ / connection_mutex_ / client_connected_ implement the
// wait in start(); handle_accept sets the flag and notifies.
boost::condition_variable connection_cond_;
boost::mutex connection_mutex_;
bool client_connected_;
boost::thread *io_thread_; /**< Thread to run boost.asio io_service. */
};
} // namespace my_project
#endif // #ifndef TCP_SERVER_
tcp_server.cpp
#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"
namespace my_project
{
// TcpConnectionHandler
// Builds the handler for one connection. The socket and the strand are both
// created on `io_service`; the log prefix and the receive callback are copied.
// The initializer list is written in member declaration order, which is the
// order the members are initialized in anyway.
TcpConnectionHandler::TcpConnectionHandler(std::string log_prefix, boost::asio::io_service& io_service, boost::function<void(std::string&)> received_message_callback)
    : socket_(io_service),
      log_prefix_(log_prefix),
      received_message_callback_(received_message_callback),
      outbox_(),
      io_service_(io_service),
      strand_(io_service_)
{
}
// Gives callers (e.g. the acceptor) access to this connection's socket.
boost::asio::ip::tcp::socket& TcpConnectionHandler::socket() { return socket_; }
void TcpConnectionHandler::start()
{
async_read_until(socket_,
message_,
"\r\n",
boost::bind(&TcpConnectionHandler::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
// Queues `message` for sending by posting writeImpl to the strand; a copy of
// the message is stored inside the bound handler.
// NOTE(review): the handler is bound to a raw `this`, so it does not keep
// this object alive until writeImpl runs.
void TcpConnectionHandler::write(const std::string& message)
{
    strand_.post(
        boost::bind(&TcpConnectionHandler::writeImpl, this, message));
}
void TcpConnectionHandler::writeImpl(const std::string& message)
{
outbox_.push_back( message );
if ( outbox_.size() > 1 ) {
// outstanding async_write
return;
}
this->write();
}
// Starts an asynchronous write of the message at the front of the outbox.
// The buffer refers to the string stored in outbox_, which stays queued
// until handle_write pops it.
// NOTE(review): the completion handler is bound to a raw `this`, so it does
// not keep this object alive while the write is in flight.
void TcpConnectionHandler::write()
{
    const std::string& head = outbox_.front();
    boost::asio::async_write(
        socket_,
        boost::asio::buffer(head.data(), head.size()),
        strand_.wrap(boost::bind(&TcpConnectionHandler::handle_write, this,
                                 boost::asio::placeholders::error,
                                 boost::asio::placeholders::bytes_transferred)));
}
// Completion handler for the read started by start().
// On eof or connection_reset it returns without reading further. Otherwise it
// drains everything buffered in message_, keeps only the text before the
// first '\r', and on success passes that text to the callback and starts the
// next read.
// NOTE(review): the whole buffer is drained, so anything received after the
// first "\r\n" is discarded; bytes_transferred is not used.
void TcpConnectionHandler::handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{
    const bool peer_gone = (error == boost::asio::error::eof) ||
                           (error == boost::asio::error::connection_reset);
    if (peer_gone) {
        //LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
        return;
    }

    // Pull every buffered byte out of the streambuf into a string.
    std::istream in(&message_);
    std::string buffered((std::istreambuf_iterator<char>(in)),
                         std::istreambuf_iterator<char>());
    //LOG(DEBUG) << log_prefix_ << " communication object received message: " << getPrintableMessage(message_str);

    // Keep only the text up to the first carriage return.
    std::istringstream splitter(buffered);
    std::string msg;
    std::getline(splitter, msg, '\r');

    // Drop whatever might still be left in the receive buffer.
    message_.consume(message_.size());

    if (error) {
        // TODO: handle the error here
        return;
    }
    received_message_callback_(msg);
    start();
}
// Completion handler for the write started by write().
// The message that was being written is removed from the outbox in every
// case. On error the failure is reported and sending stops; otherwise the
// next queued message, if any, is written.
void TcpConnectionHandler::handle_write(const boost::system::error_code& error, size_t bytes_transferred)
{
    outbox_.pop_front();

    if (error) {
        std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
        return;
    }

    if (outbox_.empty()) {
        return;
    }
    // more messages to send
    write();
}
// TcpServer
// Opens an IPv4 acceptor on `port`, starts the first asynchronous accept and
// then launches the secondary thread that runs the io_service.
TcpServer::TcpServer(std::string log_prefix, unsigned int port, boost::function<void(std::string&)> received_message_callback)
    : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
      log_prefix_(log_prefix),
      received_message_callback_(received_message_callback),
      client_connected_(false),
      io_thread_(NULL)
{
    // An accept must be pending before run() is called on the other thread.
    start_accept();
    io_thread_ = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
}
// Shuts the worker thread down: stops the io_service, interrupts and joins
// the thread, then frees it. Does nothing if the thread was never created.
TcpServer::~TcpServer()
{
    if (!io_thread_) {
        return;
    }
    io_service_.stop();
    io_thread_->interrupt();
    io_thread_->join();
    delete io_thread_;
}
void TcpServer::start()
{
// Wait until client is connected to our TCP server. (condition variable)
boost::unique_lock<boost::mutex> lock(connection_mutex_);
while (!client_connected_)
{
//LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish connection...";
connection_cond_.wait(lock);
}
//LOG(INFO) << log_prefix_ << " client successfully connected.";
}
// Sends `content` through the handler currently stored in connection_.
// NOTE(review): start_accept() replaces connection_ with a fresh handler each
// time it runs (including right after every accept), so this targets the
// handler that is waiting in async_accept at that moment.
void TcpServer::write(std::string content)
{
    connection_->write(content);
}
void TcpServer::start_accept()
{
// Create a new connection handler
connection_.reset(new TcpConnectionHandler(log_prefix_, acceptor_.get_io_service(), received_message_callback_));
// Asynchronous accept operation and wait for a new connection.
acceptor_.async_accept(connection_->socket(),
boost::bind(&TcpServer::handle_accept, this, connection_,
boost::asio::placeholders::error));
//LOG(DEBUG) << log_prefix_ << " communication object started asynchronous TCP/IP connection acceptance.";
}
// Accept completion handler. On success it starts reading on the accepted
// connection and wakes a thread blocked in start(). Whether or not the accept
// succeeded, it then starts accepting the next client.
void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, const boost::system::error_code& error)
{
    if (!error) {
        //LOG(INFO) << log_prefix_ << " client connected!";
        connection->start();
        {
            // Set the flag and notify while holding the mutex used by start().
            boost::mutex::scoped_lock guard(connection_mutex_);
            client_connected_ = true;
            connection_cond_.notify_one();
        }
        //LOG(INFO) << log_prefix_ << " client connection accepted";
    }
    start_accept();
}
}
提前致谢!
最佳答案
我稍微修改了代码,使其与 Boost 1.74.0 兼容。
然后我用 ASAN 运行它:
=================================================================
==22695==ERROR: AddressSanitizer: heap-use-after-free on address 0x61b000000080 at pc 0x5571c3b61379 bp 0x7ffddce81980 sp 0x7ffddce81970
READ of size 8 at 0x61b000000080 thread T0
#0 0x5571c3b61378 in boost::asio::detail::strand_executor_service::strand_impl::~strand_i...
#1 0x5571c3c02599 in std::_Sp_counted_ptr<boost::asio::detail::strand_executor_service::s...
#2 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr...
#3 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /...
#4 0x5571c3b5ff84 in std::__shared_ptr<boost::asio::detail::strand_executor_service::stra...
#5 0x5571c3b5ffeb in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
#6 0x5571c3b7d8d0 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
#7 0x5571c3c0da6c in void std::destroy_at<boost::asio::strand<boost::asio::execution::any...
#8 0x5571c3c0b761 in void std::allocator_traits<std::allocator<boost::asio::strand<boost:...
#9 0x5571c3c01847 in std::_Sp_counted_ptr_inplace<boost::asio::strand<boost::asio::execut...
#10 0x5571c3b874e9 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /us...
#11 0x5571c3b6d429 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() ...
#12 0x5571c3b25690 in std::__shared_ptr<void, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr(...
#13 0x5571c3b256f7 in std::shared_ptr<void>::~shared_ptr() /usr/include/c++/10/bits/share...
#14 0x5571c3b25769 in boost::asio::execution::detail::any_executor_base::destroy_shared(b...
#15 0x5571c3b24ef2 in boost::asio::execution::detail::any_executor_base::~any_executor_ba...
#16 0x5571c3b6a0f7 in boost::asio::execution::any_executor<boost::asio::execution::contex...
#17 0x5571c3bbf447 in my_project::TcpConnectionHandler::~TcpConnectionHandler() /home/seh...
#18 0x5571c3bbf4e1 in void boost::checked_delete<my_project::TcpConnectionHandler>(my_pro...
#19 0x5571c3c020a9 in boost::detail::sp_counted_impl_p<my_project::TcpConnectionHandler>:...
#20 0x5571c3b5bf6b in boost::detail::sp_counted_base::release() /home/sehe/custom/boost_1...
#21 0x5571c3b5c537 in boost::detail::shared_count::~shared_count() /home/sehe/custom/boos...
#22 0x5571c3b6a324 in boost::shared_ptr<my_project::TcpConnectionHandler>::~shared_ptr() ...
#23 0x5571c3b1d48e in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverfl...
#24 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
#25 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
#26 0x5571c3b18b99 in _start (/home/sehe/Projects/stackoverflow/sotest+0x182b99)
0x61b000000080 is located 0 bytes inside of 1640-byte region [0x61b000000080,0x61b0000006e8)
freed by thread T0 here:
#0 0x7ff1f1eb4407 in operator delete(void*, unsigned long) (/usr/lib/x86_64-linux-gnu/lib...
#1 0x5571c3c003f4 in boost::asio::detail::strand_executor_service::~strand_executor_servi...
#2 0x5571c3b35e4a in boost::asio::detail::service_registry::destroy(boost::asio::executio...
#3 0x5571c3b3578b in boost::asio::detail::service_registry::destroy_services() /home/sehe...
#4 0x5571c3b37a4f in boost::asio::execution_context::destroy() /home/sehe/custom/boost_1_...
#5 0x5571c3b3788a in boost::asio::execution_context::~execution_context() /home/sehe/cust...
#6 0x5571c3b54621 in boost::asio::io_context::~io_context() /home/sehe/custom/boost_1_74_...
#7 0x5571c3b1d43b in my_project::TcpServer::~TcpServer() /home/sehe/Projects/stackoverflo...
#8 0x5571c3b1e837 in main /home/sehe/Projects/stackoverflow/test.cpp:266
#9 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
previously allocated by thread T0 here:
#0 0x7ff1f1eb33a7 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.6+...
#1 0x5571c3bc0faa in boost::asio::execution_context::service* boost::asio::detail::servic...
#2 0x5571c3b3623a in boost::asio::detail::service_registry::do_use_service(boost::asio::e...
#3 0x5571c3bb74f9 in boost::asio::detail::strand_executor_service& boost::asio::detail::s...
#4 0x5571c3bab8e7 in boost::asio::detail::strand_executor_service& boost::asio::use_servi...
#5 0x5571c3ba085f in std::shared_ptr<boost::asio::detail::strand_executor_service::strand...
#6 0x5571c3b92744 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
#7 0x5571c3b7d844 in boost::asio::strand<boost::asio::execution::any_executor<boost::asio...
#8 0x5571c3b18e8a in my_project::TcpConnectionHandler::TcpConnectionHandler(std::__cxx11:...
#9 0x5571c3b1dbe4 in my_project::TcpServer::start_accept() /home/sehe/Projects/stackoverf...
#10 0x5571c3b1c775 in my_project::TcpServer::TcpServer(std::__cxx11::basic_string<char, s...
#11 0x5571c3b1e7b9 in main /home/sehe/Projects/stackoverflow/test.cpp:266
#12 0x7ff1f049fbf6 in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x21bf6)
SUMMARY: AddressSanitizer: heap-use-after-free /home/sehe/custom/boost_1_74_0/boost/asio/detail/impl/strand_executor_service.ipp:88 in boost::asio::detail::strand_executor_service::strand_impl::~strand_impl()
Shadow bytes around the buggy address:
0x0c367fff7fc0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff7fd0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff7fe0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff7ff0: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x0c367fff8000: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
=>0x0c367fff8010:[fd]fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8020: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8030: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8040: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8050: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
0x0c367fff8060: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
Shadow byte legend (one shadow byte represents 8 application bytes):
Addressable: 00
Partially addressable: 01 02 03 04 05 06 07
Heap left redzone: fa
Freed heap region: fd
Stack left redzone: f1
Stack mid redzone: f2
Stack right redzone: f3
Stack after return: f5
Stack use after scope: f8
Global redzone: f9
Global init order: f6
Poisoned by user: f7
Container overflow: fc
Array cookie: ac
Intra object redzone: bb
ASan internal: fe
Left alloca redzone: ca
Right alloca redzone: cb
Shadow gap: cc
==22695==ABORTING
如您所见,`~TcpServer` 析构函数导致连接在 `io_service` 被销毁之后仍然使用它。重新排列成员的声明顺序即可解决此问题:
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::io_service io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
应该改为
boost::asio::io_service io_service_;
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::ip::tcp::acceptor acceptor_;
另外:需要对线程调用 `stop()` 和 `interrupt()` 吗?
还要注意,没有必要动态分配线程对象。参见:为什么 C++ 程序员应该尽量减少使用 `new`?
// Waits for the io thread to finish if it is joinable. The stop() and
// interrupt() calls are intentionally left commented out.
TcpServer::~TcpServer() {
    if (!io_thread_.joinable()) {
        return;
    }
    //io_service_.stop();
    //io_thread_.interrupt();
    io_thread_.join();
}
生命周期问题:shared_from_this?
boost::bind(&TcpConnectionHandler::handle_write, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
这段代码没有捕获共享指针,导致 `TcpConnectionHandler` 被提前销毁(`socket` 被销毁时会被关闭)。实际上您的问题属于未定义行为(UB),因为这是释放后使用(use-after-free);只是您很“幸运”没有看到崩溃,UB 表现成了无效句柄(即“错误的文件描述符”)的形式。
post(executor_,
boost::bind(&TcpConnectionHandler::writeImpl, shared_from_this(), message));
和
async_read_until(socket_, message_, "\r\n",
boost::bind(&TcpConnectionHandler::handle_read,
shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
现场演示
#define BOOST_BIND_NO_PLACEHOLDERS
#ifndef TCP_SERVER_
#define TCP_SERVER_
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>
namespace my_project {
// Handles a single TCP connection: reads "\r\n"-delimited messages from the
// socket and sends queued outgoing messages one at a time.
// Inherits enable_shared_from_this so handlers can bind shared_from_this().
class TcpConnectionHandler
: public boost::enable_shared_from_this<TcpConnectionHandler> {
public:
// Wraps `executor` in a strand, creates the socket on that strand and stores
// the log prefix and the callback invoked for each received message.
TcpConnectionHandler(
std::string log_prefix, boost::asio::any_io_executor executor,
boost::function<void(std::string&)> received_message_callback);
// Returns the connection's socket by reference.
boost::asio::ip::tcp::socket& socket();
// Starts an asynchronous read up to the next "\r\n".
void start();
// Posts writeImpl(message) to executor_.
void write(const std::string& message);
private:
// Appends message to outbox_ and starts a write if it is the only queued entry.
void writeImpl(const std::string& message);
// Starts an async_write of the first entry of outbox_.
void write();
// Completion handler of the read started by start().
void handle_read(const boost::system::error_code& error,
size_t bytes_transferred);
// Completion handler of the write started by write().
void handle_write(const boost::system::error_code& error,
size_t bytes_transferred);
// Strand executor; declared before socket_ because socket_ is constructed from it.
boost::asio::any_io_executor executor_;
boost::asio::ip::tcp::socket socket_;
// Receive buffer handed to async_read_until.
boost::asio::streambuf message_;
std::string log_prefix_;
boost::function<void(std::string&)> received_message_callback_;
// Pending outgoing messages; the front entry is the one being written.
std::deque<std::string> outbox_;
};
// Accepts TCP clients on a port and runs the io_service on a secondary thread.
class TcpServer {
public:
// Opens the acceptor on `port`, starts the first asynchronous accept and
// launches the thread that runs the io_service.
TcpServer(std::string log_prefix, unsigned int port,
boost::function<void(std::string&)> received_message_callback);
// Joins the io thread if it is joinable.
~TcpServer();
// Blocks until handle_accept has flagged that a client connected.
void start();
// Forwards content to the handler currently held in connection_.
void write(std::string content);
private:
// Creates a new handler and starts an asynchronous accept into its socket.
void start_accept();
// Accept completion handler: starts the connection on success, then accepts again.
void handle_accept(boost::shared_ptr<TcpConnectionHandler> connection,
const boost::system::error_code& error);
// io_service_ is declared first, so it is destroyed last: after connection_
// and acceptor_, which were created from it.
boost::asio::io_service io_service_;
boost::shared_ptr<TcpConnectionHandler> connection_;
boost::asio::ip::tcp::acceptor acceptor_;
std::string log_prefix_;
boost::function<void(std::string&)> received_message_callback_;
// connection_cond_ / connection_mutex_ / client_connected_ implement the
// wait in start(); handle_accept sets the flag and notifies.
boost::condition_variable connection_cond_;
boost::mutex connection_mutex_;
bool client_connected_;
boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
};
} // namespace my_project
#endif // #ifndef TCP_SERVER_
//#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"
namespace my_project {
// TcpConnectionHandler
// Builds the handler for one connection. `executor` is wrapped in a strand,
// and the socket is created on that strand.
TcpConnectionHandler::TcpConnectionHandler(
    std::string log_prefix,
    boost::asio::any_io_executor executor,
    boost::function<void(std::string&)> received_message_callback)
    : executor_(make_strand(executor))
    , socket_(executor_)
    , log_prefix_(log_prefix)
    , received_message_callback_(received_message_callback)
{
}
// Gives callers (e.g. the acceptor) access to this connection's socket.
boost::asio::ip::tcp::socket& TcpConnectionHandler::socket()
{
    return socket_;
}
// Starts one asynchronous read that completes when "\r\n" is in message_.
// Binding shared_from_this() keeps this object alive until handle_read runs.
void TcpConnectionHandler::start() {
    auto on_read = boost::bind(&TcpConnectionHandler::handle_read,
                               shared_from_this(),
                               boost::asio::placeholders::error,
                               boost::asio::placeholders::bytes_transferred);
    boost::asio::async_read_until(socket_, message_, "\r\n", on_read);
}
// Queues `message` for sending by posting writeImpl to the strand executor.
// The bound handler owns a copy of the message and a shared_ptr to this object.
void TcpConnectionHandler::write(const std::string& message) {
    auto queue_message = boost::bind(&TcpConnectionHandler::writeImpl,
                                     shared_from_this(), message);
    boost::asio::post(executor_, queue_message);
}
// Appends `message` to the outbox. A write is started only when the outbox
// was empty beforehand; otherwise a write is already outstanding and
// handle_write will continue with this message later.
void TcpConnectionHandler::writeImpl(const std::string& message) {
    const bool write_outstanding = !outbox_.empty();
    outbox_.push_back(message);
    if (!write_outstanding) {
        write();
    }
}
// Starts an asynchronous write of the message at the front of the outbox.
// The buffer refers to the string stored in outbox_, which stays queued until
// handle_write pops it. The handler is bound to executor_ and holds a
// shared_ptr to this object.
void TcpConnectionHandler::write() {
    const std::string& head = outbox_.front();
    auto on_written = boost::bind(&TcpConnectionHandler::handle_write,
                                  shared_from_this(),
                                  boost::asio::placeholders::error,
                                  boost::asio::placeholders::bytes_transferred);
    boost::asio::async_write(
        socket_,
        boost::asio::buffer(head.data(), head.size()),
        boost::asio::bind_executor(executor_, on_written));
}
// Completion handler for the read started by start().
// Logs the completion status, then returns on eof or connection_reset.
// Otherwise it drains everything buffered in message_, keeps only the text
// before the first '\r', and on success passes it to the callback and starts
// the next read.
// NOTE(review): the whole buffer is drained, so anything received after the
// first "\r\n" is discarded; the byte count is not used.
void TcpConnectionHandler::handle_read(const boost::system::error_code& error,
                                       size_t /*bytes_transferred*/) {
    std::cerr << __FUNCTION__ << ": " << error.message() << "\n";

    const bool peer_gone = (error == boost::asio::error::eof) ||
                           (error == boost::asio::error::connection_reset);
    if (peer_gone) {
        // LOG(ERROR) << log_prefix_ << " TCP/IP client disconnected!";
        return;
    }

    // Pull every buffered byte out of the streambuf into a string.
    std::istream in(&message_);
    std::string buffered((std::istreambuf_iterator<char>(in)),
                         std::istreambuf_iterator<char>());
    // LOG(DEBUG) << log_prefix_ << " communication object received message: "
    // << getPrintableMessage(message_str);

    // Keep only the text up to the first carriage return.
    std::istringstream splitter(buffered);
    std::string msg;
    std::getline(splitter, msg, '\r');

    // Drop whatever might still be left in the receive buffer.
    message_.consume(message_.size());

    if (error) {
        // TODO: handle the error here
        return;
    }
    received_message_callback_(msg);
    start();
}
// Completion handler for the write started by write().
// The message that was being written is removed from the outbox in every
// case. On error the failure is reported and sending stops; otherwise the
// next queued message, if any, is written.
void TcpConnectionHandler::handle_write(const boost::system::error_code& error,
                                        size_t /*bytes_transferred*/) {
    outbox_.pop_front();

    if (error) {
        std::cerr << "could not write: "
                  << boost::system::system_error(error).what() << std::endl;
        return;
    }

    if (outbox_.empty()) {
        return;
    }
    // more messages to send
    write();
}
// TcpServer
// Opens an IPv4 acceptor on `port`, starts the first asynchronous accept and
// then launches the secondary thread that runs the io_service.
TcpServer::TcpServer(
    std::string log_prefix, unsigned int port,
    boost::function<void(std::string&)> received_message_callback)
    : acceptor_(io_service_,
                boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
    , log_prefix_(log_prefix)
    , received_message_callback_(received_message_callback)
    , client_connected_(false)
{
    // An accept must be pending before run() is called on the other thread.
    start_accept();
    io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
}
// Waits for the io thread to finish if it is joinable. The stop() and
// interrupt() calls are intentionally left commented out.
TcpServer::~TcpServer() {
    if (!io_thread_.joinable()) {
        return;
    }
    //io_service_.stop();
    //io_thread_.interrupt();
    io_thread_.join();
}
void TcpServer::start() {
// Wait until client is connected to our TCP server. (condition variable)
boost::unique_lock<boost::mutex> lock(connection_mutex_);
while (!client_connected_) {
// LOG(INFO) << "Waiting for " << log_prefix_ << " client to establish
// connection...";
connection_cond_.wait(lock);
}
// LOG(INFO) << log_prefix_ << " client successfully connected.";
}
// Sends `content` through the handler currently stored in connection_.
// NOTE(review): start_accept() replaces connection_ with a fresh handler each
// time it runs (including right after every accept), so this targets the
// handler that is waiting in async_accept at that moment.
void TcpServer::write(std::string content)
{
    connection_->write(content);
}
void TcpServer::start_accept() {
// Create a new connection handler
connection_.reset(new TcpConnectionHandler(
log_prefix_, acceptor_.get_executor(), received_message_callback_));
// Asynchronous accept operation and wait for a new connection.
acceptor_.async_accept(connection_->socket(),
boost::bind(&TcpServer::handle_accept, this,
connection_,
boost::asio::placeholders::error));
// LOG(DEBUG) << log_prefix_ << " communication object started asynchronous
// TCP/IP connection acceptance.";
}
// Accept completion handler. Logs the status; on success it starts reading on
// the accepted connection and wakes a thread blocked in start(). Whether or
// not the accept succeeded, it then starts accepting the next client.
void TcpServer::handle_accept(
    boost::shared_ptr<TcpConnectionHandler> connection,
    const boost::system::error_code& error) {
    std::cerr << __FUNCTION__ << ": " << error.message() << "\n";

    if (!error) {
        // LOG(INFO) << log_prefix_ << " client connected!";
        connection->start();
        {
            // Set the flag and notify while holding the mutex used by start().
            boost::mutex::scoped_lock guard(connection_mutex_);
            client_connected_ = true;
            connection_cond_.notify_one();
        }
        // LOG(INFO) << log_prefix_ << " client connection accepted";
    }
    start_accept();
}
} // namespace my_project
// Demo entry point: runs a server on port 6868 that prints every received
// message, quoted, to stdout. The TcpServer destructor joins the io thread
// before main returns.
int main() {
    auto print_message = [](std::string& text) {
        std::cout << "Received msg: " << std::quoted(text) << "\n";
    };
    my_project::TcpServer s("demo", 6868, print_message);
}
作为客户端,例如运行:
cat test.cpp | netcat -Cw 0 localhost 6868
服务器会打印出类似下面的内容:
Received msg: "//#define BOOST_BIND_GLOBAL_PLACEHOLDERS"
Received msg: "#include <boost/thread.hpp>"
Received msg: "std::string& message);"
Received msg: "td::string> outbox_;"
Received msg: ":shared_ptr<TcpConnectionHandler> connection_;"
Received msg: "#include \"utils.h\""
Received msg: "ConnectionHandler::start() {"
Received msg: "::writeImpl(const std::string& message) {"
Received msg: "s(),"
Received msg: " // LOG(ERROR) << log_prefix_ << \" TCP/IP client disconnected!\";"
Received msg: " // Consumes from the streambuf."
Received msg: "e: \""
Received msg: "nt_connected_(false) {"
Received msg: "d to our TCP server. (condition variable)"
Received msg: "ection handler"
Received msg: "nication object started asynchronous"
Received msg: "ond_.notify_one();"
如您所见,输出并不完整:您还必须修复读取处理,使其使用 `bytes_received`(即处理函数收到的字节数参数)。请留意这一点,不过我暂时把它留给您来完成。
另外,在服务器中保留单个 `connection_` 成员没有多大意义,因为这意味着 `write` 作用于尚未连接的实例。也许您想保留已连接客户端的列表?
下面是我的简单做法:把 `connection_` 成员替换为
list<weak_ptr<TcpConnectionHandler> > connections_;
它甚至内置了基本的垃圾收集。
//#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#define BOOST_BIND_NO_PLACEHOLDERS
#ifndef TCP_SERVER_
#define TCP_SERVER_
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <iostream>
namespace my_project {
namespace ph = boost::asio::placeholders;
using boost::system::error_code;
using boost::asio::ip::tcp;
using Executor = boost::asio::executor;
//using Executor = boost::asio::any_io_executor; // boost 1.74.0
using Callback = boost::function<void(std::string const&)>;
// Handles a single TCP connection: reads "\r\n"-delimited data from the
// socket, reports each line to a callback, and sends queued outgoing messages
// one at a time.
// Inherits enable_shared_from_this so handlers can bind shared_from_this().
class TcpConnectionHandler : public boost::enable_shared_from_this<TcpConnectionHandler> {
public:
// Wraps `executor` in a strand, creates the socket on it and stores the callback.
TcpConnectionHandler(Executor executor, Callback callback);
// Returns the connection's socket by reference.
tcp::socket& socket() { return socket_; }
// Starts an asynchronous read up to the next "\r\n".
void start();
// Posts writeImpl(message) to executor_.
void write(std::string const& message);
// Prints the function name to stderr when the handler is destroyed.
~TcpConnectionHandler() { std::cerr << __FUNCTION__ << "\n"; }
private:
// Appends message to outbox_ and starts the write loop if it is the only entry.
void writeImpl(const std::string& message);
// Starts an async_write of the front entry of outbox_.
void write_loop();
// Completion handler of the read started by start().
void handle_read(error_code error, size_t bytes_transferred);
// Completion handler of the write started by write_loop().
void handle_write(error_code error, size_t bytes_transferred);
// Strand executor; declared before socket_ because socket_ is constructed from it.
Executor executor_;
tcp::socket socket_;
// Receive buffer handed to async_read_until.
boost::asio::streambuf message_;
Callback received_message_callback_;
// Pending outgoing messages; the front entry is the one being written.
std::list<std::string> outbox_;
};
using ConnectionPtr = boost::shared_ptr<TcpConnectionHandler>;
using ConnectionHandle = boost::weak_ptr<TcpConnectionHandler>;
// Accepts TCP clients on a port, tracks them through weak handles and can
// send a message to every connection that is still alive.
class TcpServer {
public:
// Opens the acceptor on `port`, starts the first asynchronous accept and
// launches the thread that runs the io_service.
TcpServer(unsigned short port, Callback callback);
// Joins the io thread if it is joinable.
~TcpServer();
// Calls write(content) on every connection whose handle can still be locked.
void write(std::string const& content);
private:
// Prunes expired handles, creates a new handler and starts an asynchronous accept.
void start_accept();
// Accept completion handler: records and starts the connection on success,
// then accepts again.
void handle_accept(ConnectionPtr connection,
error_code error);
// io_service_ is declared first, so it is destroyed last: after the
// acceptor that was created from it.
boost::asio::io_service io_service_;
// Weak handles only: a connection's lifetime is not extended by this list.
std::list<ConnectionHandle> connections_;
tcp::acceptor acceptor_;
Callback received_message_callback_;
boost::thread io_thread_; /**< Thread to run boost.asio io_service. */
};
} // namespace my_project
#endif // #ifndef TCP_SERVER_
//#include "tcp_server.h"
//#include "easylogging++.h"
//#include "utils.h"
namespace my_project {
// TcpConnectionHandler
// Builds the handler for one connection. `executor` is wrapped in a strand,
// and the socket is created on that strand.
TcpConnectionHandler::TcpConnectionHandler(Executor executor, Callback callback)
    : executor_(make_strand(executor))
    , socket_(executor_)
    , received_message_callback_(callback)
{
}
void TcpConnectionHandler::start() {
async_read_until(socket_, message_, "\r\n",
boost::bind(&TcpConnectionHandler::handle_read,
shared_from_this(), ph::error, ph::bytes_transferred));
}
// Queues `message` for sending by posting writeImpl to the strand executor.
// The bound handler owns a copy of the message and a shared_ptr to this object.
void TcpConnectionHandler::write(const std::string& message) {
    //std::cerr << __FUNCTION__ << ": " << message.length() << "\n";
    auto queue_message = boost::bind(&TcpConnectionHandler::writeImpl,
                                     shared_from_this(), message);
    boost::asio::post(executor_, queue_message);
}
// Appends `message` to the outbox and starts the write loop only when the
// outbox was empty beforehand, i.e. when no write is already outstanding.
void TcpConnectionHandler::writeImpl(const std::string& message) {
    const bool was_idle = outbox_.empty();
    outbox_.push_back(message);
    if (was_idle) {
        write_loop();
    }
}
void TcpConnectionHandler::write_loop() {
boost::asio::async_write(
socket_, boost::asio::buffer(outbox_.front()),
boost::asio::bind_executor(
executor_,
boost::bind(&TcpConnectionHandler::handle_write, shared_from_this(),
ph::error, ph::bytes_transferred)));
}
// Completion handler for the read started by start().
//
// Copies exactly `bytes_transferred` bytes out of the receive buffer, splits
// them on '\n', strips one trailing '\r' from each line and passes every line
// to the received-message callback. Bytes beyond `bytes_transferred` stay in
// message_ for the next read. On success the next read is started; on error
// the error message is written to stderr and reading stops.
//
// @param error              result of the asynchronous read
// @param bytes_transferred  number of bytes to take from the front of message_
void TcpConnectionHandler::handle_read(error_code error, size_t bytes_transferred) {
    // Copy the bytes out BEFORE consuming them: consume() removes them from
    // the input sequence, after which iterators into it must not be used.
    auto const first = buffers_begin(message_.data());
    std::string const chunk(first, first + bytes_transferred);
    message_.consume(bytes_transferred);

    std::istringstream iss(chunk);
    for (std::string msg; std::getline(iss, msg, '\n');) {
        // An empty line (e.g. from "\n\n") has no last character to inspect;
        // calling back() on it would be undefined behaviour.
        if (!msg.empty() && msg.back() == '\r') {
            msg.pop_back();
        }
        received_message_callback_(msg);
    }

    if (!error) {
        start();
    } else {
        std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
    }
}
// Completion handler for the write started by write_loop().
// The message that was being written is removed from the outbox in every
// case. On error the failure is reported and sending stops; otherwise the
// next queued message, if any, is written.
void TcpConnectionHandler::handle_write(error_code error, size_t /*bytes_transferred*/) {
    outbox_.pop_front();

    if (error) {
        std::cerr << "could not write: " << error.message() << std::endl;
        return;
    }
    if (outbox_.empty()) {
        return;
    }
    // more messages to send
    write_loop();
}
// TcpServer
// Opens the acceptor on `port` (default-constructed address), starts the
// first asynchronous accept and then launches the thread running the io_service.
TcpServer::TcpServer(unsigned short port, Callback callback)
    : acceptor_(io_service_, { {}, port })
    , received_message_callback_(callback)
{
    // An accept must be pending before run() is called on the other thread.
    start_accept();
    io_thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_));
}
// Waits for the io thread to finish if it is joinable. The stop() and
// interrupt() calls are intentionally left commented out.
TcpServer::~TcpServer() {
    if (!io_thread_.joinable()) {
        return;
    }
    //io_service_.stop();
    //io_thread_.interrupt();
    io_thread_.join();
}
void TcpServer::write(std::string const& content) {
for (auto& handle : connections_)
if (ConnectionPtr con = handle.lock())
con->write(content);
}
void TcpServer::start_accept() {
// optionally garbage-collect connection handles
connections_.remove_if(std::mem_fn(&ConnectionHandle::expired));
// Create a new connection handler
auto connection_ = boost::make_shared<TcpConnectionHandler>(
acceptor_.get_executor(), received_message_callback_);
// Asynchronous accept operation and wait for a new connection.
acceptor_.async_accept(connection_->socket(),
boost::bind(&TcpServer::handle_accept, this, connection_, ph::error));
}
// Accept completion handler. On success the connection is recorded as a weak
// handle and starts reading. Whether or not the accept succeeded, the next
// accept is started.
void TcpServer::handle_accept(boost::shared_ptr<TcpConnectionHandler> connection, error_code error) {
    //std::cerr << __FUNCTION__ << ": " << error.message() << "\n";
    const bool accepted = !error;
    if (accepted) {
        connections_.push_back(connection);
        connection->start();
    }
    start_accept();
}
} // namespace my_project
// Demo entry point: runs a server on port 6868 that prints each received line
// and broadcasts it, terminated with "\r\n", to all live connections.
// The callback refers to `server` by reference, and the TcpServer destructor
// joins the io thread before main returns.
int main() {
    my_project::TcpServer server(6868, [&server](std::string const& msg) {
        std::cout << "Broadcasting msg: " << std::quoted(msg) << "\n";
        std::string const framed = msg + "\r\n";
        server.write(framed);
    });
}
使用模拟客户端:
(for a in 1 2 3; do echo -e "Foo $a\nBar $a\nQux $a" | nc -Cw1 localhost 6868 | while read line; do echo "Job $a: $line"; done& done;)
在服务器打印:
Broadcasting msg: "Foo 1"
Broadcasting msg: "Foo 2"
Broadcasting msg: "Bar 1"
Broadcasting msg: "Bar 2"
Broadcasting msg: "Foo 3"
Broadcasting msg: "Qux 1"
Broadcasting msg: "Qux 2"
Broadcasting msg: "Bar 3"
Broadcasting msg: "Qux 3"
handle_read: End of file
~TcpConnectionHandler
handle_read: End of file
~TcpConnectionHandler
handle_read: End of file
~TcpConnectionHandler
客户端打印(取决于时间):
Job 1: Foo 1
Job 2: Bar 1
Job 1: Bar 1
Job 2: Foo 2
Job 1: Foo 2
Job 3: Foo 3
Job 2: Qux 1
Job 1: Qux 1
Job 3: Bar 2
Job 2: Foo 3
Job 1: Foo 3
Job 3: Bar 3
Job 1: Bar 2
Job 2: Bar 2
Job 3: Qux 2
Job 1: Bar 3
Job 2: Bar 3
Job 3: Qux 3
Job 1: Qux 2
Job 2: Qux 2
Job 1: Qux 3
Job 2: Qux 3
关于c++ - 使用 boost 的 async_write 的异步 tcp 服务器导致文件描述符错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65285735/
我是 ZMQ 的新手。我发现 ZMQ 套接字实现比 winsock 简单得多。但我怀疑 “使用 ZMQ TCP 套接字创建的客户端可以与传统的 TCP 服务器通信吗?” 换句话说我的 ZMQ 客户端可
我想使用 TCP 协议(protocol) 将数据发送到 Logstash。为了发送数据,我正在使用 Node-RED。一个简单的配置如下所示: 在 Logstash 文件夹中,我创建了一个名为 no
当我尝试更改窗口缩放选项时,作为 root,我可以通过在 /proc/sys/net/中执行 net.ipv4.tcp_mem=16777000 来更改值。如果我必须更改这 100 个系统,那将需要大
明天做一些练习题,这道做不出来 TCP 服务器连接 TCP 客户端进行通信所需的最小套接字端口数是多少? 肯定只有两个吧?一个用于服务器,一个用于客户端,但这似乎是显而易见的。我的伙伴们认为 TCP
考虑一个存在一个服务器和多个客户端的场景。每个客户端创建 TCP 连接以与服务器交互。 TCP alive的三种用法: 服务器端保活:服务器发送 TCP 保活以确保客户端处于事件状态。如果客户端死了,
TCP TAHOE 和 TCP RENO 有什么区别。 我想知道的是关于 3-dup-ack 和超时的行为? SST 发生了什么变化? 谢谢! 最佳答案 TCP Tahoe 和 Reno 是处理 TC
大家早上好。我一直在阅读(其中大部分在堆栈溢出中)关于如何进行安全密码身份验证(散列 n 次,使用盐等)但我怀疑我将如何在我的 TCP 客户端中实际实现它-服务器架构。 我已经实现并测试了我需要的方法
在遍历 RFC793 时,我开始知道应该以这种方式选择初始序列号段重叠被阻止。 有人能解释一下如果发生重叠,重复段将如何影响 TCP? 最佳答案 不同的操作系统有不同的行为。参见 http://ins
你能举例说明一下tcp/ip中nagle算法的概念吗? 最佳答案 我认为Wikipedia在开头的段落中做得很好。 Nagle's document, Congestion Control in IP
似乎最大 TCP 接收窗口大小为 1GB(使用缩放时)。因此,仍然可以用一个连接填充 100Gb 管道的最大 RTT 是 40ms(因为 2 * 40E-3 * 100E9/8 = 1GB)。这会将这
考虑在两个 TCP 端点之间建立的 TCP 连接,其中一个调用: 关闭():此处,不允许进一步读取或写入。 关机(fd,SHUT_WR):这会将全双工连接转换为单工连接,其中调用 SHUT_WR 的端
我是在 Lua 中编写解析器的新手,我有两个简短的问题。我有一个包含 TCP 选项的数据包,如 MSS、TCP SACK、时间戳、NOP、窗口比例、未知。我基本上是在尝试剖析 TCP 选项字段中的未知
TCP 是否不负责通过在传输过程中发生丢失等情况时采取任何可能必要的措施来确保通过网络完整地发送流? 它做的不对吗? 为什么更高的应用层协议(protocol)及其应用程序仍然执行校验和? 最佳答案
考虑使用 10 Mbps 链路的单个 TCP (Reno) 连接。假设此链路不缓冲数据并且接收方的接收缓冲区比拥塞窗口大得多。设每个 TCP 段的大小为 1500 字节,发送方和接收方之间连接的双向传
考虑这样一个场景,有client-a和server-b。 server-b 禁用了 TCP keepalive。 server-b 没有任何应用程序逻辑来检查 TCP 连接是否打开。 client-a
我正在尝试用 Rust 编写回显服务器。 use std::net::{TcpStream, TcpListener}; use std::io::prelude::*; fn main() {
听说对于TCP连接,服务器会监听一个端口,并使用另一个端口发送数据。 例如,Web 服务器监听端口 80。每当客户端连接到它时,该服务器将使用另一个端口(比如 9999)向客户端发送数据(Web 内容
我试图了解带有标记 PSH 和标记 URG 的 TCP 段之间的区别。我阅读了 RFC,但仍然无法理解,其中一个在将数据发送到进程之前缓冲数据而另一个没有吗? 最佳答案 它们是两种截然不同的机制。 #
有第三方服务公开 TCP 服务器,我的 Node 服务器(TCP 客户端)应使用 tls Node 模块与其建立 TCP 连接。作为 TCP 客户端, Node 服务器同时也是 HTTP 服务器,它应
我正在发送一些 TCP SYN 数据包以获得 TCP RST 的返回。为了识别每个探测器,我在 TCP 序列字段中包含一个计数器。我注意到以下几点: 当SYN probe中的sequence numb
我是一名优秀的程序员,十分优秀!