- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
最终,我试图将缓冲区从一台机器传输到另一台机器。下面的代码采用 <id><size><data with size bytes>
的流并读取 handleReadHeader 函数中的部分,然后读取 <size>
字节数,然后返回并等待另一个 <id><size>
一对。我粘贴了很多代码,但我真正怀疑的唯一功能是:
下行链路::addMsgToQueue
下行链路::writeCallback
下行链路::startWrites()下行链路::handleReadHeader
下行链路::handleReadFrameDataBGR
using namespace std;
using namespace boost;
using namespace boost::asio;
Downlink::Downlink() :
socket(nIO),
headerSize(sizeof(unsigned int)+1),
connected(false),
isWriting(false),
readHeaderBuffer(headerSize)
{}
Downlink::~Downlink() {
disconnect();
}
bool Downlink::connect(const std::string &robotHost, unsigned int port) {
disconnect();
ip::tcp::resolver resolver(nIO);
ip::tcp::resolver::query query(robotHost, lexical_cast<string>(port));
ip::tcp::resolver::iterator iterator = resolver.resolve(query);
ip::tcp::resolver::iterator end;
boost::system::error_code ec;
for(;iterator!=end;++iterator) {
socket.connect(*iterator, ec);
if(!ec)
break;
socket.close();
}
if(!socket.is_open())
return false;
async_read(socket, buffer(readHeaderBuffer),
bind(&Downlink::handleReadHeader, this, _1, _2));
//start network thread.
lock_guard<mutex> l(msgMutex);
outgoingMessages = queue<vector<char> >();
nIO.reset();
t = thread(bind(&boost::asio::io_service::run, &nIO));
connected = true;
return true;
}
bool Downlink::isConnected() const {
return connected;
}
void Downlink::disconnect() {
nIO.stop();
t.join();
socket.close();
connected = false;
isWriting = false;
nIO.reset();
nIO.run();
}
void Downlink::writeToLogs(const std::string &logMsg) {
vector<char> newMsg(logMsg.length()+headerSize);
newMsg[0] = MSG_WRITE_LOG;
const unsigned int msgLen(logMsg.length());
memcpy(&newMsg[1], &msgLen, sizeof(unsigned int));
vector<char>::iterator dataBegin = newMsg.begin();
advance(dataBegin, headerSize);
copy(logMsg.begin(), logMsg.end(), dataBegin);
assert(newMsg.size()==(headerSize+logMsg.length()));
addMsgToQueue(newMsg);
}
void Downlink::addMsgToQueue(const std::vector<char> &newMsg) {
lock_guard<mutex> l(msgMutex);
outgoingMessages.push(newMsg);
lock_guard<mutex> l2(outMutex);
if(!isWriting) {
nIO.post(bind(&Downlink::startWrites, this));
}
}
void Downlink::writeCallback(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error) {
disconnect();
lock_guard<mutex> l(msgMutex);
outgoingMessages = queue<vector<char> >();
return;
}
{
lock_guard<mutex> l2(outMutex);
isWriting = false;
}
startWrites();
}
void Downlink::startWrites() {
lock_guard<mutex> l(msgMutex);
lock_guard<mutex> l2(outMutex);
if(outgoingMessages.empty()) {
isWriting = false;
return;
}
if(!isWriting) {
currentOutgoing = outgoingMessages.front();
outgoingMessages.pop();
async_write(socket, buffer(currentOutgoing),
bind(&Downlink::writeCallback, this, _1, _2));
isWriting = true;
}
}
void Downlink::handleReadHeader(const boost::system::error_code& error,
std::size_t bytes_transferred) {
//TODO: how to handle disconnect on errors?
cout<<"handleReadHeader"<<endl;
if(error) {
return;
}
assert(bytes_transferred==headerSize);
if(bytes_transferred!=headerSize) {
cout<<"got "<<bytes_transferred<<" while waiting for a header."<<endl;
}
currentPacketID = readHeaderBuffer[0];
memcpy(¤tPacketLength, &readHeaderBuffer[1], sizeof(unsigned int));
dataStream.resize(currentPacketLength);
switch(currentPacketID) {
case MSG_FRAME_BGR: {
cout<<"- >> gone to read frame. ("<<currentPacketLength<<")"<<endl;
async_read(socket, asio::buffer(dataStream),
boost::asio::transfer_at_least(currentPacketLength),
bind(&Downlink::handleReadFrameDataBGR, this, _1, _2));
} break;
default: {
cout<<"->>> gone to read other. ("<<currentPacketLength<<")"<<endl;
cout<<" "<<(int)currentPacketID<<endl;
async_read(socket, asio::buffer(dataStream),
boost::asio::transfer_at_least(currentPacketLength),
bind(&Downlink::handleReadData, this, _1, _2));
} break;
}
}
void Downlink::handleReadData(const boost::system::error_code& error,
std::size_t bytes_transferred) {
cout<<"handleReadData"<<endl;
if(error) {
return;
}
if(bytes_transferred!=currentPacketLength) {
cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
}
assert(bytes_transferred==currentPacketLength);
switch(currentPacketID) {
case MSG_ASCII: {
string msg(dataStream.begin(), dataStream.end());
textCallback(&msg);
} break;
case MSG_IMU: {
Eigen::Vector3d a,g,m;
unsigned int stamp;
memcpy(a.data(), &dataStream[0], sizeof(double)*3);
memcpy(m.data(), &dataStream[0]+sizeof(double)*3, sizeof(double)*3);
memcpy(g.data(), &dataStream[0]+sizeof(double)*6, sizeof(double)*3);
memcpy(&stamp, &dataStream[0]+sizeof(double)*9, sizeof(unsigned int));
imuCallback(a,m,g,stamp);
} break;
default:
//TODO: handle this better?
cout<<"Unknown packet ID."<<endl;
}
async_read(socket, buffer(readHeaderBuffer),
boost::asio::transfer_at_least(headerSize),
bind(&Downlink::handleReadHeader, this, _1, _2));
}
void Downlink::handleReadFrameDataBGR(const boost::system::error_code& error,
std::size_t bytes_transferred) {
cout<<"Got a frame"<<endl;
if(error) {
return;
}
if(bytes_transferred!=currentPacketLength) {
cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
}
assert(bytes_transferred==currentPacketLength);
unsigned int imageWidth, imageHeight, cameraID;
unsigned char *readOffset = (unsigned char*)&dataStream[0];
memcpy(&imageWidth, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
memcpy(&imageHeight, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
memcpy(&cameraID, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
cout<<"("<<imageWidth<<"x"<<imageHeight<<") ID = "<<cameraID<<endl;
frameCallback(readOffset, imageWidth, imageHeight, cameraID);
async_read(socket, buffer(readHeaderBuffer),
boost::asio::transfer_at_least(headerSize),
bind(&Downlink::handleReadHeader, this, _1, _2));
}
boost::signals2::connection Downlink::connectTextDataCallback(boost::signals2::signal<void (std::string *)>::slot_type s) {
return textCallback.connect(s);
}
boost::signals2::connection Downlink::connectIMUDataCallback(boost::signals2::signal<void (Eigen::Vector3d, Eigen::Vector3d, Eigen::Vector3d, unsigned int)>::slot_type s) {
return imuCallback.connect(s);
}
boost::signals2::connection Downlink::connectVideoFrameCallback(boost::signals2::signal<void (unsigned char *, unsigned int, unsigned int, unsigned int)>::slot_type s) {
return frameCallback.connect(s);
}
这是另一端的代码。它几乎与其他代码完全相同,但错误可能在两端。
using namespace std;
using namespace boost;
using namespace boost::asio;
Uplink::Uplink(unsigned int port) :
socket(nIO),
acceptor(nIO),
endpoint(ip::tcp::v4(), port),
headerSize(sizeof(unsigned int)+1), //id + data size
headerBuffer(headerSize)
{
//move socket into accept state.
acceptor.open(endpoint.protocol());
acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen(1); //1 means only one client in connect queue.
acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
//start network thread.
nIO.reset();
t = thread(boost::bind(&boost::asio::io_service::run, &nIO));
}
Uplink::~Uplink() {
nIO.stop(); //tell the network thread to stop.
t.join(); //wait for the network thread to stop.
acceptor.close(); //close listen port.
socket.close(); //close active connections.
nIO.reset();
nIO.run(); //let clients know that we're disconnecting.
}
void Uplink::parse_header(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error || bytes_transferred!=headerSize) {
disconnect();
return;
}
currentPacketID = headerBuffer[0];
memcpy(¤tPacketLength, &headerBuffer[1], sizeof(unsigned int));
//move to read data state
//TODO: move to different states to parse various packet types.
async_read(socket, asio::buffer(dataStream), transfer_at_least(currentPacketLength),
bind(&Uplink::parse_data, this, _1, _2));
}
void Uplink::parse_data(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error) {
disconnect();
return;
}
if(bytes_transferred != currentPacketLength) {
cout<<"bytes_transferred != currentPacketLength"<<endl;
disconnect();
return;
}
//move back into the header reading state
async_read(socket, buffer(headerBuffer),
bind(&Uplink::parse_header, this, _1, _2));
}
void Uplink::disconnect() {
acceptor.close();
socket.close();
acceptor.open(endpoint.protocol());
acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen(1); //1 means only one client in connect queue.
acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
}
void Uplink::accept_handler(const boost::system::error_code& error)
{
if (!error) {
//no more clents.
acceptor.close();
//move to read header state.
async_read(socket, buffer(headerBuffer),
bind(&Uplink::parse_header, this, _1, _2));
}
}
void Uplink::sendASCIIMessage(const std::string &m) {
//Format the message
unsigned int msgLength(m.length());
vector<char> outBuffer(msgLength+headerSize);
outBuffer[0] = MSG_ASCII;
memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
vector<char>::iterator dataBegin(outBuffer.begin());
advance(dataBegin, headerSize);
copy(m.begin(), m.end(), dataBegin);
//queue the message
addToQueue(outBuffer);
}
void Uplink::sendIMUDataBlock(const nIMUDataBlock *d) {
//Format the message.
//a,g,m, 3 components each plus a stamp
const unsigned int msgLength(3*3*sizeof(double)+sizeof(unsigned int));
vector<char> outBuffer(msgLength+headerSize);
outBuffer[0] = MSG_IMU;
memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
const Eigen::Vector3d a(d->getAccel());
const Eigen::Vector3d m(d->getMag());
const Eigen::Vector3d g(d->getGyro());
const unsigned int s(d->getUpdateStamp());
memcpy(&outBuffer[headerSize], a.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+3*sizeof(double)], m.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+6*sizeof(double)], g.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+9*sizeof(double)], &s, sizeof(unsigned int));
/*
cout<<"----------------------------------------"<<endl;
cout<<"Accel = ("<<a[0]<<","<<a[1]<<","<<a[2]<<")"<<endl;
cout<<"Mag = ("<<m[0]<<","<<m[1]<<","<<m[2]<<")"<<endl;
cout<<"Gyro = ("<<g[0]<<","<<g[1]<<","<<g[2]<<")"<<endl;
cout<<"Stamp = "<<s<<endl;
cout<<"----------------------------------------"<<endl;
*/
//queue the message
addToQueue(outBuffer);
}
void Uplink::send_handler(const boost::system::error_code& error,
std::size_t bytes_transferred) {
{
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
if(outQueue.empty()) {
currentlySending = false;
return;
}
}
startSend();
}
void Uplink::addToQueue(const std::vector<char> &out) {
bool needsRestart = false;
{
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
outQueue.push(out);
needsRestart = !currentlySending;
}
if(needsRestart)
nIO.post(bind(&Uplink::startSend, this));
}
void Uplink::startSend() {
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
if(outQueue.empty())
return;
currentlySending = true;
currentWrite = outQueue.front();
outQueue.pop();
async_write(socket, buffer(currentWrite), bind(&Uplink::send_handler,
this, _1, _2));
}
void Uplink::sendVideoFrameBGR(const unsigned int width, const unsigned int height,
const unsigned int cameraID, const unsigned char *frameData) {
// image data image metadata header
const unsigned int packetSize(width*height*3 + sizeof(unsigned int)*3 + headerSize);
const unsigned int dataSize(width*height*3 + sizeof(unsigned int)*3);
vector<char> outgoingBuffer(packetSize);
outgoingBuffer[0] = MSG_FRAME_BGR;
memcpy(&outgoingBuffer[1], &dataSize, sizeof(unsigned int));
char *writePtr = &outgoingBuffer[headerSize];
memcpy(writePtr, &width, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, &height, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, &cameraID, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, frameData, width*height*3*sizeof(char));
//TODO: can we avoid the whole image copy here?
//TODO: should come up with a better packet buffer build system.
//IDEA!: maybe have a "request buffer" funxction so the Uplink
//class can have sole ownership, rather than do the copy in "addtoQueue"
addToQueue(outgoingBuffer);
}
这个程序大部分时间都有效,但很少,当发送大量数据且数据包之间没有延迟时,它会失败。例如:
sendVideoFrameBGR(...); //occasional fail
sendASCIIMessage("...");
sendVideoFrameBGR(...); //never fails.
sleep(1);
sendASCIIMessage("...");
在下行链路中处理视频帧后,它返回到 hadleHeaderData 并等待长度为几兆字节的数据包和不存在的数据包 ID。流以某种方式被破坏。我不知道为什么。
我不太关心我现在编写的代码,所以如果有人知道一个好的类或库可以为我将 TCP 上的流解析为缓冲区 block ,我宁愿使用它。
编辑:
这是运行数据发送的确切代码:
if(frontImage) {
uplink.sendVideoFrameBGR(frontImage->width, frontImage->height, 0,
(unsigned char*)frontImage->imageData);
cout<<"Sent"<<endl;
//sleep(1); //works fine if this is uncommented !
}
uplink.sendASCIIMessage("Alive...");
sleep(1);
uplink.sendIMUDataBlock(imuDataBlock.get());
cout<<"Loop"<<endl;
sleep(1);
}
最佳答案
问题很可能是你的 ioservice 对象有多个线程处理工作。
当您在第一个发送函数之后立即调用第二个发送函数时,发布到 ioservice 的两个函数对象可能被委托(delegate)给不同的线程。所以基本上,两个写入并行发生在同一个套接字上。这很可能是非法的。将 Winsock2 与非阻塞套接字一起使用,这会导致传出数据损坏。
即使您使用 bool 来检查它当前是否正在发送,在 ioservice 线程之一处理函数之前也不会检查 bool。如果在您发布这两项工作时有两个 ioservice 线程处于事件状态,它可能会同时分派(dispatch)两个发送,导致两个发送函数在不同的线程上异步发生。 “当前正在发送”检查可能在两个调用中都返回 false,因为两个发送是并行运行的。
关于c++ - 尝试非常频繁地发送数据包时,boost::asio 数据包传输失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3444551/
说真的,你怎么能在不发疯的情况下处理所有这些异常呢?我是不是读了太多关于异常处理的文章或什么?我尝试重构了几次,但每次似乎都以更糟糕的结果告终。也许我应该承认确实会发生异常(exception)情况,
背景 两者 try/rescue和 try/catch是 Elixir 中的错误处理技术。根据 corresponding chapter在介绍指南中。 Errors can be rescued u
每当我尝试在 Raspberry PI 上运行此 python 脚本时,我都会遇到问题: import socket import sys # Create a TCP/IP socket sock
我想知道一些关于 PHP 的 try , catch声明。 让我们考虑以下示例。 abstract class ExceptionA extends Exception {} class Except
我的 laravel v5.4 项目中有两个模型,user 和 admin。 在 config/auth.php 中,我向守卫和提供者添加了管理员,如下所示: 'guards' => [ 'w
try: r = requests.get(url, params={'s': thing}) except requests.ConnectionError, e: print e
我有以下代码。 但是,它并不能捕获所有错误,而我仍然会收到“throw er;//未处理的'错误'事件”。 为什么是这样? app.post('/api/properties/zip/:zip/bed
问题与细节 我正在使用自定义错误处理,遇到的错误之一是“路径中的非法字符”。我有一个自定义函数,旨在通过路径字符串查找此类非法字符,并在找到它们时引发自定义错误。但是我发现,取决于非法字符,Test-
This question already has answers here: How do I catch a numpy warning like it's an exception (not j
我正在使用其他人的代码,但我不熟悉try/catch,因此我举了一个类似的小例子。在第11行上,如果我写了error(''),似乎没有发现错误并增加了索引j。但是,编写error(' ')或error
我在我的一个程序中遇到了这个问题,在这种情况下,尝试/异常(exception)的错误使程序变得更好,以防用户意外输入了他们不应该输入的内容。它仍然给我错误,我为为什么感到困惑。如果对我的问题确实很重
我在尝试TRY ... CATCH块时遇到问题。有人可以解释为什么以下代码无法执行我的sp吗? DECLARE @Result int SET @Result = 0 BEGIN TRY SE
我有一个相当大的 powershell 脚本,其中包含许多(20 多个)执行各种操作的函数。 现在所有代码实际上都没有任何错误处理或重试功能。如果某个特定的任务/功能失败,它就会失败并继续。 我想改进
为什么我尝试时需要导入 inputmismatchException catch(InputMismatchException e){ System.out.println("
我对此感到困惑 - 我为辅助方法编写了一个 try/catch 。它的目的是捕获任何无效输入(任何不是“男性”或“女性”的内容(没有特定情况)。如果输入无效,它将通知用户,然后让他们重试。如果有效,则
我有时会发现自己处于如下场景。尽可能简单地陈述问题 “有时我会创建一段代码,Java 让我将其包含在 try/catch 语句中。我没有使用 catch,所以我将其留空。为什么这是错误的?” boo
我有点困惑为什么当我不使用 Try block 时会出现 Try block 错误。 我在代码块底部附近收到错误通知。如果我不使用 try/catch,有人可以向我解释为什么会发生这种情况吗? 它是否
我已经盯着我的电脑两个小时了,我不知道我做错了什么。谁能帮助我看到光明? package blackjack; import java.util.Random; import java.util.Sc
我想将方法保存在 Enum 中,但 Class.getDeclaredMethod 抛出 NoSuchMethodException,那么我该如何处理呢?我的代码: public enum Car
这个问题已经有答案了: Executing multi-line statements in the one-line command-line (18 个回答) 已关闭 3 年前。 如何使用try.
我是一名优秀的程序员,十分优秀!