
c++ - How to automatically remove completed futures from a std::vector

Reposted. Author: 太空狗. Updated: 2023-10-29 21:34:52

In the example below, mEventExecutors is a std::vector<std::future<void>>. I would like to be able to remove futures from the vector as they complete. Can this be done?

void RaiseEvent(EventID messageID)
{
    mEventExecutors.push_back(std::move(std::async([=]{
        auto eventObject = mEventListeners.find(messageID);
        if (eventObject != mEventListeners.end())
        {
            for (auto listener : eventObject->second)
            {
                listener();
            }
        }
    })
    ));
}

Best answer

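The question itself has already been answered by someone else, but it piqued my curiosity about how one could implement a fully functional, thread-safe task manager in a minimum number of lines of code.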
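One direct way to do what the question asks (not necessarily the approach taken in that other answer) is to poll each future with a zero timeout and erase the ones that are ready. The following is a minimal sketch under that assumption; `prune_completed` and `prune_demo` are hypothetical names that do not appear in the question. Note that any exception stored in an erased future is silently discarded.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical helper (not from the question): erase every future whose
// result is already available, without blocking on the ones still running.
std::size_t prune_completed(std::vector<std::future<void>>& futures) {
    auto is_ready = [](const std::future<void>& f) {
        return !f.valid() ||
               f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    };
    auto first = std::remove_if(futures.begin(), futures.end(), is_ready);
    auto removed = static_cast<std::size_t>(futures.end() - first);
    futures.erase(first, futures.end());
    return removed;
}

// Usage sketch: one task finishes at once, the other waits on a gate.
void prune_demo() {
    std::promise<void> gate;
    std::shared_future<void> open = gate.get_future().share();
    std::vector<std::future<void>> futures;
    futures.push_back(std::async(std::launch::async, [] {}));
    futures.push_back(std::async(std::launch::async, [open] { open.wait(); }));

    futures[0].wait();                      // make sure the first task is done
    assert(prune_completed(futures) == 1);  // only the finished one is erased
    assert(futures.size() == 1);

    gate.set_value();                       // let the second task finish
    futures[0].wait();
    assert(prune_completed(futures) == 1);
    assert(futures.empty());
}
```

Something still has to call the helper periodically, for example at the start of each RaiseEvent call, so the removal is not truly automatic.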

I also wondered whether it would be possible to wait on tasks as futures, or optionally to provide a callback function.

Of course, this raised the question of whether those futures could use the sexy continuation syntax of .then(xxx) rather than blocking code.
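To make the idea of a continuation concrete: std::future has no .then() member, so the sketch below emulates one with a hypothetical free function `then` built on std::async. It is only an illustration of the semantics; unlike a real continuation, it parks an extra thread until the input future is ready.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <utility>

// Hypothetical free function `then`: runs `fn` with the completed future
// once `fut` is ready, and returns a future for fn's result.
template <class T, class F>
auto then(std::future<T> fut, F fn) {
    assert(fut.valid());
    return std::async(std::launch::async,
                      [fut = std::move(fut), fn = std::move(fn)]() mutable {
                          fut.wait();                 // blocks this helper thread
                          return fn(std::move(fut));  // hand over the ready future
                      });
}

// The continuation receives the value of the first task.
int then_demo() {
    auto first = std::async(std::launch::async, [] { return 20; });
    auto second = then(std::move(first),
                       [](std::future<int> done) { return done.get() + 22; });
    return second.get();
}

// The continuation can also observe an exception thrown by the first task.
int then_error_demo() {
    auto failing = std::async(std::launch::async,
                              []() -> int { throw std::runtime_error("boom"); });
    auto handled = then(std::move(failing), [](std::future<int> done) {
        try { return done.get(); }
        catch (std::runtime_error const&) { return -1; }
    });
    return handled.get();
}
```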

Here is my attempt.

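Much kudos to Christopher Kohlhoff, the author of boost::asio. By studying his excellent work, I learned the value of separating classes into:

  • handle - controls the lifetime of the object
  • service - provides the object logic and the state shared between object implementations, and manages the lifetime of implementation objects if they outlive the handle (anything that relies on a callback usually does), and
  • implementation - provides the per-object state.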
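The three roles listed above can be sketched with a deliberately tiny example. The `counter`, `counter_service` and `counter_impl` names are hypothetical and exist only for illustration; the sketch assumes a single-threaded caller and a singleton service.

```cpp
#include <cassert>
#include <memory>

// implementation: per-object state only
struct counter_impl {
    int value = 0;
};

// service: shared logic, shared state, and control over impl lifetime
class counter_service {
public:
    static counter_service& use() {
        static counter_service me;
        return me;
    }

    struct deleter {
        counter_service* service;
        void operator()(counter_impl* p) const { service->destroy(p); }
    };
    using impl_type = std::unique_ptr<counter_impl, deleter>;

    impl_type construct() {
        ++live_;
        return impl_type(new counter_impl(), deleter{this});
    }
    void destroy(counter_impl* p) {
        delete p;
        --live_;
    }
    int increment(counter_impl& impl) { return ++impl.value; }
    int live() const { return live_; }

private:
    int live_ = 0;  // state shared between all implementations
};

// handle: owns the impl and forwards every operation to the service
class counter {
public:
    counter() : service_(&counter_service::use()), impl_(service_->construct()) {}
    int increment() { return service_->increment(*impl_); }

private:
    counter_service* service_;
    counter_service::impl_type impl_;
};

void handle_demo() {
    {
        counter a;
        counter b;
        assert(a.increment() == 1);
        assert(a.increment() == 2);
        assert(b.increment() == 1);  // each handle has its own state
        assert(counter_service::use().live() == 2);
    }
    assert(counter_service::use().live() == 0);  // the service saw both destroyed
}
```

Because the impl is held in a unique_ptr, the handle is movable but not copyable, which matches the choice made in the full code.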

Here is an example of the calling code:

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([] {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
       .then([](auto f) {
           try {
               f.get();
           }
           catch (std::exception const &e) {
               emit("task 3 threw an exception: ", e.what());
           }
       });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}

This results in the following output:

task 1 is doing something
task 2 doing something
task 3 doing something
task 3 is done
task 1 done
task 1 completed
task 2 is going to throw
task 2 threw: here is an error
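In the calling code, passing a callback makes submit return nothing, while passing `use_future` makes it return a future. A minimal standard-library sketch of that tag dispatch follows. It is not the answer's implementation: `async_result` and `run_job` are hypothetical names, std::promise stands in for boost::promise, and the job runs synchronously to keep the example short.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

struct use_future_t {};
constexpr use_future_t use_future{};

// Primary template: the token is an ordinary callback, nothing is returned.
template <class Handler>
struct async_result {
    explicit async_result(Handler h) : handler(std::move(h)) {}
    Handler handler;
    void get() {}
};

// Specialisation: the token is use_future, so the handler fulfils a promise
// and get() hands the matching future back to the caller.
template <>
struct async_result<use_future_t> {
    explicit async_result(use_future_t) {}
    std::shared_ptr<std::promise<void>> promise =
        std::make_shared<std::promise<void>>();
    std::function<void(std::exception_ptr)> handler =
        [p = promise](std::exception_ptr e) {
            if (e) p->set_exception(e);
            else p->set_value();
        };
    std::future<void> get() { return promise->get_future(); }
};

// Runs the job, reports success or failure through the token's handler,
// and returns whatever the token's result type dictates.
template <class Job, class Token>
auto run_job(Job job, Token token) {
    async_result<std::decay_t<Token>> result(std::move(token));
    std::exception_ptr error;
    try { job(); }
    catch (...) { error = std::current_exception(); }
    result.handler(error);
    return result.get();
}

bool token_demo() {
    bool callback_saw_success = false;
    run_job([] {}, [&](std::exception_ptr e) { callback_saw_success = !e; });

    std::future<void> f =
        run_job([] { throw std::runtime_error("boom"); }, use_future);
    bool future_saw_error = false;
    try { f.get(); }
    catch (std::runtime_error const&) { future_saw_error = true; }

    return callback_saw_success && future_saw_error;
}
```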

Here is the complete code (tested on Apple clang, which is more promiscuous than gcc, so apologies if I have missed a this-> in a lambda):

#define BOOST_THREAD_PROVIDES_FUTURE 1
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION 1
#define BOOST_THREAD_PROVIDES_EXECUTORS 1

/* written by Richard Hodges 2017
* You're free to use the code, but please give credit where it's due :)
*/
#include <boost/thread/future.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <utility>

// I made a task an object because I thought I might want to store state in it.
// it turns out that this is not strictly necessary

struct task {

};

/*
 * This is the implementation data for one task_manager
 */
struct task_manager_impl {

    using mutex_type = std::mutex;
    using lock_type = std::unique_lock<mutex_type>;

    auto get_lock() -> lock_type {
        return lock_type(mutex_);
    }

    auto add_task(lock_type const &lock, std::unique_ptr<task> t) {
        auto id = t.get();
        task_map_.emplace(id, std::move(t));
    }

    auto remove_task(lock_type lock, task *task_id) {
        task_map_.erase(task_id);
        if (task_map_.empty()) {
            // notify while still holding the lock, so that a waiter cannot
            // wake up and destroy this object before the notify happens
            no_more_tasks_.notify_all();
        }
    }

    auto wait(lock_type lock) {
        no_more_tasks_.wait(lock, [this]() { return task_map_.empty(); });
    }

    // for this example I have chosen to express errors as exceptions
    using error_type = std::exception_ptr;

    mutex_type mutex_;
    std::condition_variable no_more_tasks_;

    std::unordered_map<task *, std::unique_ptr<task>> task_map_;
};

/*
 * This stuff is the protocol to figure out whether to return a future
 * or just invoke a callback.
 * Total respect to Christopher Kohlhoff of asio fame for figuring this out
 * I merely step in his footsteps here, with some simplifications because of c++11
 */
struct use_future_t {
};
constexpr auto use_future = use_future_t();

template<class Handler>
struct make_async_handler {
    auto wrap(Handler handler) {
        return handler;
    }

    struct result_type {
        auto get() -> void {}
    };

    struct result_type result;
};

template<>
struct make_async_handler<const use_future_t &> {
    struct shared_state_type {
        boost::promise<void> promise;
    };

    make_async_handler() {
    }

    template<class Handler>
    auto wrap(Handler &&) {
        return [shared_state = this->shared_state](auto error) {
            // boost promises deal in terms of boost::exception_ptr so we need to marshal.
            // this is a small price to pay for the extra utility of boost::promise over
            // std::promise
            if (error) {
                try {
                    std::rethrow_exception(error);
                }
                catch (...) {
                    shared_state->promise.set_exception(boost::current_exception());
                }
            } else {
                shared_state->promise.set_value();
            }
        };
    }

    struct result_type {
        auto get() -> boost::future<void> { return shared_state->promise.get_future(); }

        std::shared_ptr<shared_state_type> shared_state;
    };

    std::shared_ptr<shared_state_type> shared_state = std::make_shared<shared_state_type>();
    result_type result{shared_state};

};

/*
 * Provides the logic of a task manager. Also notice that it maintains a boost::basic_thread_pool
 * The destructor of a basic_thread_pool will not complete until all tasks are complete. So our
 * program will not crash horribly at exit time.
 */
struct task_manager_service {

    /*
     * through this function, the service has full control over how it is created and destroyed.
     */

    static auto use() -> task_manager_service & {
        static task_manager_service me{};
        return me;
    }

    using impl_class = task_manager_impl;

    struct deleter {
        void operator()(impl_class *p) {
            service_->destroy(p);
        }

        task_manager_service *service_;
    };

    /*
     * defining impl_type in terms of a unique_ptr ensures that the handle will be
     * moveable but not copyable.
     * Had we used a shared_ptr, the handle would be copyable with shared semantics.
     * That can be useful too.
     */
    using impl_type = std::unique_ptr<impl_class, deleter>;

    auto construct() -> impl_type {
        return impl_type(new impl_class(),
                         deleter{this});
    }

    auto destroy(impl_class *impl) -> void {
        wait(*impl);
        delete impl;
    }

    template<class Job, class Handler>
    auto submit(impl_class &impl, Job &&job, Handler &&handler) {

        auto make_handler = make_async_handler<Handler>();

        auto async_handler = make_handler.wrap(std::forward<Handler>(handler));

        auto my_task = std::make_unique<task>();
        auto task_ptr = my_task.get();

        // invoked on a pool thread when the job finishes: report the
        // outcome first, then drop the task from the map
        auto task_done = [
            this,
            task_id = task_ptr,
            &impl,
            async_handler
        ](auto error) {
            async_handler(error);
            this->remove_task(impl, task_id);
        };
        auto lock = impl.get_lock();
        impl.add_task(lock, std::move(my_task));
        launch(impl, task_ptr, std::forward<Job>(job), task_done);

        return make_handler.result.get();
    }

    template<class F, class Handler>
    auto launch(impl_class &, task *task_ptr, F &&f, Handler &&handler) -> void {
        this->thread_pool_.submit([f, handler] {
            auto error = std::exception_ptr();
            try {
                f();
            }
            catch (...) {
                error = std::current_exception();
            }
            handler(error);
        });
    }

    auto wait(impl_class &impl) -> void {
        impl.wait(impl.get_lock());
    }

    auto remove_task(impl_class &impl, task *task_id) -> void {
        impl.remove_task(impl.get_lock(), task_id);
    }

    boost::basic_thread_pool thread_pool_{std::thread::hardware_concurrency()};

};

/*
 * The task manager handle. Holds the task_manager implementation plus provides access to the
 * owning task_manager_service. In this case, the service is a global static object. In an io loop
 * environment, for example asio, the service would be owned by the io loop.
 */
struct task_manager {

    using service_type = task_manager_service;
    using impl_type = service_type::impl_type;
    using impl_class = decltype(*std::declval<impl_type>());

    task_manager()
        : service_(std::addressof(service_type::use()))
        , impl_(get_service().construct()) {}

    template<class Job, class Handler>
    auto submit(Job &&job, Handler &&handler) {
        return get_service().submit(get_impl(),
                                    std::forward<Job>(job),
                                    std::forward<Handler>(handler));
    }

    auto get_service() -> service_type & {
        return *service_;
    }

    auto get_impl() -> impl_class & {
        return *impl_;
    }

private:

    service_type *service_;
    impl_type impl_;
};


/*
 * helpful thread-safe emitter
 */
std::mutex thing_mutex;

template<class...Things>
void emit(Things &&...things) {
    auto lock = std::unique_lock<std::mutex>(thing_mutex);
    using expand = int[];
    void(expand{0,
                ((std::cout << things), 0)...
    });
    std::cout << std::endl;
}

using namespace std::literals;

int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([] {
                   emit("task 3 doing something");
                   std::this_thread::sleep_for(500ms);
                   emit("task 3 is done");
               },
               use_future)
       .then([](auto f) {
           try {
               f.get();
           }
           catch (std::exception const &e) {
               emit("task 3 threw an exception: ", e.what());
           }
       });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception &e) {
        emit("task 2 threw: ", e.what());
    }
}
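The part of the code above that keeps the handle alive until its tasks finish is the map of live tasks plus a condition variable. Stripped of Boost, that bookkeeping can be sketched with a plain counter. The `task_tracker` class and `run_tracked` function are hypothetical names for illustration, and raw std::thread objects stand in for the thread pool.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Counts outstanding tasks and lets one thread block until none remain.
class task_tracker {
public:
    void add() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++pending_;
    }

    void done() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(pending_ > 0);
        if (--pending_ == 0) {
            cv_.notify_all();  // notified while the lock is still held
        }
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return pending_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t pending_ = 0;
};

// Starts `count` short tasks and returns how many had finished by the
// time wait() returned.
int run_tracked(int count) {
    task_tracker tracker;
    std::atomic<int> finished{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < count; ++i) {
        tracker.add();  // registered before the task can possibly finish
        workers.emplace_back([&] {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            ++finished;
            tracker.done();
        });
    }
    tracker.wait();
    int seen = finished.load();
    for (auto& w : workers) w.join();
    return seen;
}
```

Registering each task before launching it mirrors the order used in submit, where add_task runs before launch.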

Regarding "c++ - How to automatically remove completed futures from a std::vector", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45117450/
