gpt4 book ai didi

c++ - 尝试非常频繁地发送数据包时,boost::asio 数据包传输失败

转载 作者:太空狗 更新时间:2023-10-29 21:30:05 27 4
gpt4 key购买 nike

最终,我试图将缓冲区从一台机器传输到另一台机器。下面的代码采用 <id><size><data with size bytes> 的流并读取 handleReadHeader 函数中的部分,然后读取 <size>字节数,然后返回并等待另一个 <id><size>一对。我粘贴了很多代码,但我真正怀疑的唯一功能是:
下行链路::addMsgToQueue
下行链路::writeCallback
下行链路::startWrites()下行链路::handleReadHeader
下行链路::handleReadFrameDataBGR

using namespace std;
using namespace boost;
using namespace boost::asio;

Downlink::Downlink() :
socket(nIO),
headerSize(sizeof(unsigned int)+1),
connected(false),
isWriting(false),
readHeaderBuffer(headerSize)
{}

Downlink::~Downlink() {
disconnect();
}

bool Downlink::connect(const std::string &robotHost, unsigned int port) {
disconnect();

ip::tcp::resolver resolver(nIO);
ip::tcp::resolver::query query(robotHost, lexical_cast<string>(port));
ip::tcp::resolver::iterator iterator = resolver.resolve(query);

ip::tcp::resolver::iterator end;
boost::system::error_code ec;
for(;iterator!=end;++iterator) {
socket.connect(*iterator, ec);
if(!ec)
break;
socket.close();
}
if(!socket.is_open())
return false;

async_read(socket, buffer(readHeaderBuffer),
bind(&Downlink::handleReadHeader, this, _1, _2));

//start network thread.
lock_guard<mutex> l(msgMutex);
outgoingMessages = queue<vector<char> >();
nIO.reset();
t = thread(bind(&boost::asio::io_service::run, &nIO));
connected = true;
return true;
}

bool Downlink::isConnected() const {
return connected;
}

void Downlink::disconnect() {
nIO.stop();
t.join();
socket.close();
connected = false;
isWriting = false;
nIO.reset();
nIO.run();
}

void Downlink::writeToLogs(const std::string &logMsg) {
vector<char> newMsg(logMsg.length()+headerSize);
newMsg[0] = MSG_WRITE_LOG;
const unsigned int msgLen(logMsg.length());
memcpy(&newMsg[1], &msgLen, sizeof(unsigned int));
vector<char>::iterator dataBegin = newMsg.begin();
advance(dataBegin, headerSize);
copy(logMsg.begin(), logMsg.end(), dataBegin);
assert(newMsg.size()==(headerSize+logMsg.length()));
addMsgToQueue(newMsg);
}

void Downlink::addMsgToQueue(const std::vector<char> &newMsg) {
lock_guard<mutex> l(msgMutex);
outgoingMessages.push(newMsg);
lock_guard<mutex> l2(outMutex);
if(!isWriting) {
nIO.post(bind(&Downlink::startWrites, this));
}
}

void Downlink::writeCallback(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error) {
disconnect();
lock_guard<mutex> l(msgMutex);
outgoingMessages = queue<vector<char> >();
return;
}
{
lock_guard<mutex> l2(outMutex);
isWriting = false;
}
startWrites();
}


void Downlink::startWrites() {
lock_guard<mutex> l(msgMutex);
lock_guard<mutex> l2(outMutex);
if(outgoingMessages.empty()) {
isWriting = false;
return;
}

if(!isWriting) {
currentOutgoing = outgoingMessages.front();
outgoingMessages.pop();
async_write(socket, buffer(currentOutgoing),
bind(&Downlink::writeCallback, this, _1, _2));
isWriting = true;
}
}

void Downlink::handleReadHeader(const boost::system::error_code& error,
std::size_t bytes_transferred) {
//TODO: how to handle disconnect on errors?
cout<<"handleReadHeader"<<endl;
if(error) {
return;
}
assert(bytes_transferred==headerSize);
if(bytes_transferred!=headerSize) {
cout<<"got "<<bytes_transferred<<" while waiting for a header."<<endl;
}
currentPacketID = readHeaderBuffer[0];

memcpy(&currentPacketLength, &readHeaderBuffer[1], sizeof(unsigned int));
dataStream.resize(currentPacketLength);
switch(currentPacketID) {
case MSG_FRAME_BGR: {
cout<<"- >> gone to read frame. ("<<currentPacketLength<<")"<<endl;
async_read(socket, asio::buffer(dataStream),
boost::asio::transfer_at_least(currentPacketLength),
bind(&Downlink::handleReadFrameDataBGR, this, _1, _2));
} break;
default: {
cout<<"->>> gone to read other. ("<<currentPacketLength<<")"<<endl;
cout<<" "<<(int)currentPacketID<<endl;
async_read(socket, asio::buffer(dataStream),
boost::asio::transfer_at_least(currentPacketLength),
bind(&Downlink::handleReadData, this, _1, _2));
} break;
}
}

