gpt4 book ai didi

c++ - 使用线程超时 boost ASIO 线程池

转载 作者:行者123 更新时间:2023-11-28 05:32:53 25 4
gpt4 key购买 nike

我将 Boost ASIO 库用作线程池,这被广泛描述。但是,如果线程处理的时间超过 1 秒,我想中断每个线程并转到该线程的下一个已发布任务。

我可以使用一个单独的 deadline_timer 轻松实现这一点,如果线程在截止日期之前完成,它会被重置,或者如果任务持续时间过长则中断线程。但是我认为这将内置到 ASIO 中。有一个任务似乎很自然,网络操作超时。但是我在 API 中看不到它的任何内容,简而言之。

谁能告诉我这个功能是否已经存在?还是应该按照我描述的方式实现?

最佳答案

这是我一起敲定的快速解决方案。

它要求您提交的函数对象接受类型为 exec_context 的参数。

在 io_service 中运行的任务可以查询 .canceled() 访问器(它是原子的)以确定它是否应该提前取消。

然后它可以抛出异常或返回它打算返回的任何值。

调用者通过submit 函数提交。此函数用上下文对象包装辅助函数,并将其返回值和/或异常编码到 std::future 中。

然后,调用者可以根据需要查询或等待(或忽略)这个 future 。

调用者得到一个句柄对象,它上面有方法cancel()。使用此句柄,调用者可以取消、查询或等待提交的任务。

希望对您有所帮助。写起来很有趣。

#include <boost/asio.hpp>
#include <iostream>
#include <atomic>
#include <thread>
#include <chrono>
#include <future>
#include <stdexcept>
#include <exception>
#include <utility>
#include <type_traits>


//
// an object to allow the caller to communicate a cancellation request to the
// submitted task
//
struct exec_controller
{
/// @returns previous cancellation request state;
bool notify_cancel()
{
return _should_cancel.exchange(true);
}

bool should_cancel() const {
return _should_cancel;
}

private:
std::atomic<bool> _should_cancel = { false };
};

template<class Ret>
struct exec_state : std::enable_shared_from_this<exec_state<Ret>>
{
using return_type = Ret;

bool notify_cancel() {
return _controller.notify_cancel();
}

std::shared_ptr<exec_controller>
get_controller_ptr() {
return std::shared_ptr<exec_controller>(this->shared_from_this(),
std::addressof(_controller));
}

std::promise<return_type>& promise() { return _promise; }

private:
std::promise<return_type> _promise;
exec_controller _controller;
};

struct applyer;

struct exec_context
{
exec_context(std::shared_ptr<exec_controller> impl)
: _impl(impl)
{}

bool canceled() const {
return _impl->should_cancel();
}

private:
friend applyer;
std::shared_ptr<exec_controller> _impl;
};

struct applyer
{
template<class F, class Ret>
void operator()(F& f, std::shared_ptr<exec_state<Ret>> const& p) const
{
try {
p->promise().set_value(f(exec_context { p->get_controller_ptr() }));
}
catch(...) {
p->promise().set_exception(std::current_exception());
}
}

template<class F>
void operator()(F& f, std::shared_ptr<exec_state<void>> const& p) const
{
try {
f(exec_context { p->get_controller_ptr() });
p->promise().set_value();
}
catch(...) {
p->promise().set_exception(std::current_exception());
}
}
};

template<class Ret>
struct exec_result
{
using return_type = Ret;
exec_result(std::shared_ptr<exec_state<return_type>> p)
: _impl(p)
{}

bool cancel() {
return _impl->notify_cancel();
}

std::future<Ret>& get_future()
{
return _future;
}

private:

std::shared_ptr<exec_state<return_type>> _impl;
std::future<return_type> _future { _impl->promise().get_future() };
};


template<class Executor, class F>
auto submit(Executor& exec, F&& f)
{
using function_type = std::decay_t<F>;
using result_type = std::result_of_t<function_type(exec_context)>;
using state_type = exec_state<result_type>;
auto shared_state = std::make_shared<state_type>();
exec.post([shared_state, f = std::forward<F>(f)]
{
applyer()(f, shared_state);
});
return exec_result<result_type>(std::move(shared_state));
}


int main()
{
using namespace std::literals;

boost::asio::io_service ios;
boost::asio::io_service::strand strand(ios);
boost::asio::io_service::work work(ios);

std::thread runner([&] { ios.run(); });
std::thread runner2([&] { ios.run(); });

auto func = [](auto context)
{
for(int i = 0 ; i < 1000 ; ++i)
{
if (context.canceled())
throw std::runtime_error("canceled");
std::this_thread::sleep_for(100ms);
}
};

auto handle = submit(strand, func);
auto handle2 = submit(ios, [](auto context) { return 2 + 2; });
// cancel the handle, or wait on it as you wish

std::this_thread::sleep_for(1s);
handle.cancel();
handle2.cancel(); // prove that late cancellation is a nop
try {
std::cout << "2 + 2 is " << handle2.get_future().get() << std::endl;
}
catch(std::exception& e)
{
std::cerr << "failed to add 2 + 2 : " << e.what() << std::endl;
}
try {
handle.get_future().get();
std::cout << "task completed" << std::endl;
}
catch(std::exception const& e) {
std::cout << "task threw exception: " << e.what() << std::endl;
}

ios.stop();
runner.join();
runner2.join();
}

更新:v2 为类添加了一些隐私保护,演示了 2 个同步任务。

预期输出:

2 + 2 is 4
task threw exception: canceled

关于c++ - 使用线程超时 boost ASIO 线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39024901/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com