gpt4 book ai didi

c++ - Boost ASIO async_read 不从客户端读取数据

转载 作者:行者123 更新时间:2023-11-28 01:39:01 25 4
gpt4 key购买 nike

我有一个服务器/客户端应用程序,用于从客户端写入到服务器读取。

在服务器代码的 startHandlig 函数中,如果我注释 async_connect_1 并在其后返回,那么它工作正常,涉及同步写入函数。

我在 Service() 类中添加了 async_connect_1 函数以从套接字异步读取数据。

当客户端连接到服务器时调用此函数,此函数立即返回。

我希望async_read对应的回调函数被调用,但是并没有发生...

我很长一段时间以来一直坚持这一点..感谢这方面的帮助...

服务器代码

#include <boost/asio.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/bind.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/tuple/tuple.hpp>

#include <thread>
#include <atomic>
#include <memory>
#include <iostream>

#include "../stocks.hpp"
using namespace boost;


class Service {
public:
Service(){}

void StartHandligClient(
std::shared_ptr<asio::ip::tcp::socket> sock) {

read_async_1(sock);
return;

std::thread th(([this, sock]() {
HandleClient(sock);
}));

std::cout << "Detached \n";

th.detach();
}

private:
void read_async_1(std::shared_ptr<asio::ip::tcp::socket> sock)
{
if(!(*sock.get()).is_open())
{
std::cout << getpid() << " : Socket closed in sync_read \n" << std::flush;
return ;
}

std::cout << "haha_1\n" << std::flush;

boost::asio::async_read( (*sock.get()), boost::asio::buffer(inbound_header_),
[this](boost::system::error_code ec,
size_t bytesRead)
{
std::cout << "haha_2\n" << std::flush;

if (!ec)
{
int headerBytesReceived = bytesRead;

std::cout << "\n\n headerBytesReceived : " << headerBytesReceived << "\n" << std::flush ;
// this->async_read(sock);

}
else
{
// Terminate connection ?
if(ec == boost::asio::error::eof)
{
std::cout << getpid() << " : ** sync_read : Connection lost : boost::asio::error::eof ** \n";
}
std::cout << "Error occured in sync_read! Error code = " << ec.value() << ". Message: " << ec.message() << "\n" << std::flush;

return ;
}
return ;
}
);
std::cout << getpid() << " : final return from async_read \n" << std::flush;
return ;
}
void HandleClient(std::shared_ptr<asio::ip::tcp::socket> sock) {
while(1)
{
try {
// asio::streambuf request;
// asio::read_until(*sock.get(), request, '\n');

int headerBytesReceived = asio::read( *sock.get(), boost::asio::buffer(inbound_header_) );
std::cout << "headerBytesReceived : " << headerBytesReceived << "\n" << std::flush;

// Determine the length of the serialized data.
std::istringstream is(std::string(inbound_header_, header_length));
std::cout << "is : " << is.str() << ", inbound_header_ : " << inbound_header_ << "\n";

std::size_t inbound_data_size = 0;
if (!(is >> std::hex >> inbound_data_size))
{
// Header doesn't seem to be valid. Inform the caller.
// boost::system::error_code error(boost::asio::error::invalid_argument);
// boost::get<0>(handler)(error);
std::cout << "RET-1 \n";
return;
}

std::cout << "inbound_data_size : " << inbound_data_size << "\n" << std::flush;

// Start an asynchronous call to receive the data.
inbound_data_.resize(inbound_data_size);

std::cout << "inbound_data_.size() : " << inbound_data_.size() << "\n" << std::flush;

int bytesReceived = asio::read( *sock.get(), boost::asio::buffer(inbound_data_) );

std::string archive_data(&inbound_data_[0], inbound_data_.size());
std::istringstream archive_stream(archive_data);
boost::archive::text_iarchive archive(archive_stream);
archive >> stocks_;


std::cout << "bytesReceived : " << bytesReceived << " , stocks_.size() : " << stocks_.size() << "\n";
// Print out the data that was received.
for (std::size_t i = 0; i < stocks_.size(); ++i)
{
std::cout << "Stock number " << i << "\n";
std::cout << " code: " << stocks_[i].code << "\n";
std::cout << " name: " << stocks_[i].name << "\n";
std::cout << " open_price: " << stocks_[i].open_price << "\n";
std::cout << " high_price: " << stocks_[i].high_price << "\n";
std::cout << " low_price: " << stocks_[i].low_price << "\n";
std::cout << " last_price: " << stocks_[i].last_price << "\n";
std::cout << " buy_price: " << stocks_[i].buy_price << "\n";
std::cout << " buy_quantity: " << stocks_[i].buy_quantity << "\n";
std::cout << " sell_price: " << stocks_[i].sell_price << "\n";
std::cout << " sell_quantity: " << stocks_[i].sell_quantity << "\n";
}
}
catch (system::system_error &e)
{
boost::system::error_code ec = e.code();
if(ec == boost::asio::error::eof)
{
std::cout << "EOF Error \n";
}
std::cout << "Server Error occured! Error code = "
<< e.code() << ". Message: "
<< e.what() << "\n";
break;
}
}

// Clean-up.
delete this;
}
/// The size of a fixed length header.
enum { header_length = 8 };

/// Holds an outbound header.
std::string outbound_header_;

/// Holds the outbound data.
std::string outbound_data_;

/// Holds an inbound header.
char inbound_header_[header_length];

/// Holds the inbound data.
std::vector<char> inbound_data_;
std::vector<stock> stocks_;
};