void Downlink::handleReadData(const boost::system::error_code& error,
std::size_t bytes_transferred) {
cout<<"handleReadData"<<endl;
if(error) {
return;
}
if(bytes_transferred!=currentPacketLength) {
cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
}
assert(bytes_transferred==currentPacketLength);

switch(currentPacketID) {
case MSG_ASCII: {
string msg(dataStream.begin(), dataStream.end());
textCallback(&msg);
} break;
case MSG_IMU: {
Eigen::Vector3d a,g,m;
unsigned int stamp;
memcpy(a.data(), &dataStream[0], sizeof(double)*3);
memcpy(m.data(), &dataStream[0]+sizeof(double)*3, sizeof(double)*3);
memcpy(g.data(), &dataStream[0]+sizeof(double)*6, sizeof(double)*3);
memcpy(&stamp, &dataStream[0]+sizeof(double)*9, sizeof(unsigned int));
imuCallback(a,m,g,stamp);
} break;
default:
//TODO: handle this better?
cout<<"Unknown packet ID."<<endl;
}

async_read(socket, buffer(readHeaderBuffer),
boost::asio::transfer_at_least(headerSize),
bind(&Downlink::handleReadHeader, this, _1, _2));
}

void Downlink::handleReadFrameDataBGR(const boost::system::error_code& error,
std::size_t bytes_transferred) {
cout<<"Got a frame"<<endl;
if(error) {
return;
}
if(bytes_transferred!=currentPacketLength) {
cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
}
assert(bytes_transferred==currentPacketLength);
unsigned int imageWidth, imageHeight, cameraID;

unsigned char *readOffset = (unsigned char*)&dataStream[0];
memcpy(&imageWidth, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
memcpy(&imageHeight, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
memcpy(&cameraID, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);

cout<<"("<<imageWidth<<"x"<<imageHeight<<") ID = "<<cameraID<<endl;

frameCallback(readOffset, imageWidth, imageHeight, cameraID);

async_read(socket, buffer(readHeaderBuffer),
boost::asio::transfer_at_least(headerSize),
bind(&Downlink::handleReadHeader, this, _1, _2));
}


boost::signals2::connection Downlink::connectTextDataCallback(boost::signals2::signal<void (std::string *)>::slot_type s) {
return textCallback.connect(s);
}

boost::signals2::connection Downlink::connectIMUDataCallback(boost::signals2::signal<void (Eigen::Vector3d, Eigen::Vector3d, Eigen::Vector3d, unsigned int)>::slot_type s) {
return imuCallback.connect(s);
}

boost::signals2::connection Downlink::connectVideoFrameCallback(boost::signals2::signal<void (unsigned char *, unsigned int, unsigned int, unsigned int)>::slot_type s) {
return frameCallback.connect(s);
}

这是另一端的代码。它几乎与其他代码完全相同,但错误可能在两端。

using namespace std;
using namespace boost;
using namespace boost::asio;

Uplink::Uplink(unsigned int port) :
socket(nIO),
acceptor(nIO),
endpoint(ip::tcp::v4(), port),
headerSize(sizeof(unsigned int)+1), //id + data size
headerBuffer(headerSize)
{
//move socket into accept state.
acceptor.open(endpoint.protocol());
acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen(1); //1 means only one client in connect queue.
acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
//start network thread.
nIO.reset();
t = thread(boost::bind(&boost::asio::io_service::run, &nIO));
}

Uplink::~Uplink() {
nIO.stop(); //tell the network thread to stop.
t.join(); //wait for the network thread to stop.
acceptor.close(); //close listen port.
socket.close(); //close active connections.
nIO.reset();
nIO.run(); //let clients know that we're disconnecting.
}

void Uplink::parse_header(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error || bytes_transferred!=headerSize) {
disconnect();
return;
}
currentPacketID = headerBuffer[0];
memcpy(&currentPacketLength, &headerBuffer[1], sizeof(unsigned int));
//move to read data state

//TODO: move to different states to parse various packet types.

async_read(socket, asio::buffer(dataStream), transfer_at_least(currentPacketLength),
bind(&Uplink::parse_data, this, _1, _2));
}

void Uplink::parse_data(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error) {
disconnect();
return;
}

if(bytes_transferred != currentPacketLength) {
cout<<"bytes_transferred != currentPacketLength"<<endl;
disconnect();
return;
}

//move back into the header reading state
async_read(socket, buffer(headerBuffer),
bind(&Uplink::parse_header, this, _1, _2));
}

void Uplink::disconnect() {
acceptor.close();
socket.close();
acceptor.open(endpoint.protocol());
acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen(1); //1 means only one client in connect queue.
acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
}

void Uplink::accept_handler(const boost::system::error_code& error)
{
if (!error) {
//no more clents.
acceptor.close();
//move to read header state.
async_read(socket, buffer(headerBuffer),
bind(&Uplink::parse_header, this, _1, _2));
}
}

void Uplink::sendASCIIMessage(const std::string &m) {
//Format the message
unsigned int msgLength(m.length());
vector<char> outBuffer(msgLength+headerSize);
outBuffer[0] = MSG_ASCII;
memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
vector<char>::iterator dataBegin(outBuffer.begin());
advance(dataBegin, headerSize);
copy(m.begin(), m.end(), dataBegin);
//queue the message
addToQueue(outBuffer);
}

