gpt4 book ai didi

c++ - 是什么导致 boost::coroutine 随机崩溃?

转载 作者:行者123 更新时间:2023-11-30 03:50:42 25 4
gpt4 key购买 nike

我有一个多线程应用程序,它通过在 boost::asio 中的集成使用 boost::asioboost::coroutine。每个线程都有自己的 io_service 对象。线程之间唯一的共享状态是连接池,当从连接池获取连接或从连接池返回连接时,连接池被 mutex 锁定。当池中没有足够的连接时,我将无限 asio::steady_tiemer 插入池的内部结构并异步等待它,然后我从协程函数中yield。当其他线程返回到池的连接时,它检查是否有等待计时器,它从内部结构中获取等待计时器,它获取它的 io_service 对象并发送一个 lambda 来唤醒计时器以恢复暂停协程。我在应用程序中随机崩溃。我尝试使用 valgrind 调查问题。它发现了一些问题,但我无法理解它们,因为它们发生在 boost::coroutineboost::asio 内部。以下是我的代码和 valgrind 输出的片段。有人可以看到并解释问题吗?

调用代码如下:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
AvlRequestDataList allRequests;
for(auto& requestContext : avlRequestContexts)
{
if(!requestContext.pullProvider || !requestContext.toAskGDS())
continue;

auto& requests = requestContext.pullProvider->getRequestsData();
copy(requests.begin(), requests.end(), back_inserter(allRequests));
}

if(allRequests.size() == 0)
return;

boost::asio::io_service ioService;
curl::AsioMultiplexer multiplexer(ioService);

for(auto& request : allRequests)
{
using namespace boost::asio;

spawn(ioService, [&multiplexer, &request](yield_context yield)
{
request->prepare(multiplexer, yield);
});
}

while(true)
{
try
{
VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
ioService.run();
VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
break;
}
catch(const std::exception& e)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
}
catch(...)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
}
}
}

这是在生成的 lambda 中调用的 prepare 函数实现:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
auto& ioService = multiplexer.getIoService();
_connection = _pool.getConnection(ioService, yield);
_connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

multiplexer.addEasyHandle(_connection->getHandle(),
[this](const curl::EasyHandleResult& result)
{
if(0 == result.responseCode)
returnQuota();
VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
_pool.addConnection(std::move(_connection));
});
}


void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
try
{
prepareImpl(multiplexer, yield);
}
catch(const std::exception& e)
{
VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
returnQuota();
}
catch(...)
{
VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
returnQuota();
}
}

returnQuota 函数是 AvlRequestData 类的纯虚方法,它在我所有测试中使用的 TravelportRequestData 类的实现是以下内容:

void returnQuota() const override
{
auto& avlQuotaManager = AvlQuotaManager::getInstance();
avlQuotaManager.consumeQuotaTravelport(-1);
}

下面是连接池的pushpop方法

auto AvlConnectionPool::getConnection(
TimerPtr timer,
asio::yield_context yield) -> ConnectionPtr
{
lock_guard<mutex> lock(_mutex);

while(_connections.empty())
{
_timers.emplace_back(timer);
timer->expires_from_now(
asio::steady_timer::clock_type::duration::max());

_mutex.unlock();
coroutineAsyncWait(*timer, yield);
_mutex.lock();
}

ConnectionPtr connection = std::move(_connections.front());
_connections.pop_front();

VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));

++_connectionsGiven;

return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
Side side /* = Back */)
{
lock_guard<mutex> lock(_mutex);

if(Front == side)
_connections.emplace_front(std::move(connection));
else
_connections.emplace_back(std::move(connection));

VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));

if(_timers.empty())
return;

auto timer = _timers.back();
_timers.pop_back();

auto& ioService = timer->get_io_service();
ioService.post([timer](){ timer->cancel(); });

VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
% _connectionPoolName));
}

这是 coroutineAsyncWait 的实现。

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
boost::asio::yield_context yield)
{
boost::system::error_code ec;
timer.async_wait(yield[ec]);
if(ec && ec != boost::asio::error::operation_aborted)
throw std::runtime_error(ec.message());
}

最后是 valgrind 输出的第一部分:

==8189== Thread 41:
==8189== Invalid read of size 8
==8189== at 0x995F84: void boost::coroutines::detail::trampoline_push_void, void, boost::asio::detail::coro_entry_point, void (anonymous namespace)::executeRequests > >(std::vector<(anonymous namespace)::AvlRequestContext, std::allocator<(anonymous namespace)::AvlRequestContext> >&)::{lambda(boost::asio::basic_yield_context >)#1}>&, boost::coroutines::basic_standard_stack_allocator > >(long) (trampoline_push.hpp:65)
==8189== Address 0x2e3b5528 is not stack'd, malloc'd or (recently) free'd

当我使用附加了调试器的 valgrind 时,它会在 boost::coroutine 库的 trampoline_push.hpp 中的以下函数中停止。

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│ typedef typename Coro::param_type param_type;
57│
58│ BOOST_ASSERT( vp);
59│
60│ param_type * param(
61│ reinterpret_cast< param_type * >( vp) );
62│ BOOST_ASSERT( 0 != param);
63│
64│ Coro * coro(
65├> reinterpret_cast< Coro * >( param->coro) );
66│ BOOST_ASSERT( 0 != coro);
67│
68│ coro->run();
69│ }

最佳答案

最终我发现,当需要删除对象时,如果没有正确使用shared_ptr和weak_ptr,boost::asio并不能优雅地处理它。当确实发生崩溃时,它们很难调试,因为很难查看 io_service 队列在发生故障时正在做什么。

在最近完成一个完整的异步客户端架构并遇到随机崩溃问题后,我有一些提示可以提供。不幸的是,我不知道这些是否能解决您的问题,但希望它能在正确的方向上提供一个良好的开端。

Boost Asio 协程使用技巧

  1. 使用 boost::asio::asio_handler_invoke 而不是 io_service.post():

    auto& ioService = timer->get_io_service();

    ioService.post(timer{ timer->cancel(); });

    在协程中使用 post/dispatch 通常不是一个好主意。从协程调用时,请始终使用 asio_handler_invoke。然而,在这种情况下,您可以安全地调用 timer->cancel(),而无需将其发布到消息循环中。

  2. 您的计时器似乎没有使用 shared_ptr 对象。无论应用程序的其余部分发生了什么,都无法确定何时应销毁这些对象。我强烈建议对所有计时器对象使用 shared_ptr 对象。此外,任何指向类方法的指针也应使用 shared_from_this()。如果 this 被破坏(在堆栈上)或超出 shared_ptr 中其他地方的范围,使用普通 this 可能会非常危险。无论您做什么,都不要在对象的构造函数中使用 shared_from_this()!

    如果在执行 io_service 中的处理程序时出现崩溃,但处理程序的一部分不再有效,这将是一件非常难以调试的事情。泵入 io_service 的处理程序对象包括任何指向计时器的指针,或指向执行处理程序可能需要的对象的指针。

    我强烈建议过分使用 shared_ptr 对象包裹任何 asio 类。如果问题消失,那么它的销毁顺序可能会出现问题。

  3. 故障地址是在堆上的某个地方还是指向堆栈?这将帮助您诊断它是否是一个对象在错误的时间超出了方法的范围,或者它是否是其他东西。例如,这向我证明,即使在单线程应用程序中,我的所有计时器也必须成为 shared_ptr 对象。

关于c++ - 是什么导致 boost::coroutine 随机崩溃?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31610415/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com