gpt4 book ai didi

c++ - 使用 boost::asio 时是否需要实现阻塞?

转载 作者:搜寻专家 更新时间:2023-10-31 02:07:33 26 4
gpt4 key购买 nike

我的问题是,如果我在多线程上运行io_service::run(),是否需要对这些异步函数实现阻塞?

例子:

int i = 0;
int j = 0;

void test_timer(boost::system::error_code ec)
{
//I need to lock up here ?
if (i++ == 10)
{
j = i * 10;
}
timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(500));
timer.async_wait(&test_timer);
}

void threadMain()
{
io_service.run();
}

int main()
{
boost::thread_group workers;
timer.async_wait(&test_timer);

for (int i = 0; i < 5; i++){
workers.create_thread(&threadMain);
}

io_service.run();
workers.join_all();
return 0;
}

最佳答案

async的定义是非阻塞的。

如果你想问“我是否必须同步访问来自不同线程的共享对象”——这个问题是不相关的,答案取决于为你共享的对象记录的线程安全。

对于 Asio,基本上(粗略总结)您需要将并发访问(并发,如:来自多个线程)同步到所有类型,除了boost::asio::io_context ¹,².

你的样本

您的示例使用多个线程运行 io 服务,这意味着处理程序在这些线程中的任何一个上运行。这意味着您实际上是在共享全局变量,而且它们确实需要保护。

但是 因为您的应用程序逻辑(异步调用链)规定只有一个操作永远挂起,而共享计时器对象上的下一个异步操作是总是从该链中调度,访问逻辑上全部来自单个线程(称为隐式链。参见Why do I need strand per connection when using boost::asio?

最简单的方法:

逻辑链

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;
boost::asio::deadline_timer timer { io_service };

struct state_t {
int i = 0;
int j = 0;
} state;

void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
if (state.i++ == 10) {
state.j = state.i * 10;
if (state.j > 100)
return; // stop after 5 seconds
}
}
timer.expires_at(timer.expires_at() + boost::posix_time::milliseconds(50));
timer.async_wait(&test_timer);
}
}

int main()
{
boost::thread_group workers;
timer.expires_from_now(boost::posix_time::milliseconds(50));
timer.async_wait(&test_timer);

for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}

workers.join_all();
std::cout << "i = " << state.i << std::endl;
std::cout << "j = " << state.j << std::endl;
}

Note I removed the io_service::run() from the main thread as it is redundant with the join() (unless you really wanted 6 threads running the handlers, not 5).

打印

i = 11
j = 110

警告

这里潜伏着一个陷阱。比方说,您不想像我一样以固定数量退出,但想停下来,您可能会想这样做:

timer.cancel();

来自 main .那是合法的,因为deadline_timer对象不是线程安全的。你需要要么

  • 使用全局 atomic_bool发出终止请求信号
  • 发布timer.cancel()在与定时器异步链相同的 strand 上。但是,只有一个显式链,因此如果不更改代码以使用显式链,您将无法做到这一点。

更多计时器

让我们通过使用两个计时器来使事情复杂化,这两个计时器具有它们自己的隐式链。这意味着对定时器实例的访问仍然不需要同步,但是对 i 的访问和 j 确实需要。

Note In this demo I use synchronized_value<> for elegance. You can write similar logic manually using mutex and lock_guard.

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>

boost::asio::io_service io_service;

struct state {
int i = 0;
int j = 0;
};

boost::synchronized_value<state> shared_state;

struct TimerChain {
boost::asio::deadline_timer _timer;

TimerChain() : _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}

void resume() {
_timer.async_wait(boost::bind(&TimerChain::test_timer, this, _1));
};

void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
auto state = shared_state.synchronize();
if (state->i++ == 10) {
state->j = state->i * 10;
}
if (state->j > 100) return; // stop after some iterations
}
_timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
resume();
}
}
};

int main()
{
boost::thread_group workers;
TimerChain timer1;
TimerChain timer2;

for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}

workers.join_all();
auto state = shared_state.synchronize();
std::cout << "i = " << state->i << std::endl;
std::cout << "j = " << state->j << std::endl;
}

打印

i = 12
j = 110

添加显式链

现在添加它们非常简单:

struct TimerChain {
boost::asio::io_service::strand _strand;
boost::asio::deadline_timer _timer;

TimerChain() : _strand{io_service}, _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}

void resume() {
_timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
};

void stop() { // thread safe
_strand.post([this] { _timer.cancel(); });
}

// ...

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/synchronized_value.hpp>
#include <iostream>

boost::asio::io_service io_service;

struct state {
int i = 0;
int j = 0;
};

boost::synchronized_value<state> shared_state;

struct TimerChain {
boost::asio::io_service::strand _strand;
boost::asio::deadline_timer _timer;

TimerChain() : _strand{io_service}, _timer{io_service} {
_timer.expires_from_now(boost::posix_time::milliseconds(50));
resume();
}

void resume() {
_timer.async_wait(_strand.wrap(boost::bind(&TimerChain::test_timer, this, _1)));
};

void stop() { // thread safe
_strand.post([this] { _timer.cancel(); });
}

void test_timer(boost::system::error_code ec)
{
if (ec != boost::asio::error::operation_aborted) {
{
auto state = shared_state.synchronize();
if (state->i++ == 10) {
state->j = state->i * 10;
}
}
// continue indefinitely
_timer.expires_at(_timer.expires_at() + boost::posix_time::milliseconds(50));
resume();
}
}
};

int main()
{
boost::thread_group workers;
TimerChain timer1;
TimerChain timer2;

for (int i = 0; i < 5; i++){
workers.create_thread([] { io_service.run(); });
}

boost::this_thread::sleep_for(boost::chrono::seconds(10));
timer1.stop();
timer2.stop();

workers.join_all();

auto state = shared_state.synchronize();
std::cout << "i = " << state->i << std::endl;
std::cout << "j = " << state->j << std::endl;
}

打印

i = 400
j = 110

¹(或使用旧名称 boost::asio::io_service)

² 在这方面,生命周期突变不被视为成员操作(即使对于线程安全对象,您也必须手动同步共享对象的构造/销毁)

关于c++ - 使用 boost::asio 时是否需要实现阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48507883/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com