gpt4 book ai didi

linux - Linux上的Asio停在epoll()中

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:22:02 27 4
gpt4 key购买 nike

我们在Linux上遇到了独立(非增强型)Asio 1.10.6的异步操作问题,已通过以下测试应用程序进行了演示:

#define ASIO_STANDALONE
#define ASIO_HEADER_ONLY
#define ASIO_NO_EXCEPTIONS
#define ASIO_NO_TYPEID
#include "asio.hpp"

#include <chrono>
#include <iostream>
#include <list>
#include <map>
#include <thread>

static bool s_freeInboundSocket = false;
static bool s_freeOutboundSocket = false;

class Tester
{
public:
Tester(asio::io_service& i_ioService, unsigned i_n)
: m_inboundStrand(i_ioService)
, m_listener(i_ioService)
, m_outboundStrand(i_ioService)
, m_resolver(i_ioService)
, m_n(i_n)
, m_traceStart(std::chrono::high_resolution_clock::now())
{}

~Tester()
{}

void TraceIn(unsigned i_line)
{
m_inboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
}

void AbortIn(unsigned i_line)
{
TraceIn(i_line);
abort();
}

void TraceOut(unsigned i_line)
{
m_outboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
}

void AbortOut(unsigned i_line)
{
TraceOut(i_line);
abort();
}

void DumpTrace(std::map<unsigned, unsigned>& o_counts)
{
std::cout << "## " << m_n << " ##\n";
std::cout << "-- " << m_traceStart.time_since_epoch().count() << "\n";
std::cout << "- in - - out -\n";

auto in = m_inboundTrace.begin();
auto out = m_outboundTrace.begin();

while ((in != m_inboundTrace.end()) || (out != m_outboundTrace.end()))
{
if (in == m_inboundTrace.end())
{
++o_counts[out->first];

std::cout << " " << out->first << " : " << out->second.count() << "\n";
++out;
}
else if (out == m_outboundTrace.end())
{
++o_counts[in->first];

std::cout << in->first << " : " << in->second.count() << "\n";
++in;
}
else if (out->second < in->second)
{
++o_counts[out->first];

std::cout << " " << out->first << " : " << out->second.count() << "\n";
++out;
}
else
{
++o_counts[in->first];

std::cout << in->first << " : " << in->second.count() << "\n";
++in;
}
}
std::cout << std::endl;
}

//////////////
// Inbound

void Listen(uint16_t i_portBase)
{
m_inboundSocket.reset(new asio::ip::tcp::socket(m_inboundStrand.get_io_service()));

asio::error_code ec;
if (m_listener.open(asio::ip::tcp::v4(), ec)
|| m_listener.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), i_portBase+m_n), ec)
|| m_listener.listen(-1, ec))
{
AbortIn(__LINE__); return;
}

TraceIn(__LINE__);

m_listener.async_accept(*m_inboundSocket,
m_inboundStrand.wrap([this](const asio::error_code& i_error)
{
OnInboundAccepted(i_error);
}));
}

void OnInboundAccepted(const asio::error_code& i_error)
{
TraceIn(__LINE__);

if (i_error) { AbortIn(__LINE__); return; }

asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
{
OnInboundReadCompleted(i_err, i_nRd);
}));
}


void OnInboundReadCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceIn(__LINE__);

if (i_error.value() != 0) { AbortIn(__LINE__); return; }
if (bool(i_error)) { AbortIn(__LINE__); return; }
if (i_nRead != 4) { AbortIn(__LINE__); return; } // "msg\n"

std::istream is(&m_inboundRxBuf);
std::string s;
if (!std::getline(is, s)) { AbortIn(__LINE__); return; }
if (s != "msg") { AbortIn(__LINE__); return; }
if (m_inboundRxBuf.in_avail() != 0) { AbortIn(__LINE__); return; }

asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
{
OnInboundWaitCompleted(i_err, i_nRd);
}));

}

void OnInboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceIn(__LINE__);

if (i_error != asio::error::eof) { AbortIn(__LINE__); return; }
if (i_nRead != 0) { AbortIn(__LINE__); return; }

if (s_freeInboundSocket)
{
m_inboundSocket.reset();
}
}

//////////////
// Outbound

void Connect(std::string i_host, uint16_t i_portBase)
{
asio::error_code ec;
auto endpoint = m_resolver.resolve(asio::ip::tcp::resolver::query(i_host, std::to_string(i_portBase+m_n)), ec);
if (ec) { AbortOut(__LINE__); return; }

m_outboundSocket.reset(new asio::ip::tcp::socket(m_outboundStrand.get_io_service()));

TraceOut(__LINE__);

asio::async_connect(*m_outboundSocket, endpoint,
m_outboundStrand.wrap([this](const std::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_ep)
{
OnOutboundConnected(i_error, i_ep);
}));
}

