- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我想同时使用 Boost.Asio 的 strand 和 prioritized wrapper。
在我编写代码之前,我阅读了以下信息:
Boost asio priority and strand
http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531
Why do I need strand per connection when using boost::asio?
我想使用包装方法,因为我想使用各种异步 API,例如 async_read、async_write 和 async_connect。根据http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531 , 貌似priority wrapper和strand wrapper可以结合起来。
所以我根据下面的例子写了代码:
这是我的代码:
#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1
class handler_priority_queue {
public:
template <typename Handler>
void add(int priority, Handler&& handler) {
std::cout << "add(" << priority << ")" << std::endl;
std::lock_guard<std::mutex> g(mtx_);
handlers_.emplace(priority, std::forward<Handler>(handler));
}
void execute_all() {
auto top = [&]() -> boost::optional<queued_handler> {
std::lock_guard<std::mutex> g(mtx_);
if (handlers_.empty()) return boost::none;
boost::optional<queued_handler> opt = handlers_.top();
handlers_.pop();
return opt;
};
while (auto h_opt = top()) {
h_opt.get().execute();
}
}
template <typename Handler>
class wrapped_handler {
public:
wrapped_handler(handler_priority_queue& q, int p, Handler h)
: queue_(q), priority_(p), handler_(std::move(h))
{
}
template <typename... Args>
void operator()(Args&&... args) {
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}
//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};
template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler&& handler) {
return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler));
}
private:
class queued_handler {
public:
template <typename Handler>
queued_handler(int p, Handler&& handler)
: priority_(p), function_(std::forward<Handler>(handler))
{
std::cout << "queued_handler()" << std::endl;
}
void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_();
}
friend bool operator<(
queued_handler const& lhs,
queued_handler const & rhs) {
return lhs.priority_ < rhs.priority_;
}
private:
int priority_;
std::function<void()> function_;
};
std::priority_queue<queued_handler> handlers_;
std::mutex mtx_;
};
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}
//----------------------------------------------------------------------
int main() {
int const num_of_threads = 4;
int const num_of_tasks = 5;
boost::asio::io_service ios;
boost::asio::strand strand(ios);
handler_priority_queue pq;
for (int i = 0; i != num_of_tasks; ++i) {
ios.post(
#if ENABLE_STRAND
strand.wrap(
#endif
#if ENABLE_PRIORITY
pq.wrap(
i,
#endif
[=] {
std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
}
#if ENABLE_PRIORITY
)
#endif
#if ENABLE_STRAND
)
#endif
);
}
std::vector<std::thread> pool;
for (int i = 0; i != num_of_threads; ++i) {
pool.emplace_back([&]{
std::cout << "before run_one()" << std::endl;
while (ios.run_one()) {
std::cout << "before poll_one()" << std::endl;
while (ios.poll_one())
;
std::cout << "before execute_all()" << std::endl;
pq.execute_all();
}
}
);
}
for (auto& t : pool) t.join();
}
包装器由以下宏启用:
#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1
启用两个宏后,我得到以下结果:
before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
execute(3)
execute(2)
execute(1)
execute(0)
before run_one()
before run_one()
before run_one()
我希望我得到
[called] priority,thread_id
输出为
[called] 1,140512649541376
但我没有得到它。
似乎在函数execute()
中,调用了function_()
,但没有调用wrapped_handler::operator()
。 (函数 execute()
在我的代码中从 pq.execute_all();
调用。)
void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_(); // It is called.
}
template <typename Handler>
class wrapped_handler {
public:
template <typename... Args>
void operator()(Args&&... args) { // It is NOT called
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}
我在 function_()
被调用后跟踪了序列。
调用了以下函数:
https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94
然后在函数 bool strand_service::do_dispatch(implementation_type& impl, operation* op)
中,操作 op
没有被调用,而是被插入了下一行的队列中:
我不确定为什么将 function_()
分派(dispatch)给 strand_service。我认为 strand wrapper 已经在我的代码中的以下位置展开:
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}
如果我只启用优先包装器,我会得到以下结果。看起来像我预期的那样工作。
before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
operator()
[called] 4,140512649541376
execute(3)
operator()
[called] 3,140512649541376
execute(2)
operator()
[called] 2,140512649541376
execute(1)
operator()
[called] 1,140512649541376
execute(0)
operator()
[called] 0,140512649541376
before run_one()
before run_one()
before run_one()
如果我只启用 strand wrapper,我会得到以下结果。似乎也按我的预期工作。
before run_one()
[called] 0,140127385941760
before poll_one()
[called] 1,140127385941760
[called] 2,140127385941760
[called] 3,140127385941760
[called] 4,140127385941760
before execute_all()
before run_one()
before run_one()
before run_one()
有什么想法吗?
最佳答案
我解决了这个问题。
I'm not sure why the function_() is dispatched to strand_service. I think that strand wrapper has already been unwraped at the following point in my code:
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, std::forward<Function>(f));
}
参数f
是原始处理程序。这意味着优先队列包装和链包装处理程序。绞线包装在外面。所以当调用f
时,它被分派(dispatch)给strand_service。此过程发生在同一个 strand_service 中,因此不会调用处理程序。
为了解决这个问题,将h->handler_
加入优先级队列而不是f
如下:
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, h->handler_);
}
handler_
是类模板wrapped_handler
的成员变量。它包含未包装的处理程序。
完整代码如下:
#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1
class handler_priority_queue {
public:
template <typename Handler>
void add(int priority, Handler&& handler) {
std::cout << "add(" << priority << ")" << std::endl;
std::lock_guard<std::mutex> g(mtx_);
handlers_.emplace(priority, std::forward<Handler>(handler));
}
void execute_all() {
auto top = [&]() -> boost::optional<queued_handler> {
std::lock_guard<std::mutex> g(mtx_);
if (handlers_.empty()) return boost::none;
boost::optional<queued_handler> opt = handlers_.top();
handlers_.pop();
return opt;
};
while (auto h_opt = top()) {
h_opt.get().execute();
}
}
template <typename Handler>
class wrapped_handler {
public:
template <typename HandlerArg>
wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h)
: queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h))
{
}
template <typename... Args>
void operator()(Args&&... args) {
std::cout << "operator() " << std::endl;
handler_(std::forward<Args>(args)...);
}
//private:
handler_priority_queue& queue_;
int priority_;
Handler handler_;
};
template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler&& handler) {
return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler));
}
private:
class queued_handler {
public:
template <typename Handler>
queued_handler(int p, Handler&& handler)
: priority_(p), function_(std::forward<Handler>(handler))
{
std::cout << "queued_handler()" << std::endl;
}
void execute() {
std::cout << "execute(" << priority_ << ")" << std::endl;
function_();
}
friend bool operator<(
queued_handler const& lhs,
queued_handler const & rhs) {
return lhs.priority_ < rhs.priority_;
}
private:
int priority_;
std::function<void()> function_;
};
std::priority_queue<queued_handler> handlers_;
std::mutex mtx_;
};
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
handler_priority_queue::wrapped_handler<Handler>* h) {
std::cout << "asio_handler_invoke " << std::endl;
h->queue_.add(h->priority_, h->handler_);
}
//----------------------------------------------------------------------
int main() {
int const num_of_threads = 4;
int const num_of_tasks = 5;
boost::asio::io_service ios;
boost::asio::strand strand(ios);
handler_priority_queue pq;
for (int i = 0; i != num_of_tasks; ++i) {
ios.post(
#if ENABLE_STRAND
strand.wrap(
#endif
#if ENABLE_PRIORITY
pq.wrap(
i,
#endif
[=] {
std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
}
#if ENABLE_STRAND
)
#endif
#if ENABLE_PRIORITY
)
#endif
);
}
std::vector<std::thread> pool;
for (int i = 0; i != num_of_threads; ++i) {
pool.emplace_back([&]{
std::cout << "before run_one()" << std::endl;
while (ios.run_one()) {
std::cout << "before poll_one()" << std::endl;
while (ios.poll_one())
;
std::cout << "before execute_all()" << std::endl;
pq.execute_all();
}
}
);
}
for (auto& t : pool) t.join();
}
这是一个输出:
before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
[called] 4,139903315736320
execute(3)
[called] 3,139903315736320
execute(2)
[called] 2,139903315736320
execute(1)
[called] 1,139903315736320
execute(0)
[called] 0,139903315736320
before run_one()
before run_one()
before run_one()
关于c++ - 如何在 Boost Asio 上结合 strand wrapper 和 priority wrapper,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43385897/
我正在尝试使用以下命令将 slingbox 连接设置为 1:20 类: iptables -t mangle -A to-cable -p tcp --dport 5001 -j CLASSIFY -
我使用 MailKit 作为 SMTP 客户端来发送电子邮件。 我看到有两个属性 XMessagePriority和 MessagePriority 这两者有什么区别,一个会覆盖另一个吗? 最佳答案
我有一个 5 列的布局(1 和 5 大小相同,2 和 4 大小相同): | 1 | 2 | 3 | 4 | 5 | 我们允许用户水平调整应用程序的大小(缩小它),我试图实现的行为如下:缩小第 1 列和
在 Websphere 中,可以创建“共享库”并与应用程序关联。我想知道,就类路径中的位置而言,添加共享库的位置是什么?这意味着,我想知道,在重复资源的情况下,将具有更高的“优先级”加载、共享库或 E
这个问题在这里已经有了答案: Scope of variables in "for" loop (2 个答案) New block scope for selection and iteration
我想了解优先级是如何运作的。更具体地说,设置 stub 优先级的预期输出是什么。关于此的文档有限,并且可用的文档并不能真正解释输出的样子,因此我无法验证我是否已正确实现它。 这是我当前的代码:
我正在处理优先级队列,我想检查在弹出操作期间堆属性是如何维护的。这是我的代码。 #include #include using namespace std; class g { publi
下面是典型的读写模式(读多写少) private ReadWriteLock lock = new ReentrantReadWriteLock(); private int value;
我有一个java程序是这样的 公共(public)类 PriorityQueueExample { public static void main(String[] args) { Prior
我是 java 和线程世界的新手..我只是在看下面的示例代码:- package com.alice.learnthread; class NewThread implements Runnable{
我正在研究优先级队列,我想检查如何使用可比较的类来比较这些值。这是我的代码。 #include #include using namespace std; class g { public
我正在尝试实现 Node* 的优先级队列,其中 Node 是我自己定义的一个类。我意识到拥有一个优先级的指针队列意味着它会根据地址而不是节点持有的值进行排序,所以我搜索了很多讨论论坛来找到一个解决方案
如何在 kafka 上添加延迟作业?据我了解,它不是按消息处理,而是按主题处理。我的工作有不同的时间表,我希望它们被消耗。假设一个将在接下来的 4 小时内,另一个将是我 12 月 1 日,等等。 ka
任何人都想一想。 OpenMP功能可调节cpu肌肉以处理dumbbel。在我对openmp的研究中,我们无法设置线程优先级来执行功能强大的块代码。只有一种方法(_beginthreadex或具有5.参
我有 2 个线程,其优先级已使用 setPriority() 函数设置,但它仍然显示相同的优先级? 这是代码片段: public class threadtest extends Thread {
我的数据库中有一个 photos 表,它使用 status 列来检测照片是否经过验证... 因此,如果照片的 status = 0 则其未经过验证,但如果 status = 1 则该照片已验证。 我想
我有下表用于本地化: Key | Value | lang MainTitle | Welcome to my page |
我的新老板认为你可以每年向 App Store 提交更新并将其标记为优先级一两次。如果你滥用它,你会失去你的特权吗? 他是对的。我多年来一直在提交更新,但从未听说过它? 干杯 最佳答案 您可以申请加急
当我设置 qualityOfService 时,我的 iOS 应用程序有多个线程(在 8 到 50 之间)在线程开始之前,.userInitiated线程在 .background 之前完成线程。但我
在各种浏览器中,当使用特定字体时(例如 - Helvetica Neue),如果未找到该字体,则使用直系字体中的第一个字体。因此,如果我要指定 Arial Narrow 是元素的基本字体样式,而我的读
我是一名优秀的程序员,十分优秀!