- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在实现一个由 N 台机器组成的小型分布式系统。他们每个人都从某个远程服务器接收一些数据,然后将数据传播到其他 n-1 台机器。我正在使用 Boost Asio async_read 和 async_write 来实现它。我设置了一个包含 N=30 台机器的测试集群。当我尝试较小的数据集(每台机器接收 75KB 到 750KB)时,该程序始终有效。但是当我继续处理一个稍微大一点的数据集(7.5MB)时,我观察到奇怪的行为:一开始,读取和写入按预期进行,但过了一会儿,一些机器挂了而另一些机器完成了,挂起的机器数量每次运行都不同。我尝试在每个handler中打印一些消息,发现对于那些挂掉的机器,async_read基本上过一段时间就无法读取成功,所以之后就什么也做不了了。我查看了远程服务器,都写完了。我尝试过使用strand来控制异步读写的执行顺序,也尝试过使用不同的io_services来进行读写。他们都没有解决问题。我很绝望。谁能帮帮我?
这是执行读取和传播的类的代码:
const int TRANS_TUPLE_SIZE=15;
const int TRANS_BUFFER_SIZE=5120/TRANS_TUPLE_SIZE*TRANS_TUPLE_SIZE;
class Asio_Trans_Broadcaster
{
private:
char buffer[TRANS_BUFFER_SIZE];
int node_id;
int mpi_size;
int mpi_rank;
boost::asio::ip::tcp::socket* dbsocket;
boost::asio::ip::tcp::socket** sender_sockets;
int n_send;
boost::mutex mutex;
bool done;
public:
Asio_Trans_Broadcaster(boost::asio::ip::tcp::socket* dbskt, boost::asio::ip::tcp::socket** senderskts,
int msize, int mrank, int id)
{
dbsocket=dbskt;
count=0;
node_id=id;
mpi_size=mpi_rank=-1;
sender_sockets=senderskts;
mpi_size=msize;
mpi_rank=mrank;
n_send=-1;
done=false;
}
static std::size_t completion_condition(const boost::system::error_code& error, std::size_t bytes_transferred)
{
int remain=bytes_transferred%TRANS_TUPLE_SIZE;
if(remain==0 && bytes_transferred>0)
return 0;
else
return TRANS_BUFFER_SIZE-bytes_transferred;
}
void write_handler(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
int n=-1;
mutex.lock();
n_send--;
n=n_send;
mutex.unlock();
fprintf(stdout, "~~~~~~ @%d, write_handler: %d bytes, copies_to_send: %d\n",
node_id, bytes_transferred, n);
if(n==0 && !done)
boost::asio::async_read(*dbsocket,
boost::asio::buffer(buffer, TRANS_BUFFER_SIZE),
Asio_Trans_Broadcaster::completion_condition, boost::bind(&Asio_Trans_Broadcaster::broadcast_handler, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void broadcast_handler(const boost::system::error_code &ec, std::size_t bytes_transferred)
{
fprintf(stdout, "@%d, broadcast_handler: %d bytes, mpi_size:%d, mpi_rank: %d\n", node_id, bytes_transferred, mpi_size, mpi_rank);
if (!ec)
{
int pos=0;
while(pos<bytes_transferred && pos<TRANS_BUFFER_SIZE)
{
int id=-1;
memcpy(&id, &buffer[pos], 4);
if(id<0)
{
done=true;
fprintf(stdout, "@%d, broadcast_handler: done!\n", mpi_rank);
break;
}
pos+=TRANS_TUPLE_SIZE;
}
mutex.lock();
n_send=mpi_size-1;
mutex.unlock();
for(int i=0; i<mpi_size; i++)
if(i!=mpi_rank)
{
boost::asio::async_write(*sender_sockets[i], boost::asio::buffer(buffer, bytes_transferred),
boost::bind(&Asio_Trans_Broadcaster::write_handler, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
else
{
cerr<<mpi_rank<<" error: "<<ec.message()<<endl;
delete this;
}
}
void broadcast()
{
boost::asio::async_read(*dbsocket,
boost::asio::buffer(buffer, TRANS_BUFFER_SIZE),
Asio_Trans_Broadcaster::completion_condition, boost::bind(&Asio_Trans_Broadcaster::broadcast_handler, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
};
下面是每台机器上运行的主要代码:
int N=30;
boost::asio::io_service* sender_io_service=new boost::asio::io_service();
boost::asio::io_service::work* p_work=new boost::asio::io_service::work(*sender_io_service);
boost::thread_group send_thread_pool;
for(int i=0; i<NUM_THREADS; i++)
{
send_thread_pool.create_thread( boost::bind( & boost::asio::io_service::run, sender_io_service ) );
}
boost::asio::io_service* receiver_io_service=new boost::asio::io_service();
shared_ptr<boost::asio::io_service::work> p_work2(new boost::asio::io_service::work(*receiver_io_service));
boost::thread_group thread_pool2;
thread_pool2.create_thread( boost::bind( & boost::asio::io_service::run, receiver_io_service) );
boost::asio::ip::tcp::socket* receiver_socket;
//establish nonblocking connection with remote server
AsioConnectToRemote(5000, 1, receiver_io_service, receiver_socket, true);
boost::asio::ip::tcp::socket* send_sockets[N];
//establish blocking connection with other machines
hadoopNodes = SetupAsioConnectionsWIthOthers(sender_io_service, send_sockets, hostFileName, mpi_rank, mpi_size, 3000, false);
Asio_Trans_Broadcaster* db_receiver=new Asio_Trans_Broadcaster(receiver_socket, send_sockets,
mpi_size, mpi_rank, mpi_rank);
db_receiver->broadcast();
p_work2.reset();
thread_pool2.join_all();
delete p_work;
send_thread_pool.join_all();
最佳答案
我不知道你的代码试图实现什么。缺少的位太多。
当然,如果任务是在网络套接字上异步发送/接收流量,Asio 就是合适的选择。很难看出您的代码有什么特别之处。
我建议清理更明显的问题:
error_code
-s!)%lu
作为 size_t
如果可以使用 sizeof,则永远不要假设对象的大小:
memcpy(&id, &trans_buffer[pos], sizeof(id));
想想看,看起来缓冲区的索引无论如何都不安全:
while(pos < bytes_transferred && pos < TRANS_BUFFER_SIZE)
{
int id = -1;
memcpy(&id, &buffer[pos], sizeof(id));
如果例如pos == TRANS_BUFFER_SIZE-1
这里 memcpy 调用未定义行为...
为什么会有这么多 new
发生?您正在将一大堆毛茸茸的错误引入您的代码。好像内存管理不是低级编码的致命弱点。使用值或共享指针。 切勿删除此内容
。曾经[1]
为什么有那么多重复代码?为什么一个线程池以sender
命名,另一个以thread_pool2
命名?其中包含 1 个线程。嗯?为什么将一个 work
项作为原始指针,将另一个作为 shared_ptr
?
你可以只是:
struct service_wrap {
service_wrap(int threads) {
while(threads--)
pool.create_thread(boost::bind(&boost::asio::io_service::run, boost::ref(io_service)));
}
~service_wrap() {
io_service.post(boost::bind(&service_wrap::stop, this));
pool.join_all();
}
private: // mind the initialization order!
boost::asio::io_service io_service;
boost::optional<boost::asio::io_service::work> work;
boost::thread_group pool;
void stop() {
work = boost::none;
}
};
所以你可以简单地写:
service_wrap senders(NUM_THREADS);
service_wrap receivers(1);
哇。你看到了吗?没有更多的错误机会。如果你修复了一个池,你会自动修复另一个池。不再 delete
第一个,.reset()
第二个 work
项。简而言之:不再有困惑的代码,也没有那么复杂。
使用异常安全锁守卫:
int local_n_send = -1; // not clear naming
{
boost::lock_guard<boost::mutex> lk(mutex);
n_send--;
local_n_send = n_send;
}
broadcast
的主体在write_handler()
中完全重复。为什么不直接调用它:
if(local_n_send == 0 && !done)
broadcast();
我认为仍然存在竞争条件 - 不是访问 n_send
本身的数据竞争,但如果 n_send
重新广播的决定可能是错误的> 释放锁后达到零。现在,因为 broadcast()
只做一个异步操作,你可以在锁下做它并摆脱竞争条件:
void write_handler(const error_code &ec, size_t bytes_transferred) {
boost::lock_guard<boost::mutex> lk(mutex);
if(!(done || --n_send))
broadcast();
}
呜呜呜。现在是三行代码。更少的代码就是更少的错误。
我的猜测是,如果您像这样勤奋地擦洗代码,您将不可避免地找到您的线索。把它想象成你会寻找丢失的结婚戒指:你不会在周围留下一团糟。相反,你会从一个房间走到另一个房间,把它整理干净。如果需要,先把所有东西“扔掉”。
如果你可以让这个东西自包含/和/可重现,我什至会为你进一步调试它!
干杯
这是我在查看代码时所做的起点: Compiling on Coliru
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/array.hpp>
#include <boost/make_shared.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <iostream>
const/*expr*/ int TRANS_TUPLE_SIZE = 15;
const/*expr*/ int TRANS_BUFFER_SIZE = 5120 / TRANS_TUPLE_SIZE * TRANS_TUPLE_SIZE;
namespace AsioTrans
{
using boost::system::error_code;
using namespace boost::asio;
typedef ip::tcp::socket socket_t;
typedef boost::ptr_vector<socket_t> socket_list;
class Broadcaster
{
private:
boost::array<char, TRANS_BUFFER_SIZE> trans_buffer;
int node_id;
int mpi_rank;
socket_t& dbsocket;
socket_list& sender_sockets;
int n_send;
boost::mutex mutex;
bool done;
public:
Broadcaster(
socket_t& dbskt,
socket_list& senderskts,
int mrank,
int id) :
node_id(id),
mpi_rank(mrank),
dbsocket(dbskt),
sender_sockets(senderskts),
n_send(-1),
done(false)
{
// count=0;
}
static size_t completion_condition(const error_code& error, size_t bytes_transferred)
{
// TODO FIXME handler error_code here
int remain = bytes_transferred % TRANS_TUPLE_SIZE;
if(bytes_transferred && !remain)
{
return 0;
}
else
{
return TRANS_BUFFER_SIZE - bytes_transferred;
}
}
void write_handler(const error_code &ec, size_t bytes_transferred)
{
// TODO handle errors
// TODO check bytes_transferred
boost::lock_guard<boost::mutex> lk(mutex);
if(!(done || --n_send))
broadcast();
}
void broadcast_handler(const error_code &ec, size_t bytes_transferred)
{
fprintf(stdout, "@%d, broadcast_handler: %lu bytes, mpi_size:%lu, mpi_rank: %d\n", node_id, bytes_transferred, sender_sockets.size(), mpi_rank);
if(!ec)
{
for(size_t pos = 0; (pos < bytes_transferred && pos < TRANS_BUFFER_SIZE); pos += TRANS_TUPLE_SIZE)
{
int id = -1;
memcpy(&id, &trans_buffer[pos], sizeof(id));
if(id < 0)
{
done = true;
fprintf(stdout, "@%d, broadcast_handler: done!\n", mpi_rank);
break;
}
}
{
boost::lock_guard<boost::mutex> lk(mutex);
n_send = sender_sockets.size() - 1;
}
for(int i = 0; size_t(i) < sender_sockets.size(); i++)
{
if(i != mpi_rank)
{
async_write(
sender_sockets[i],
buffer(trans_buffer, bytes_transferred),
boost::bind(&Broadcaster::write_handler, this, placeholders::error, placeholders::bytes_transferred));
}
}
}
else
{
std::cerr << mpi_rank << " error: " << ec.message() << std::endl;
delete this;
}
}
void broadcast()
{
async_read(
dbsocket,
buffer(trans_buffer),
Broadcaster::completion_condition,
boost::bind(&Broadcaster::broadcast_handler, this,
placeholders::error,
placeholders::bytes_transferred));
}
};
struct service_wrap {
service_wrap(int threads) {
while(threads--)
_pool.create_thread(boost::bind(&io_service::run, boost::ref(_service)));
}
~service_wrap() {
_service.post(boost::bind(&service_wrap::stop, this));
_pool.join_all();
}
io_service& service() { return _service; }
private: // mind the initialization order!
io_service _service;
boost::optional<io_service::work> _work;
boost::thread_group _pool;
void stop() {
_work = boost::none;
}
};
extern void AsioConnectToRemote(int, int, io_service&, socket_t&, bool);
extern void SetupAsioConnectionsWIthOthers(io_service&, socket_list&, std::string, int, bool);
}
int main()
{
using namespace AsioTrans;
// there's no use in increasing #threads unless there are blocking operations
service_wrap senders(boost::thread::hardware_concurrency());
service_wrap receivers(1);
socket_t receiver_socket(receivers.service());
AsioConnectToRemote(5000, 1, receivers.service(), receiver_socket, true);
socket_list send_sockets(30);
/*hadoopNodes =*/ SetupAsioConnectionsWIthOthers(senders.service(), send_sockets, "hostFileName", 3000, false);
int mpi_rank = send_sockets.size();
AsioTrans::Broadcaster db_receiver(receiver_socket, send_sockets, mpi_rank, mpi_rank);
db_receiver.broadcast();
}
[1] 没有异常(exception)。除非没有异常(exception)规则有异常(exception)。异常接收。
关于c++ - Boost Asio async_read 有时会在阅读时挂起,但并非总是如此,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22642268/
我正在尝试读取一个大型日志文件,该文件已使用不同的分隔符(遗留更改)进行了解析。 此代码有效 import os, subprocess, time, re import pandas as pd f
我试图理解在 Linux 下以 Turbo 模式(特别是 fpc -Mtp -vw)编译的 Free Pascal 中看到的有点神奇的行为。代码来自 Jack Crenshaw 的“让我们构建一个编译
我有一个具有以下结构的 txt 文件: NAME DATA1 DATA2 a 10 1,2,3 b 6 8,9 c 2
我试图理解在 Linux 下以 Turbo 模式(特别是 fpc -Mtp -vw)编译的 Free Pascal 中看到的有点神奇的行为。代码来自 Jack Crenshaw 的“让我们构建一个编译
public class Bug1 { private String s; public void Bug1(){ s = "hello"; } public Stri
我们有这样一种情况,我们的应用程序需要处理一系列文件,而不是同步执行此功能,我们希望采用多线程将工作负载分配给不同的线程。 每一项工作是: 1.以只读方式打开文件 2.处理文件中的数据 3.将处理后的
我正在尝试读取 .php 文件并替换十六进制字符。php文件格式如下: 问题是它弄乱了转义字符 (\") 到目前为止我的代码: while(i=48 && str[i+2]=97 && str[i+
我正在用 C# 开发一个程序,我需要一些帮助。我正在尝试创建一个数组或项目列表,显示在某个网站上。我想要做的是阅读 anchor 文本,它是 href。例如,这是 HTML:
我有一个偏好设置,它控制我的应用程序是否在用户单击按钮时播放声音(这种情况经常发生,想想计算器)。每次用户单击按钮时,都会调用以下方法: private void playButtonClickSou
我正在尝试在我的标签末尾创建一个阅读更多按钮。我希望它默认显示 3 行。我正在用 swift 而不是 objective c 编写代码。只有当用户点击标签的阅读更多部分时,标签才会展开。它的外观和工作
当您获得第三方库(c、c++)、开源(LGPL 说)但没有很好的文档时,了解它以便能够集成到您的应用程序中的最佳方法是什么? 该库通常有一些示例程序,我最终使用 gdb 浏览了代码。还有其他建议/最佳
同时从 2 个或更多不同线程对同一个文件描述符使用 pread 是否有问题? 最佳答案 pread 本身是线程安全的,因为它不在 list of unsafe functions 上.所以调用它是安全
当您使用命令 pd.read_csv 读取 csv 时,如何跳过连续包含特定值的行?如果在第 50、55 行,第一列的值为 100,那么我想在读取 csv 文件时跳过这些行。我如何将这些命令放入像 p
我迫切需要在 C# 中使用 T4 生成 HTML 输出。 我正在使用 Runtime-T4-Files 并选择“TextTemplatingFilePreprocessor”而不是“TextTempl
今年夏天我在实习期间一直在学习 ERP 应用程序。由于我是一名即将毕业的程序员,我希望有一个可靠的软件分支可以帮助我完成工作,直到我确定下一步该做什么(直到我对大局有一个很好的了解)。到现在为止,我刚
将包含列(例如“a”、“b”)的数据帧保存为 parquet,然后在稍后的时间点读取 parquet 不会提供相同的列顺序(可能是“b”、“a”fe)文件保存为。 不幸的是,我无法弄清楚订单是如何受到
我正在开发一个使用谷歌表格作为数据库的应用程序,但我不知道如何让 Swift 从谷歌表格中读取。我浏览了 API 网站和一些问题,但刚开始我需要一些帮助。到目前为止,我有; 私有(private)让范
我打算阅读swing concept,如果值得一读,请推荐一些学习 Material 最佳答案 自 AWT 崩溃以来,Java 的 GUI 工具包太多了。即使是 Swing 也被评论家严重低估,但他们
我已经使用 J 几个月了,我发现阅读不熟悉的代码(例如,不是我自己写的)是该语言最具挑战性的方面之一,尤其是在默认情况下。过了一会儿,我想出了这个策略: 1)将代码段复制到word文档中 2)从(1)
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
我是一名优秀的程序员,十分优秀!