class Acceptor {
public:
Acceptor(asio::io_service& ios, unsigned short port_num) :
m_ios(ios),
m_acceptor(m_ios,
asio::ip::tcp::endpoint(
asio::ip::address_v4::any(),
port_num))
{
m_acceptor.listen();
}

void Accept() {
std::cout << "Server Accept() \n" << std::flush;
std::shared_ptr<asio::ip::tcp::socket>
sock(new asio::ip::tcp::socket(m_ios));

m_acceptor.accept(*sock.get());

(new Service)->StartHandligClient(sock);
}

private:
asio::io_service& m_ios;
asio::ip::tcp::acceptor m_acceptor;
};

class Server {
public:
Server() : m_stop(false) {}

void Start(unsigned short port_num) {
m_thread.reset(new std::thread([this, port_num]() {
Run(port_num);
}));
}

void Stop() {
m_stop.store(true);
m_thread->join();
}

private:
void Run(unsigned short port_num) {
Acceptor acc(m_ios, port_num);

while (!m_stop.load()) {
std::cout << "Server accept\n" << std::flush;
acc.Accept();
}
}

std::unique_ptr<std::thread> m_thread;
std::atomic<bool> m_stop;
asio::io_service m_ios;
};

int main()
{
unsigned short port_num = 3333;

try {
Server srv;
srv.Start(port_num);

std::this_thread::sleep_for(std::chrono::seconds(100));

std::cout << "Stopping server \n";

srv.Stop();
}
catch (system::system_error &e) {
std::cout << "Error occured! Error code = "
<< e.code() << ". Message: "
<< e.what();
}

return 0;
}

客户端代码

#include <boost/asio.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/bind.hpp>
#include <boost/serialization/vector.hpp>
#include <iostream>

#include "../stocks.hpp"
using namespace boost;

