In the example below, mEventExecutors is a std::vector<std::future<void>>. I would like to be able to remove each future from the vector as it completes. Can this be done?
void RaiseEvent(EventID messageID)
{
    mEventExecutors.push_back(std::move(std::async([=] {
        auto eventObject = mEventListeners.find(messageID);
        if (eventObject != mEventListeners.end())
        {
            for (auto listener : eventObject->second)
            {
                listener();
            }
        }
    })));
}
Best Answer
The question itself has already been answered by someone else, but it piqued my curiosity: how few lines of code does it take to implement a fully functional, thread-safe task manager?
I also wondered whether tasks could be awaited as futures, or optionally signal completion through a callback.
Naturally, that raised the question of whether those futures could use the sexy continuation syntax of .then(xxx) rather than blocking code.
Here is my attempt.
Many thanks to Christopher Kohlhoff, the author of boost::asio. By studying his excellent work I learned the value of separating a class into a handle, an implementation, and a service.
Here is an example of the calling code:
int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([] {
            emit("task 3 doing something");
            std::this_thread::sleep_for(500ms);
            emit("task 3 is done");
        },
        use_future)
        .then([](auto f) {
            try {
                f.get();
            }
            catch (std::exception const& e) {
                emit("task 3 threw an exception: ", e.what());
            }
        });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception& e) {
        emit("task 2 threw: ", e.what());
    }
}
This produces the following output:
task 1 is doing something
task 2 doing something
task 3 doing something
task 3 is done
task 1 done
task 1 completed
task 2 is going to throw
task 2 threw: here is an error
Here is the complete code (tested on apple clang, which is more promiscuous than gcc, so apologies if I've missed a this-> in a lambda):
#define BOOST_THREAD_PROVIDES_FUTURE 1
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION 1
#define BOOST_THREAD_PROVIDES_EXECUTORS 1
/* written by Richard Hodges 2017
* You're free to use the code, but please give credit where it's due :)
*/
#include <boost/thread/future.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <condition_variable>
#include <iostream>    // std::cout, used by emit()
#include <memory>      // std::unique_ptr, std::shared_ptr, std::make_unique
#include <mutex>       // std::mutex, std::unique_lock
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <utility>
// I made a task an object because I thought I might want to store state in it.
// it turns out that this is not strictly necessary
struct task {
};

/*
 * This is the implementation data for one task_manager
 */
struct task_manager_impl {
    using mutex_type = std::mutex;
    using lock_type = std::unique_lock<mutex_type>;

    auto get_lock() -> lock_type {
        return lock_type(mutex_);
    }

    auto add_task(lock_type const& lock, std::unique_ptr<task> t) {
        auto id = t.get();
        task_map_.emplace(id, std::move(t));
    }

    auto remove_task(lock_type lock, task* task_id) {
        task_map_.erase(task_id);
        if (task_map_.empty()) {
            lock.unlock();
            no_more_tasks_.notify_all();
        }
    }

    auto wait(lock_type lock) {
        no_more_tasks_.wait(lock, [this]() { return task_map_.empty(); });
    }

    // for this example I have chosen to express errors as exceptions
    using error_type = std::exception_ptr;

    mutex_type mutex_;
    std::condition_variable no_more_tasks_;
    std::unordered_map<task*, std::unique_ptr<task>> task_map_;
};
/*
* This stuff is the protocol to figure out whether to return a future
* or just invoke a callback.
* Total respect to Christopher Kohlhoff of asio fame for figuring this out
* I merely step in his footsteps here, with some simplifications because of c++11
*/
struct use_future_t {
};
constexpr auto use_future = use_future_t();

template<class Handler>
struct make_async_handler {
    auto wrap(Handler handler) {
        return handler;
    }

    struct result_type {
        auto get() -> void {}
    };

    struct result_type result;
};

template<>
struct make_async_handler<const use_future_t&> {
    struct shared_state_type {
        boost::promise<void> promise;
    };

    make_async_handler() {
    }

    template<class Handler>
    auto wrap(Handler&&) {
        return [shared_state = this->shared_state](auto error) {
            // boost promises deal in terms of boost::exception_ptr so we need to marshal.
            // this is a small price to pay for the extra utility of boost::promise over
            // std::promise
            if (error) {
                try {
                    std::rethrow_exception(error);
                }
                catch (...) {
                    shared_state->promise.set_exception(boost::current_exception());
                }
            } else {
                shared_state->promise.set_value();
            }
        };
    }

    struct result_type {
        auto get() -> boost::future<void> { return shared_state->promise.get_future(); }
        std::shared_ptr<shared_state_type> shared_state;
    };

    std::shared_ptr<shared_state_type> shared_state = std::make_shared<shared_state_type>();
    result_type result{shared_state};
};
/*
* Provides the logic of a task manager. Also notice that it maintains a boost::basic_thread_pool
* The destructor of a basic_thread_pool will not complete until all tasks are complete. So our
* program will not crash horribly at exit time.
*/
struct task_manager_service {
    /*
     * through this function, the service has full control over how it is created and destroyed.
     */
    static auto use() -> task_manager_service& {
        static task_manager_service me{};
        return me;
    }

    using impl_class = task_manager_impl;

    struct deleter {
        void operator()(impl_class* p) {
            service_->destroy(p);
        }
        task_manager_service* service_;
    };

    /*
     * defining impl_type in terms of a unique_ptr ensures that the handle will be
     * moveable but not copyable.
     * Had we used a shared_ptr, the handle would be copyable with shared semantics.
     * That can be useful too.
     */
    using impl_type = std::unique_ptr<impl_class, deleter>;

    auto construct() -> impl_type {
        return impl_type(new impl_class(), deleter{this});
    }

    auto destroy(impl_class* impl) -> void {
        wait(*impl);
        delete impl;
    }

    template<class Job, class Handler>
    auto submit(impl_class& impl, Job&& job, Handler&& handler) {
        auto make_handler = make_async_handler<Handler>();
        auto async_handler = make_handler.wrap(std::forward<Handler>(handler));
        auto my_task = std::make_unique<task>();
        auto task_ptr = my_task.get();
        auto task_done = [this,
                          task_id = task_ptr,
                          &impl,
                          async_handler](auto error) {
            async_handler(error);
            this->remove_task(impl, task_id);
        };
        auto lock = impl.get_lock();
        impl.add_task(lock, std::move(my_task));
        launch(impl, task_ptr, std::forward<Job>(job), task_done);
        return make_handler.result.get();
    }

    template<class F, class Handler>
    auto launch(impl_class&, task* task_ptr, F&& f, Handler&& handler) -> void {
        this->thread_pool_.submit([f, handler] {
            auto error = std::exception_ptr();
            try {
                f();
            }
            catch (...) {
                error = std::current_exception();
            }
            handler(error);
        });
    }

    auto wait(impl_class& impl) -> void {
        impl.wait(impl.get_lock());
    }

    auto remove_task(impl_class& impl, task* task_id) -> void {
        impl.remove_task(impl.get_lock(), task_id);
    }

    boost::basic_thread_pool thread_pool_{std::thread::hardware_concurrency()};
};
/*
 * The task manager handle. Holds the task_manager implementation plus provides access to the
* owning task_manager_service. In this case, the service is a global static object. In an io loop environment
* for example, asio, the service would be owned by the io loop.
*/
struct task_manager {
    using service_type = task_manager_service;
    using impl_type = service_type::impl_type;
    using impl_class = decltype(*std::declval<impl_type>());

    task_manager()
        : service_(std::addressof(service_type::use()))
        , impl_(get_service().construct()) {}

    template<class Job, class Handler>
    auto submit(Job&& job, Handler&& handler) {
        return get_service().submit(get_impl(),
                                    std::forward<Job>(job),
                                    std::forward<Handler>(handler));
    }

    auto get_service() -> service_type& {
        return *service_;
    }

    auto get_impl() -> impl_class& {
        return *impl_;
    }

private:
    service_type* service_;
    impl_type impl_;
};
/*
* helpful thread-safe emitter
*/
std::mutex thing_mutex;

template<class... Things>
void emit(Things&&... things) {
    auto lock = std::unique_lock<std::mutex>(thing_mutex);
    using expand = int[];
    void(expand{0,
                ((std::cout << things), 0)...
    });
    std::cout << std::endl;
}
using namespace std::literals;
int main() {
    task_manager mgr;

    // an example of using async callbacks to indicate completion and error
    mgr.submit([] {
                   emit("task 1 is doing something");
                   std::this_thread::sleep_for(1s);
                   emit("task 1 done");
               },
               [](auto err) {
                   if (not err) {
                       emit("task 1 completed");
                   } else {
                       emit("task 1 failed");
                   }
               });

    // an example of returning a future (see later)
    auto f = mgr.submit([] {
        emit("task 2 doing something");
        std::this_thread::sleep_for(1500ms);
        emit("task 2 is going to throw");
        throw std::runtime_error("here is an error");
    }, use_future);

    // an example of returning a future and then immediately using its continuation.
    // note that the continuation happens on the task_manager's thread pool
    mgr.submit([] {
            emit("task 3 doing something");
            std::this_thread::sleep_for(500ms);
            emit("task 3 is done");
        },
        use_future)
        .then([](auto f) {
            try {
                f.get();
            }
            catch (std::exception const& e) {
                emit("task 3 threw an exception: ", e.what());
            }
        });

    // block on the future of the second example
    try {
        f.get();
    }
    catch (std::exception& e) {
        emit("task 2 threw: ", e.what());
    }
}
A similar question about c++ - how to automatically remove completed futures from a std::vector can be found on Stack Overflow: https://stackoverflow.com/questions/45117450/