void OnOutboundConnected(const asio::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_endpoint)
{
TraceOut(__LINE__);

if (i_error) { AbortOut(__LINE__); return; }

std::ostream(&m_outboundTxBuf) << "msg" << '\n';

asio::async_write(*m_outboundSocket, m_outboundTxBuf.data(),
m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nWritten)
{
OnOutboundWriteCompleted(i_error, i_nWritten);
}));
}

void OnOutboundWriteCompleted(const asio::error_code& i_error, size_t i_nWritten)
{
TraceOut(__LINE__);

if (i_error) { AbortOut(__LINE__); return; }
if (i_nWritten != 4) { AbortOut(__LINE__); return; } // "msg\n"

TraceOut(__LINE__);
m_outboundSocket->shutdown(asio::socket_base::shutdown_both);

asio::async_read_until(*m_outboundSocket, m_outboundRxBuf, '\n',
m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nRead)
{
OnOutboundWaitCompleted(i_error, i_nRead);
}));
}

void OnOutboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
{
TraceOut(__LINE__);

if (i_error != asio::error::eof) { AbortOut(__LINE__); return; }
if (i_nRead != 0) { AbortOut(__LINE__); return; }

if (s_freeOutboundSocket)
{
m_outboundSocket.reset();
}
}

private:
//////////////
// Inbound

asio::io_service::strand m_inboundStrand;

asio::ip::tcp::acceptor m_listener;
std::unique_ptr<asio::ip::tcp::socket> m_inboundSocket;

asio::streambuf m_inboundRxBuf;
asio::streambuf m_inboundTxBuf;

//////////////
// Outbound

asio::io_service::strand m_outboundStrand;

asio::ip::tcp::resolver m_resolver;
std::unique_ptr<asio::ip::tcp::socket> m_outboundSocket;

asio::streambuf m_outboundRxBuf;
asio::streambuf m_outboundTxBuf;

//////////////
// Common

unsigned m_n;

const std::chrono::high_resolution_clock::time_point m_traceStart;
std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_inboundTrace;
std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_outboundTrace;
};

static int Usage(int i_ret)
{
std::cout << "[" << i_ret << "]" << "Usage: example <nThreads> <nConnections> <inboundFree> <outboundFree>" << std::endl;
return i_ret;
}

int main(int argc, char* argv[])
{
if (argc < 5)
return Usage(__LINE__);

const unsigned nThreads = unsigned(std::stoul(argv[1]));
if (nThreads == 0)
return Usage(__LINE__);
const unsigned nConnections = unsigned(std::stoul(argv[2]));
if (nConnections == 0)
return Usage(__LINE__);

s_freeInboundSocket = (*argv[3] == 'y');
s_freeOutboundSocket = (*argv[4] == 'y');

const uint16_t listenPortBase = 25000;
const uint16_t connectPortBase = 25000;
const std::string connectHost = "127.0.0.1";

asio::io_service ioService;

std::cout << "Creating." << std::endl;

std::list<Tester> testers;

for (unsigned i = 0; i < nConnections; ++i)
{
testers.emplace_back(ioService, i);
testers.back().Listen(listenPortBase);
testers.back().Connect(connectHost, connectPortBase);
}

std::cout << "Starting." << std::endl;

std::vector<std::thread> threads;

for (unsigned i = 0; i < nThreads; ++i)
{
threads.emplace_back([&]()
{
ioService.run();
});
}

std::cout << "Waiting." << std::endl;

for (auto& thread : threads)
{
thread.join();
}

std::cout << "Stopped." << std::endl;

return 0;
}

void DumpAllTraces(std::list<Tester>& i_testers)
{
std::map<unsigned, unsigned> counts;

for (auto& tester : i_testers)
{
tester.DumpTrace(counts);
}

std::cout << "##############################\n";
for (const auto& count : counts)
{
std::cout << count.first << " : " << count.second << "\n";
}
std::cout << std::endl;
}

#if defined(ASIO_NO_EXCEPTIONS)
namespace asio
{
namespace detail
{

template <typename Exception>
void throw_exception(const Exception& e)
{
abort();
}

} // namespace detail
} // namespace asio
#endif

我们按以下方式编译(问题仅在优化的版本中发生):
g++ -o example -m64 -g -O3 --no-exceptions --no-rtti --std=c++11 -I asio-1.10.6/include -lpthread example.cpp

