- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
最终,我试图将缓冲区从一台机器传输到另一台机器。下面的代码采用 <id><size><data with size bytes>
的流并读取 handleReadHeader 函数中的部分,然后读取 <size>
字节数,然后返回并等待另一个 <id><size>
一对。我粘贴了很多代码,但我真正怀疑的唯一功能是:
下行链路::addMsgToQueue
下行链路::writeCallback
下行链路::startWrites()下行链路::handleReadHeader
下行链路::handleReadFrameDataBGR
using namespace std;
using namespace boost;
using namespace boost::asio;
Downlink::Downlink() :
socket(nIO),
headerSize(sizeof(unsigned int)+1),
connected(false),
isWriting(false),
readHeaderBuffer(headerSize)
{}
Downlink::~Downlink() {
disconnect();
}
bool Downlink::connect(const std::string &robotHost, unsigned int port) {
disconnect();
ip::tcp::resolver resolver(nIO);
ip::tcp::resolver::query query(robotHost, lexical_cast<string>(port));
ip::tcp::resolver::iterator iterator = resolver.resolve(query);
ip::tcp::resolver::iterator end;
boost::system::error_code ec;
for(;iterator!=end;++iterator) {
socket.connect(*iterator, ec);
if(!ec)
break;
socket.close();
}
if(!socket.is_open())
return false;
async_read(socket, buffer(readHeaderBuffer),
bind(&Downlink::handleReadHeader, this, _1, _2));
//start network thread.
lock_guard<mutex> l(msgMutex);
outgoingMessages = queue<vector<char> >();
nIO.reset();
t = thread(bind(&boost::asio::io_service::run, &nIO));
connected = true;
return true;
}
bool Downlink::isConnected() const {
return connected;
}
void Downlink::disconnect() {
nIO.stop();
t.join();
socket.close();
connected = false;
isWriting = false;
nIO.reset();
nIO.run();
}
void Downlink::writeToLogs(const std::string &logMsg) {
vector<char> newMsg(logMsg.length()+headerSize);
newMsg[0] = MSG_WRITE_LOG;
const unsigned int msgLen(logMsg.length());
memcpy(&newMsg[1], &msgLen, sizeof(unsigned int));
vector<char>::iterator dataBegin = newMsg.begin();
advance(dataBegin, headerSize);
copy(logMsg.begin(), logMsg.end(), dataBegin);
assert(newMsg.size()==(headerSize+logMsg.length()));
addMsgToQueue(newMsg);
}
void Downlink::addMsgToQueue(const std::vector<char> &newMsg) {
lock_guard<mutex> l(msgMutex);
outgoingMessages.push(newMsg);
lock_guard<mutex> l2(outMutex);
if(!isWriting) {
nIO.post(bind(&Downlink::startWrites, this));
}
}
void Downlink::writeCallback(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error) {
disconnect();
lock_guard<mutex> l(msgMutex);
outgoingMessages = queue<vector<char> >();
return;
}
{
lock_guard<mutex> l2(outMutex);
isWriting = false;
}
startWrites();
}
void Downlink::startWrites() {
lock_guard<mutex> l(msgMutex);
lock_guard<mutex> l2(outMutex);
if(outgoingMessages.empty()) {
isWriting = false;
return;
}
if(!isWriting) {
currentOutgoing = outgoingMessages.front();
outgoingMessages.pop();
async_write(socket, buffer(currentOutgoing),
bind(&Downlink::writeCallback, this, _1, _2));
isWriting = true;
}
}
void Downlink::handleReadHeader(const boost::system::error_code& error,
std::size_t bytes_transferred) {
//TODO: how to handle disconnect on errors?
cout<<"handleReadHeader"<<endl;
if(error) {
return;
}
assert(bytes_transferred==headerSize);
if(bytes_transferred!=headerSize) {
cout<<"got "<<bytes_transferred<<" while waiting for a header."<<endl;
}
currentPacketID = readHeaderBuffer[0];
memcpy(¤tPacketLength, &readHeaderBuffer[1], sizeof(unsigned int));
dataStream.resize(currentPacketLength);
switch(currentPacketID) {
case MSG_FRAME_BGR: {
cout<<"- >> gone to read frame. ("<<currentPacketLength<<")"<<endl;
async_read(socket, asio::buffer(dataStream),
boost::asio::transfer_at_least(currentPacketLength),
bind(&Downlink::handleReadFrameDataBGR, this, _1, _2));
} break;
default: {
cout<<"->>> gone to read other. ("<<currentPacketLength<<")"<<endl;
cout<<" "<<(int)currentPacketID<<endl;
async_read(socket, asio::buffer(dataStream),
boost::asio::transfer_at_least(currentPacketLength),
bind(&Downlink::handleReadData, this, _1, _2));
} break;
}
}
void Downlink::handleReadData(const boost::system::error_code& error,
std::size_t bytes_transferred) {
cout<<"handleReadData"<<endl;
if(error) {
return;
}
if(bytes_transferred!=currentPacketLength) {
cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
}
assert(bytes_transferred==currentPacketLength);
switch(currentPacketID) {
case MSG_ASCII: {
string msg(dataStream.begin(), dataStream.end());
textCallback(&msg);
} break;
case MSG_IMU: {
Eigen::Vector3d a,g,m;
unsigned int stamp;
memcpy(a.data(), &dataStream[0], sizeof(double)*3);
memcpy(m.data(), &dataStream[0]+sizeof(double)*3, sizeof(double)*3);
memcpy(g.data(), &dataStream[0]+sizeof(double)*6, sizeof(double)*3);
memcpy(&stamp, &dataStream[0]+sizeof(double)*9, sizeof(unsigned int));
imuCallback(a,m,g,stamp);
} break;
default:
//TODO: handle this better?
cout<<"Unknown packet ID."<<endl;
}
async_read(socket, buffer(readHeaderBuffer),
boost::asio::transfer_at_least(headerSize),
bind(&Downlink::handleReadHeader, this, _1, _2));
}
void Downlink::handleReadFrameDataBGR(const boost::system::error_code& error,
std::size_t bytes_transferred) {
cout<<"Got a frame"<<endl;
if(error) {
return;
}
if(bytes_transferred!=currentPacketLength) {
cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
}
assert(bytes_transferred==currentPacketLength);
unsigned int imageWidth, imageHeight, cameraID;
unsigned char *readOffset = (unsigned char*)&dataStream[0];
memcpy(&imageWidth, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
memcpy(&imageHeight, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
memcpy(&cameraID, readOffset, sizeof(unsigned int));
readOffset += sizeof(unsigned int);
cout<<"("<<imageWidth<<"x"<<imageHeight<<") ID = "<<cameraID<<endl;
frameCallback(readOffset, imageWidth, imageHeight, cameraID);
async_read(socket, buffer(readHeaderBuffer),
boost::asio::transfer_at_least(headerSize),
bind(&Downlink::handleReadHeader, this, _1, _2));
}
boost::signals2::connection Downlink::connectTextDataCallback(boost::signals2::signal<void (std::string *)>::slot_type s) {
return textCallback.connect(s);
}
boost::signals2::connection Downlink::connectIMUDataCallback(boost::signals2::signal<void (Eigen::Vector3d, Eigen::Vector3d, Eigen::Vector3d, unsigned int)>::slot_type s) {
return imuCallback.connect(s);
}
boost::signals2::connection Downlink::connectVideoFrameCallback(boost::signals2::signal<void (unsigned char *, unsigned int, unsigned int, unsigned int)>::slot_type s) {
return frameCallback.connect(s);
}
这是另一端的代码。它几乎与其他代码完全相同,但错误可能在两端。
using namespace std;
using namespace boost;
using namespace boost::asio;
Uplink::Uplink(unsigned int port) :
socket(nIO),
acceptor(nIO),
endpoint(ip::tcp::v4(), port),
headerSize(sizeof(unsigned int)+1), //id + data size
headerBuffer(headerSize)
{
//move socket into accept state.
acceptor.open(endpoint.protocol());
acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen(1); //1 means only one client in connect queue.
acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
//start network thread.
nIO.reset();
t = thread(boost::bind(&boost::asio::io_service::run, &nIO));
}
Uplink::~Uplink() {
nIO.stop(); //tell the network thread to stop.
t.join(); //wait for the network thread to stop.
acceptor.close(); //close listen port.
socket.close(); //close active connections.
nIO.reset();
nIO.run(); //let clients know that we're disconnecting.
}
void Uplink::parse_header(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error || bytes_transferred!=headerSize) {
disconnect();
return;
}
currentPacketID = headerBuffer[0];
memcpy(¤tPacketLength, &headerBuffer[1], sizeof(unsigned int));
//move to read data state
//TODO: move to different states to parse various packet types.
async_read(socket, asio::buffer(dataStream), transfer_at_least(currentPacketLength),
bind(&Uplink::parse_data, this, _1, _2));
}
void Uplink::parse_data(const boost::system::error_code& error,
std::size_t bytes_transferred) {
if(error) {
disconnect();
return;
}
if(bytes_transferred != currentPacketLength) {
cout<<"bytes_transferred != currentPacketLength"<<endl;
disconnect();
return;
}
//move back into the header reading state
async_read(socket, buffer(headerBuffer),
bind(&Uplink::parse_header, this, _1, _2));
}
void Uplink::disconnect() {
acceptor.close();
socket.close();
acceptor.open(endpoint.protocol());
acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen(1); //1 means only one client in connect queue.
acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
}
void Uplink::accept_handler(const boost::system::error_code& error)
{
if (!error) {
//no more clents.
acceptor.close();
//move to read header state.
async_read(socket, buffer(headerBuffer),
bind(&Uplink::parse_header, this, _1, _2));
}
}
void Uplink::sendASCIIMessage(const std::string &m) {
//Format the message
unsigned int msgLength(m.length());
vector<char> outBuffer(msgLength+headerSize);
outBuffer[0] = MSG_ASCII;
memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
vector<char>::iterator dataBegin(outBuffer.begin());
advance(dataBegin, headerSize);
copy(m.begin(), m.end(), dataBegin);
//queue the message
addToQueue(outBuffer);
}
void Uplink::sendIMUDataBlock(const nIMUDataBlock *d) {
//Format the message.
//a,g,m, 3 components each plus a stamp
const unsigned int msgLength(3*3*sizeof(double)+sizeof(unsigned int));
vector<char> outBuffer(msgLength+headerSize);
outBuffer[0] = MSG_IMU;
memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
const Eigen::Vector3d a(d->getAccel());
const Eigen::Vector3d m(d->getMag());
const Eigen::Vector3d g(d->getGyro());
const unsigned int s(d->getUpdateStamp());
memcpy(&outBuffer[headerSize], a.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+3*sizeof(double)], m.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+6*sizeof(double)], g.data(), sizeof(double)*3);
memcpy(&outBuffer[headerSize+9*sizeof(double)], &s, sizeof(unsigned int));
/*
cout<<"----------------------------------------"<<endl;
cout<<"Accel = ("<<a[0]<<","<<a[1]<<","<<a[2]<<")"<<endl;
cout<<"Mag = ("<<m[0]<<","<<m[1]<<","<<m[2]<<")"<<endl;
cout<<"Gyro = ("<<g[0]<<","<<g[1]<<","<<g[2]<<")"<<endl;
cout<<"Stamp = "<<s<<endl;
cout<<"----------------------------------------"<<endl;
*/
//queue the message
addToQueue(outBuffer);
}
void Uplink::send_handler(const boost::system::error_code& error,
std::size_t bytes_transferred) {
{
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
if(outQueue.empty()) {
currentlySending = false;
return;
}
}
startSend();
}
void Uplink::addToQueue(const std::vector<char> &out) {
bool needsRestart = false;
{
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
outQueue.push(out);
needsRestart = !currentlySending;
}
if(needsRestart)
nIO.post(bind(&Uplink::startSend, this));
}
void Uplink::startSend() {
lock_guard<mutex> l(queueLock);
lock_guard<mutex> l2(sendingLock);
if(outQueue.empty())
return;
currentlySending = true;
currentWrite = outQueue.front();
outQueue.pop();
async_write(socket, buffer(currentWrite), bind(&Uplink::send_handler,
this, _1, _2));
}
void Uplink::sendVideoFrameBGR(const unsigned int width, const unsigned int height,
const unsigned int cameraID, const unsigned char *frameData) {
// image data image metadata header
const unsigned int packetSize(width*height*3 + sizeof(unsigned int)*3 + headerSize);
const unsigned int dataSize(width*height*3 + sizeof(unsigned int)*3);
vector<char> outgoingBuffer(packetSize);
outgoingBuffer[0] = MSG_FRAME_BGR;
memcpy(&outgoingBuffer[1], &dataSize, sizeof(unsigned int));
char *writePtr = &outgoingBuffer[headerSize];
memcpy(writePtr, &width, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, &height, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, &cameraID, sizeof(unsigned int));
writePtr += sizeof(unsigned int);
memcpy(writePtr, frameData, width*height*3*sizeof(char));
//TODO: can we avoid the whole image copy here?
//TODO: should come up with a better packet buffer build system.
//IDEA!: maybe have a "request buffer" funxction so the Uplink
//class can have sole ownership, rather than do the copy in "addtoQueue"
addToQueue(outgoingBuffer);
}
这个程序大部分时间都有效,但很少,当发送大量数据且数据包之间没有延迟时,它会失败。例如:
sendVideoFrameBGR(...); //occasional fail
sendASCIIMessage("...");
sendVideoFrameBGR(...); //never fails.
sleep(1);
sendASCIIMessage("...");
在下行链路中处理视频帧后,它返回到 hadleHeaderData 并等待长度为几兆字节的数据包和不存在的数据包 ID。流以某种方式被破坏。我不知道为什么。
我不太关心我现在编写的代码,所以如果有人知道一个好的类或库可以为我将 TCP 上的流解析为缓冲区 block ,我宁愿使用它。
编辑:
这是运行数据发送的确切代码:
if(frontImage) {
uplink.sendVideoFrameBGR(frontImage->width, frontImage->height, 0,
(unsigned char*)frontImage->imageData);
cout<<"Sent"<<endl;
//sleep(1); //works fine if this is uncommented !
}
uplink.sendASCIIMessage("Alive...");
sleep(1);
uplink.sendIMUDataBlock(imuDataBlock.get());
cout<<"Loop"<<endl;
sleep(1);
}
最佳答案
问题很可能是你的 ioservice 对象有多个线程处理工作。
当您在第一个发送函数之后立即调用第二个发送函数时,发布到 ioservice 的两个函数对象可能被委托(delegate)给不同的线程。所以基本上,两个写入并行发生在同一个套接字上。这很可能是非法的。将 Winsock2 与非阻塞套接字一起使用,这会导致传出数据损坏。
即使您使用 bool 来检查它当前是否正在发送,在 ioservice 线程之一处理函数之前也不会检查 bool。如果在您发布这两项工作时有两个 ioservice 线程处于事件状态,它可能会同时分派(dispatch)两个发送,导致两个发送函数在不同的线程上异步发生。 “当前正在发送”检查可能在两个调用中都返回 false,因为两个发送是并行运行的。
关于c++ - 尝试非常频繁地发送数据包时,boost::asio 数据包传输失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3444551/
我正在尝试使用boost.spirit的qi库解析某些内容,而我遇到了一个问题。根据spirit docs,a >> b应该产生类型为tuple的东西。但这是boost::tuple(又名 fusio
似乎有/正在努力做到这一点,但到目前为止我看到的大多数资源要么已经过时(带有死链接),要么几乎没有信息来实际构建一个小的工作样本(例如,依赖于boost program_options 以构建可执行文
我对 Boost.Log 的状态有点困惑。这是 Boost 的官方部分,还是尚未被接受?当我用谷歌搜索时,我看到一些帖子谈论它在 2010 年是如何被接受的,等等,但是当我查看最后一个 Boost 库
Boost 提供了两种不同的实现 string_view ,这将成为 C++17 的一部分: boost::string_ref在 utility/string_ref.hpp boost::stri
最近,我被一家GIS公司雇用来重写他们的旧地理信息库。所以我目前正在寻找一个好的计算几何库。我看过CGAL,这真是了不起,但是我的老板想要免费的东西。 所以我现在正在检查Boost.Geometry。
假设我有一个无向图 G。假设我添加以下内容 add_edge(1,2,G); add_edge(1,3,G); add_edge(0,2,G); 现在我再说一遍: add_edge(0,2,G); 我
我使用 CMake 来查找 Boost。找到了 Boost,但 CMake 出错了 Imported targets not available for Boost version 请参阅下面的完整错
我是 boost::fusion 和 boost::mpl 库的新手。谁能告诉我这两个库之间的主要区别? 到目前为止,我只使用 fusion::vector 和其他一些简单的东西。现在我想使用 fus
这个问题已经有答案了: 已关闭10 年前。 Possible Duplicate: What are the benefits of using Boost.Phoenix? 所以我开始阅读 boos
我正在尝试获得一个使用 Boost.Timer 的简单示例,用于一些秒表性能测量,但我不明白为什么我无法成功地将 Boost.Timer 链接到 Boost.Chrono。我使用以下简单脚本从源代码构
我有这样的东西: enum EFood{ eMeat, eFruit }; class Food{ }; class Meat: public Food{ void someM
有人可以告诉我,我如何获得boost::Variant处理无序地图? typedef boost::variant lut_value;unordered_map table; 我认为有一个用于boo
我对 Boost.Geometry 中的环和多边形感到困惑。 在文档中,没有图形显示什么是环,什么是多边形。 谁能画图解释两个概念的区别? 最佳答案 在 Boost.Geometry 中,多边形被定义
我正在使用 boost.pool,但我不知道何时使用 boost::pool<>::malloc和 boost::pool<>::ordered_malloc ? 所以, boost::pool<>:
我正在尝试通过 *boost::fast_pool_allocator* 使用 *boost::container::flat_set*。但是,我收到编译错误。非常感谢您的意见和建议。为了突出这个问题
sau_timer::sau_timer(int secs, timerparam f) : strnd(io), t(io, boost::posix_time::seconds(secs)
我无法理解此功能的文档,我已多次看到以下内容 tie (ei,ei_end) = out_edges(*(vi+a),g); **g**::out_edge_iterator ei, ei_end;
我想在 C++ 中序列化分层数据结构。我正在处理的项目使用 boost,所以我使用 boost::property_tree::ptree 作为我的数据节点结构。 我们有像 Person 这样的高级结
我需要一些帮助来解决这个异常,我正在实现一个 NPAPI 插件,以便能够使用来自浏览器扩展的本地套接字,为此我正在使用 Firebreath 框架。 对于套接字和连接,我使用带有异步调用的 Boost
我尝试将 boost::bind 与 boost::factory 结合使用但没有成功 我有这个类 Zambas 有 4 个参数(2 个字符串和 2 个整数)和 class Zambas { publ
我是一名优秀的程序员,十分优秀!