void Uplink::sendIMUDataBlock(const nIMUDataBlock *d) {
//Format the message.
//a,g,m, 3 components each plus a stamp
const unsigned int msgLength(3*3*sizeof(double)+sizeof(unsigned int));
vector<char> outBuffer(msgLength+headerSize);
outBuffer[0] = MSG_IMU;
memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));

const Eigen::Vector3d a(d->getAccel());
const Eigen::Vector3d m(d->getMag());
const Eigen::Vector3d g(d->getGyro());
const unsigned int s(d->getUpdateStamp());

memcpy(&outBuffer[headerSize], a.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+3*sizeof(double)], m.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+6*sizeof(double)], g.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+9*sizeof(double)], &s, sizeof(unsigned int));

/*
cout<<"----------------------------------------"<<endl;
cout<<"Accel = ("<<a[0]<<","<<a[1]<<","<<a[2]<<")"<<endl;
cout<<"Mag = ("<<m[0]<<","<<m[1]<<","<<m[2]<<")"<<endl;
cout<<"Gyro = ("<<g[0]<<","<<g[1]<<","<<g[2]<<")"<<endl;
cout<<"Stamp = "<<s<<endl;
cout<<"----------------------------------------"<<endl;
*/

//queue the message
addToQueue(outBuffer);
}

void Uplink::send_handler(const boost::system::error_code& error,
std::size_t bytes_transferred) {
{
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
if(outQueue.empty()) {
currentlySending = false;
return;
}
}
startSend();
}

void Uplink::addToQueue(const std::vector<char> &out) {
bool needsRestart = false;
{
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
outQueue.push(out);
needsRestart = !currentlySending;
}
if(needsRestart)
nIO.post(bind(&Uplink::startSend, this));
}

void Uplink::startSend() {
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
if(outQueue.empty())
return;
currentlySending = true;
currentWrite = outQueue.front();
outQueue.pop();
async_write(socket, buffer(currentWrite), bind(&Uplink::send_handler,
this, _1, _2));
}

void Uplink::sendVideoFrameBGR(const unsigned int width, const unsigned int height,
const unsigned int cameraID, const unsigned char *frameData) {
// image data image metadata header
const unsigned int packetSize(width*height*3 + sizeof(unsigned int)*3 + headerSize);
const unsigned int dataSize(width*height*3 + sizeof(unsigned int)*3);
vector<char> outgoingBuffer(packetSize);
outgoingBuffer[0] = MSG_FRAME_BGR;
memcpy(&outgoingBuffer[1], &dataSize, sizeof(unsigned int));
char *writePtr = &outgoingBuffer[headerSize];
memcpy(writePtr, &width, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, &height, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, &cameraID, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, frameData, width*height*3*sizeof(char));

//TODO: can we avoid the whole image copy here?
//TODO: should come up with a better packet buffer build system.
//IDEA!: maybe have a "request buffer" funxction so the Uplink
//class can have sole ownership, rather than do the copy in "addtoQueue"
addToQueue(outgoingBuffer);
}

这个程序大部分时间都有效,但很少,当发送大量数据且数据包之间没有延迟时,它会失败。例如:

sendVideoFrameBGR(...);  //occasional fail
sendASCIIMessage("...");

sendVideoFrameBGR(...); //never fails.
sleep(1);
sendASCIIMessage("...");

在下行链路中处理视频帧后,它返回到 hadleHeaderData 并等待长度为几兆字节的数据包和不存在的数据包 ID。流以某种方式被破坏。我不知道为什么。

我不太关心我现在编写的代码,所以如果有人知道一个好的类或库可以为我将 TCP 上的流解析为缓冲区 block ,我宁愿使用它。

编辑:
这是运行数据发送的确切代码:

    if(frontImage) {
uplink.sendVideoFrameBGR(frontImage->width, frontImage->height, 0,
(unsigned char*)frontImage->imageData);
cout<<"Sent"<<endl;
//sleep(1); //works fine if this is uncommented !
}

uplink.sendASCIIMessage("Alive...");
sleep(1);
uplink.sendIMUDataBlock(imuDataBlock.get());
cout<<"Loop"<<endl;
sleep(1);
}

最佳答案

问题很可能是你的 ioservice 对象有多个线程处理工作。

当您在第一个发送函数之后立即调用第二个发送函数时,发布到 ioservice 的两个函数对象可能被委托(delegate)给不同的线程。所以基本上,两个写入并行发生在同一个套接字上。这很可能是非法的。将 Winsock2 与非阻塞套接字一起使用,这会导致传出数据损坏。

即使您使用 bool 来检查它当前是否正在发送,在 ioservice 线程之一处理函数之前也不会检查 bool。如果在您发布这两项工作时有两个 ioservice 线程处于事件状态,它可能会同时分派(dispatch)两个发送,导致两个发送函数在不同的线程上异步发生。 “当前正在发送”检查可能在两个调用中都返回 false,因为两个发送是并行运行的。

关于c++ - 尝试非常频繁地发送数据包时,boost::asio 数据包传输失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3444551/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com