gpt4 book ai didi

c++ - 这个 boost::asio 和 boost::coroutine 使用模式有什么问题?

转载 作者:塔克拉玛干 更新时间:2023-11-02 23:45:27 25 4
gpt4 key购买 nike

this问题 我描述了 boost::asioboost::coroutine 使用模式,这导致我的应用程序随机崩溃,我发布了我的代码和 valgrindGDB 输出。

为了进一步调查问题,我创建了较小的概念验证应用程序,它应用了相同的模式。我看到我在此处发布的源代码较小的程序中出现了同样的问题。

代码启动了几个线程并创建了一个带有几个虚拟连接(用户提供的数字)的连接池。附加参数是无符号整数,它扮演伪请求的角色。 sendRequest 函数的虚拟实现只是启动异步计时器,等待秒数等于输入数和函数的 yileds

有人能看出这段代码的问题吗?他能提出一些修复建议吗?

#include "asiocoroutineutils.h"
#include "concurrentqueue.h"

#include <iostream>
#include <thread>

#include <boost/lexical_cast.hpp>

using namespace std;
using namespace boost;
using namespace utils;

#define id this_thread::get_id() << ": "

// ---------------------------------------------------------------------------

/*!
* \brief This is a fake Connection class
*/
class Connection
{
public:
Connection(unsigned connectionId)
: _id(connectionId)
{
}

unsigned getId() const
{
return _id;
}

void sendRequest(asio::io_service& ioService,
unsigned seconds,
AsioCoroutineJoinerProxy,
asio::yield_context yield)
{
cout << id << "Connection " << getId()
<< " Start sending: " << seconds << endl;

// waiting on this timer is palceholder for any asynchronous operation
asio::steady_timer timer(ioService);
timer.expires_from_now(chrono::seconds(seconds));
coroutineAsyncWait(timer, yield);

cout << id << "Connection " << getId()
<< " Received response: " << seconds << endl;
}

private:
unsigned _id;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
typedef std::shared_ptr<asio::steady_timer> TimerPtr;

// ---------------------------------------------------------------------------

class ConnectionPool
{
public:
ConnectionPool(size_t connectionsCount)
{
for(size_t i = 0; i < connectionsCount; ++i)
{
cout << "Creating connection: " << i << endl;
_connections.emplace_back(new Connection(i));
}
}

ConnectionPtr getConnection(TimerPtr timer,
asio::yield_context& yield)
{
lock_guard<mutex> lock(_mutex);

while(_connections.empty())
{
cout << id << "There is no free connection." << endl;

_timers.emplace_back(timer);
timer->expires_from_now(
asio::steady_timer::clock_type::duration::max());

_mutex.unlock();
coroutineAsyncWait(*timer, yield);
_mutex.lock();

cout << id << "Connection was freed." << endl;
}

cout << id << "Getting connection: "
<< _connections.front()->getId() << endl;

ConnectionPtr connection = std::move(_connections.front());
_connections.pop_front();
return connection;
}

void addConnection(ConnectionPtr connection)
{
lock_guard<mutex> lock(_mutex);

cout << id << "Returning connection " << connection->getId()
<< " to the pool." << endl;

_connections.emplace_back(std::move(connection));

if(_timers.empty())
return;

auto timer = _timers.back();
_timers.pop_back();
auto& ioService = timer->get_io_service();

ioService.post([timer]()
{
cout << id << "Wake up waiting getConnection." << endl;
timer->cancel();
});
}

private:
mutex _mutex;
deque<ConnectionPtr> _connections;
deque<TimerPtr> _timers;
};

typedef unique_ptr<ConnectionPool> ConnectionPoolPtr;

// ---------------------------------------------------------------------------

class ScopedConnection
{
public:
ScopedConnection(ConnectionPool& pool,
asio::io_service& ioService,
asio::yield_context& yield)
: _pool(pool)
{
auto timer = make_shared<asio::steady_timer>(ioService);
_connection = _pool.getConnection(timer, yield);
}

Connection& get()
{
return *_connection;
}

~ScopedConnection()
{
_pool.addConnection(std::move(_connection));
}

private:
ConnectionPool& _pool;
ConnectionPtr _connection;
};

// ---------------------------------------------------------------------------

void sendRequest(asio::io_service& ioService,
ConnectionPool& pool,
unsigned seconds,
asio::yield_context yield)
{
cout << id << "Constructing request ..." << endl;

AsioCoroutineJoiner joiner(ioService);

ScopedConnection connection(pool, ioService, yield);

asio::spawn(ioService, bind(&Connection::sendRequest,
connection.get(),
std::ref(ioService),
seconds,
AsioCoroutineJoinerProxy(joiner),
placeholders::_1));

joiner.join(yield);

cout << id << "Processing response ..." << endl;
}

// ---------------------------------------------------------------------------

void threadFunc(ConnectionPool& pool,
ConcurrentQueue<unsigned>& requests)
{
try
{
asio::io_service ioService;

while(true)
{
unsigned request;
if(!requests.tryPop(request))
break;

cout << id << "Scheduling request: " << request << endl;

asio::spawn(ioService, bind(sendRequest,
std::ref(ioService),
std::ref(pool),
request,
placeholders::_1));
}

ioService.run();
}
catch(const std::exception& e)
{
cerr << id << "Error: " << e.what() << endl;
}
}

// ---------------------------------------------------------------------------

int main(int argc, char* argv[])
{
if(argc < 3)
{
cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..."
<< endl;
return -1;
}

try
{
auto poolSize = lexical_cast<size_t>(argv[1]);
auto threadsCount = lexical_cast<size_t>(argv[2]);

ConcurrentQueue<unsigned> requests;
for(int i = 3; i < argc; ++i)
{
auto request = lexical_cast<unsigned>(argv[i]);
requests.tryPush(request);
}

ConnectionPoolPtr pool(new ConnectionPool(poolSize));

vector<unique_ptr<thread>> threads;
for(size_t i = 0; i < threadsCount; ++i)
{
threads.emplace_back(
new thread(threadFunc, std::ref(*pool), std::ref(requests)));
}

for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
}
catch(const std::exception& e)
{
cerr << "Error: " << e.what() << endl;
}

return 0;
}

以下是上述代码使用的一些辅助工具:

#pragma once

#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>

namespace utils
{

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
boost::asio::yield_context& yield)
{
boost::system::error_code ec;
timer.async_wait(yield[ec]);
if(ec && ec != boost::asio::error::operation_aborted)
throw std::runtime_error(ec.message());
}

class AsioCoroutineJoiner
{
public:
explicit AsioCoroutineJoiner(boost::asio::io_service& io)
: _timer(io), _count(0) {}

void join(boost::asio::yield_context yield)
{
assert(_count > 0);
_timer.expires_from_now(
boost::asio::steady_timer::clock_type::duration::max());
coroutineAsyncWait(_timer, yield);
}

void inc()
{
++_count;
}

void dec()
{
assert(_count > 0);
--_count;
if(0 == _count)
_timer.cancel();
}

private:
boost::asio::steady_timer _timer;
std::size_t _count;

}; // AsioCoroutineJoiner class

class AsioCoroutineJoinerProxy
{
public:
AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner)
: _joiner(joiner)
{
_joiner.inc();
}

AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& joinerProxy)
: _joiner(joinerProxy._joiner)
{
_joiner.inc();
}

~AsioCoroutineJoinerProxy()
{
_joiner.dec();
}

private:
AsioCoroutineJoiner& _joiner;

}; // AsioCoroutineJoinerProxy class

} // utils namespace

为了代码的完整性,最后缺少的部分是 ConcurrentQueue 类。贴在这里太长了,但是如果你想要的话可​​以找到它here .

应用程序的示例用法是:

./connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6

其中第一个数字 3 是假连接数,第二个数字 3 是使用的线程数。后面的数字是虚假请求。

valgrindGDB 的输出和上面提到的一样question .

使用的 boost 版本是 1.57。编译器是 GCC 4.8.3。操作系统为CentOS Linux release 7.1.1503

最佳答案

似乎所有 valgrind 错误都是由于 BOOST_USE_VALGRIND 宏未定义为 Tanner Sansbury 在与 this 相关的注释中指出的题。似乎除此之外程序都是正确的。

关于c++ - 这个 boost::asio 和 boost::coroutine 使用模式有什么问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31639888/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com