class SyncTCPClient {
public:
SyncTCPClient(const std::string& raw_ip_address,
unsigned short port_num) :
m_ep(asio::ip::address::from_string(raw_ip_address),
port_num),
m_sock(m_ios) {

m_sock.open(m_ep.protocol());
}

void connect() {
m_sock.connect(m_ep);
}

void close() {
m_sock.shutdown(
boost::asio::ip::tcp::socket::shutdown_both);
m_sock.close();
}

std::string emulateLongComputationOp(
unsigned int duration_sec) {

std::string request = "EMULATE_LONG_COMP_OP "
+ std::to_string(duration_sec)
+ "\n";

sendRequest(request);
return receiveResponse();
};

private:
void sendRequest(const std::string& request) {

std::vector<stock> stocks_;
// Create the data to be sent to each client.
stock s;
s.code = "ABC";
s.name = "A Big Company";
s.open_price = 4.56;
s.high_price = 5.12;
s.low_price = 4.33;
s.last_price = 4.98;
s.buy_price = 4.96;
s.buy_quantity = 1000;
s.sell_price = 4.99;
s.sell_quantity = 2000;
stocks_.push_back(s);

// Serialize the data first so we know how large it is.
std::ostringstream archive_stream;
boost::archive::text_oarchive archive(archive_stream);
archive << stocks_;
outbound_data_ = archive_stream.str();
std::cout << "outbound_data_ : " << outbound_data_ << "\n" << std::flush;
std::cout << "outbound_data_.size() : " << outbound_data_.size() << "\n" << std::flush;

// Format the header.
std::ostringstream header_stream;
header_stream << std::setw(header_length) << std::hex << outbound_data_.size();

std::cout << "header_stream.str() : " << header_stream.str() << "\n" << std::flush;
std::cout << "header_stream.str().size() : " << header_stream.str().size() << "\n" << std::flush;

if (!header_stream || header_stream.str().size() != header_length)
{
// Something went wrong, inform the caller.
// boost::system::error_code error(boost::asio::error::invalid_argument);
// socket_.get_io_service().post(boost::bind(handler, error));
return;
}

outbound_header_ = header_stream.str();
std::cout << "outbound_header_ : " << outbound_header_ << "\n" << std::flush;

// Write the serialized data to the socket. We use "gather-write" to send
// both the header and the data in a single write operation.
std::vector<boost::asio::const_buffer> buffers;
buffers.push_back(boost::asio::buffer(outbound_header_));
buffers.push_back(boost::asio::buffer(outbound_data_));
std::size_t sizeSent = asio::write(m_sock, buffers);
std::cout << "sizeSent : " << sizeSent << "\n" << std::flush;

}

std::string receiveResponse() {
std::string response;
/*
asio::streambuf buf;
asio::read_until(m_sock, buf, '\n');

std::istream input(&buf);

std::getline(input, response);
*/

return response;
}

private:
asio::io_service m_ios;

asio::ip::tcp::endpoint m_ep;
asio::ip::tcp::socket m_sock;
enum { header_length = 8 };
std::string outbound_data_;
std::string outbound_header_;

};

int main()
{
const std::string raw_ip_address = "127.0.0.1";
const unsigned short port_num = 3333;

try {
SyncTCPClient client(raw_ip_address, port_num);

// Sync connect.
client.connect();

sleep(1);

std::cout << "Sending request to the server... "
<< std::endl;

std::string response = client.emulateLongComputationOp(10);

std::cout << "Response received: " << response << std::endl;

sleep(100);
std::cout << "\n\n Closing client connection \n\n";

// Close the connection and free resources.
client.close();
}
catch (system::system_error &e) {
std::cout << "Client Error occured! Error code = " << e.code()
<< ". Message: " << e.what();

return e.code().value();
}

return 0;
}

包含文件 (stocks.hpp)

#ifndef _STOCKS_HPP_
#define _STOCKS_HPP_

struct stock
{
std::string code;
std::string name;
double open_price;
double high_price;
double low_price;
double last_price;
double buy_price;
int buy_quantity;
double sell_price;
int sell_quantity;

template <typename Archive>
void serialize(Archive& ar, const unsigned int version)
{
ar & code;
ar & name;
ar & open_price;
ar & high_price;
ar & low_price;
ar & last_price;
ar & buy_price;
ar & buy_quantity;
ar & sell_price;
ar & sell_quantity;
}
};

#endif

最佳答案

你已经写了 Error code = 125. Message: Operation canceled 作为先前响应中的评论,我认为套接字可能会在异步操作完成之前关闭。

你的套接字的生命周期是多少?

[1] 套接字在Accept 方法中创建

    std::shared_ptr<asio::ip::tcp::socket>
sock(new asio::ip::tcp::socket(m_ios)); // ref count +1
//...
(new Service)->StartHandligClient(sock); // this function returns immediately
// so socket's ref count -1

[2] 在 StartHandligClient()sock按值传递,所以socket的ref计数+1,但是

    void StartHandligClient(
std::shared_ptr<asio::ip::tcp::socket> sock) { // +1 ref count

read_async_1(sock); // this function returns immediately
return; // -1 ref count of socket
}

[3] read_async_1 套接字按值传递,套接字的引用计数+1,但此函数立即返回,当函数结束时,引用计数减少并删除套接字对象。

您创建了 lambda 对象来执行异步操作,但 socket 对象可能会在执行之前关闭。

关于c++ - Boost ASIO async_read 不从客户端读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48128834/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com