我们正在Debian Jessie上运行。 uname -a报告 (Linux <hostname> 3.16.0-4-amd64 #1 SMP Debian 3.16.36-1+deb8u2 (2016-10-19) x86_64 GNU/Linux
该问题同时出现在GCC( g++ (Debian 4.9.2-10) 4.9.2)和Clang( Debian clang version 3.5.0-10 (tags/RELEASE_350/final) (based on LLVM 3.5.0))下。

[编辑添加:这也发生在带有 Linux <hostname> 4.6.0-1-amd64 #1 SMP Debian 4.6.1-1 (2016-06-06) x86_64 GNU/Linux的Debian Stretch g++ (Debian 6.2.1-5) 6.2.1 20161124上。]

总而言之,测试应用程序执行以下操作:
  • 我们创建N个连接,每个连接由一个入站(监听)组成
    端和出站(连接)端。每个入站监听器均已绑定(bind)
    到唯一的端口(从25000开始),以及每个出站连接器
    使用系统选择的原始端口。
  • 入站端执行async_accept,然后继续
    完成时发出async_read。读取完成后会发出
    我们期望另一个async_read返回eof。什么时候
    完成后,我们要么立即释放套接字,要么保持原样
    (无待处理的异步操作)由相关人员清理
    程序导出处的析构函数。 (请注意,监听器套接字是
    总是保持原样,没有等待接受的消息,直到退出。)
  • 出站端执行async_connect,并在完成问题上执行
    一个async_write。写入完成后,将发出shutdown(具体来说是shutdown(both)),然后是我们的async_read期望返回eof。完成后,我们再次离开
    套接字原样,没有待处理的操作,否则我们会立即释放它。
  • 任何错误或意外接收数据都会立即产生abort()称呼。
  • 测试应用程序使我们可以为io_service以及要创建的连接总数
    以及控制入站和出站套接字的标志
    分别被释放或保持原样。
  • 我们重复运行测试应用程序,指定50个线程和1000个线程
    连接。

    while ./example 50 1000 n y >out.txt ; do echo -n . ; done

  • 如果我们指定所有套接字保持原样,则测试循环将无限期运行。为避免使用 SO_REUSEADDR造成困惑,在开始测试之前,请注意从上一次测试运行开始,没有套接字处于 TIME_WAIT状态,否则监听可能会失败。但是,满足了这一警告,测试应用程序实际上可以运行数百甚至数千次而没有错误。同样,如果我们指定应显式释放入站套接字(而不是出站套接字),则所有进程都可以正常运行。

    但是,如果我们指定释放出站套接字,则应用程序将在执行不定数量的程序后停止运行-有时不超过十次,有时不超过一百次,通常介于两者之间。

    使用GDB连接到停滞的进程,我们看到主线程正在等待加入工作线程,除其中一个工作线程之外的所有线程都处于空闲状态(正在Asio内部条件变量上等待),并且一个工作线程正在Asio的内部等待调用 epoll()。内部跟踪工具会验证某些套接字正在等待异步操作完成-有时是初始(入站)接受,有时是(入站)数据读取,有时是通常以 eof完成的最终入站或出站读取。

    在所有情况下,连接的另一端都已成功完成其位:如果入站接受仍处于挂起状态,我们将看到相应的出站连接以及出站写入已成功完成;同样,如果入站数据读取未决,则相应的出站连接和写入也已完成;如果入站EOF读取暂挂,则说明已执行出站关机,同样,如果出站EOF读取暂挂,则由于出站关机,入站EOF读取已完成。

    检查进程的/proc/N/fdinfo显示,epoll文件描述符确实在等待检测指示的文件描述符。

    最令人困惑的是, netstat显示了正在等待的套接字的RecvQ大小为非零-也就是说,正在等待读取操作的套接字显示已接收数据或准备读取的关闭事件。这与我们的工具相符,因为它表明已将写入数据传递到入站套接字,但尚未被读取(或者,出站关闭已向入站方发出FIN,但EOF尚未这样做)尚未“阅读”)。

    这使我怀疑Asio的 epoll簿记-尤其是其边缘触发的事件管理-由于比赛条件而在某些地方不同步。显然,这很可能是由于我不正确的操作造成的,但是我看不出问题出在哪里。

    我们将不胜感激所有见解,建议,已知问题和指出明显的细节。

    [编辑添加:使用 strace捕获内核调用会干扰执行,从而不会发生停顿。使用 sysdig不会产生此效果,但是当前无法捕获 epoll_waitepoll_ctl系统调用的参数。叹。]

    最佳答案

    关于linux - Linux上的Asio停在epoll()中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41804866/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com