gpt4 book ai didi

c++ - Protobuf、CodedInputStream 解析部分消息

转载 作者:行者123 更新时间:2023-11-30 01:51:01 28 4
gpt4 key购买 nike

我正在尝试实现与 java 版本兼容的 protobuf 发送/接收,它首先包含一个 varint32 前缀。

我几乎让它工作了,但由于某些原因,一些消息变得不完整并且无法通过 assert()。

/receiver.cpp:69: void tcp_connection::handle_read_message(const boost::system::error_code&, size_t): Assertion `line.ParseFromCodedStream(&input)' failed.

semder.pp

boost::asio::streambuf buffer;
std::ostream writer(&buffer);
bool packet_full = false;
uint32_t sent_lines = 0;
{ //new scope for protobuf streams, these flush in dtor
google::protobuf::io::OstreamOutputStream osostream(&writer);
google::protobuf::io::CodedOutputStream output(&osostream);
std::string lines;
while(std::getline(reader, line)) {
lines += line + "\n";
++sent_lines;
if(sent_lines > 100) {
packet_full = true;
break;
}
}
if(!lines.empty()) {
msg->set_text(lines);
const uint32_t size = msg->ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if(buffer != 0) {
msg->SerializeWithCachedSizesToArray(buffer);
} else {
msg->SerializeWithCachedSizes(&output);
}
}
if(sent_lines > 0) {
sock.send(buffer.data());
if(!packet_full && !reader.eof()) { //Read ended, and not due to end of file
std::cout << "An error occured" << std::endl;
break;
}
reader.clear(); //clear EOF flag
}

接收器.cpp

这是一个 boost asio 回调。

成员变量:

boost::asio::ip::tcp::socket socket_;
boost::asio::streambuf buffer_;

代码

void handle_read_message(const boost::system::error_code& error,
size_t bytes_transferred) {


if(!error) {
buffer_.commit(bytes_transferred);
std::istream reader(&buffer_);
google::protobuf::io::IstreamInputStream isistream(&reader);
google::protobuf::io::CodedInputStream input(&isistream);
uint32_t size = 0;
assert(input.ReadVarint32(&size));
auto limit = input.PushLimit(size);
msgs::Line line;
assert(line.ParseFromCodedStream(&input));
assert(input.ConsumedEntireMessage());
input.PopLimit(limit);

start();
} else {
std::cout <<"error during handle_read_message: " << error << std::endl;
}
}

这主要是基于https://stackoverflow.com/a/22899712

编辑:新的receiver版本,reader_现在是一个成员变量:

void handle_read_message(const boost::system::error_code& error,
size_t bytes_transferred) {
std::cout << "handle_read_message(" << bytes_transferred << ")" <<std::endl;
if(!error) {
buffer_.commit(bytes_transferred);
uint32_t size = 0;
google::protobuf::io::IstreamInputStream isistream_(&reader_);
{
google::protobuf::io::CodedInputStream input(&isistream_);
if(!input.ReadVarint32(&size)) {
std::cout << "Failed to read size, waiting for more data" << std::endl;
start();
return;
}
}
std::size_t varint_size = isistream_.ByteCount();
std::cout <<"varintsize: " << varint_size << ", size: " << size << ", have bytes: " << buffer_.size() << std::endl;
if(varint_size + size > buffer_.size()) {
std::cout << "Not enough data received, waiting for more" << std::endl;
start();
return;
}
google::protobuf::io::CodedInputStream input(&isistream_);
auto limit = input.PushLimit(size);
msgs::Line line;
assert(line.ParseFromCodedStream(&input));
std::cout << line.text() << std::endl;
assert(input.ConsumedEntireMessage());
input.PopLimit(limit);

start();
} else {
std::cout <<"error during handle_read_message: " << error << std::endl;
}
}

最佳答案

如果您在接收端使用异步 I/O,则需要确保在开始解析之前您确实收到了整个消息。请记住,TCP 连接是一个流。只要有可用数据,异步回调就会运行——即使数据不完整。您可能只收到部分消息,也可能收到整条消息加上下一条消息的一部分。这就是为什么首先需要 readDelimitedFrom() 的原因:确定在解析之前您需要等待多少字节。

因此,在使用异步 I/O 时,您需要以不同的方式编写代码。您可以使用这样的策略:

  • 维护一个缓冲区,其中包含您目前收到的所有字节。
  • 每次收到更多字节时,将它们添加到缓冲区。然后,开始尝试按如下方式解析它们——您必须始终从头开始,使用全新的 ZeroCopyInputStreamCodedInputStream
  • 然后,尝试使用 ReadVarint32() 读取大小。如果 ReadVarint32 失败,那么您还没有收到完整的大小,因此请停止并等待更多字节。
  • 如果 ReadVarint32() 成功,销毁 CodedInputStream,然后在底层 ZeroCopyInputStream 上调用 ByteCount()找出 varint 消耗了多少字节。
  • 您现在知道消息的大小和 varint 前缀的大小。将它们加在一起。如果您的缓冲区中的字节数少于此数量,请停止并等待更多字节。
  • 您现在拥有消息的所有字节。继续将它们从缓冲区中拉出并解析它们。请注意,如果缓冲区中的字节数多于消息的大小,则应将多余的字节保留在缓冲区中,因为它们是下一条消息的一部分。

(此外:您的 sender.cpp 代码中似乎缺少右大括号。如果原始文件有相同的错误,则可能是您在 CodedOutputStream 刷新之前发送数据。但我猜错误不在原文中。)

关于c++ - Protobuf、CodedInputStream 解析部分消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26655733/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com