gpt4 book ai didi

c++11 - RXCPP:阻塞功能超时

转载 作者:行者123 更新时间:2023-12-02 03:02:11 24 4
gpt4 key购买 nike

考虑一个阻塞函数:this_thread::sleep_for(milliseconds(3000));

我正在尝试获得以下行为:

Trigger Blocking Function               

|---------------------------------------------X

我想触发阻塞功能,如果它花费的时间太长(超过两秒),它应该超时。

我做了以下工作:
my_connection = observable<>::create<int>([](subscriber<int> s) {
auto s2 = observable<>::just(1, observe_on_new_thread()) |
subscribe<int>([&](auto x) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
});
}) |
timeout(seconds(2), observe_on_new_thread());

我不能让它工作。对于初学者,我认为 s 不能从不同的线程 on_next 。

所以我的问题是,这样做的正确 react 方式是什么?如何在 rxcpp 中包装阻塞函数并为其添加超时?

随后,我想获得一个行为如下的 RX 流:
Trigger                Cleanup

|------------------------X
(Delay) Trigger Cleanup
|-----------------X

最佳答案

好问题!上面已经很接近了。

这是一个如何使阻塞操作适应 rxcpp 的示例。确实如此 libcurl polling发出http请求。

以下应该做你想要的。

auto sharedThreads = observe_on_event_loop();

auto my_connection = observable<>::create<int>([](subscriber<int> s) {
this_thread::sleep_for(milliseconds(3000));
s.on_next(1);
s.on_completed();
}) |
subscribe_on(observe_on_new_thread()) |
//start_with(0) | // workaround bug in timeout
timeout(seconds(2), sharedThreads);
//skip(1); // workaround bug in timeout

my_connection.as_blocking().subscribe(
[](int){},
[](exception_ptr ep){cout << "timed out" << endl;}
);
  • subscribe_on将运行 create在专用线程上,因此 create允许阻塞该线程。
  • timeout将在不同的线程上运行计时器,可以与其他人共享,并转移所有 on_next/on_error/on_completed调用同一个线程。
  • as_blocking将确保 subscribe在完成之前不会返回。这仅用于防止main()从退出 - 最常见的是在测试或示例程序中。

  • 编辑:为 timeout 中的错误添加了解决方法.目前,它不会在第一个值到达之前安排第一次超时。

    编辑 2: timeout错误已修复,不再需要解决方法。

    关于c++11 - RXCPP:阻塞功能超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45051166/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com