
c++11 - Boost asio thread_pool join does not wait for tasks to be finished

Reposted. Author: 行者123. Updated: 2023-12-03 16:11:21

Consider the function

#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

void foo(const uint64_t begin, uint64_t *result)
{
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i)
    {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    *result = prev[0];
}

void batch(boost::asio::thread_pool &pool, const uint64_t a[])
{
    uint64_t r[] = {0, 0};
    boost::asio::post(pool, boost::bind(foo, a[0], &r[0]));
    boost::asio::post(pool, boost::bind(foo, a[1], &r[1]));

    pool.join();
    std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}

where foo is a simple "pure" function that performs a computation on begin and writes the result to the pointer *result.

This function gets called with different inputs from batch. Dispatching each call to another CPU core could be beneficial here.

Now assume the batch function is called several tens of thousands of times. Therefore a thread pool would be nice, shared between all the sequential batch calls.

Trying this (with just 3 calls for simplicity)

int main(int argn, char **)
{
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}

leads to the result

foo(2): 2 foo(4): 4
foo(3): 0 foo(5): 0
foo(7): 0 foo(9): 0



where all three lines appear at the same time, while the computation of foo takes about 3 seconds each.

My assumption is that only the first join really waits for the pool to complete all the jobs; the other calls yield invalid results (uninitialized values).

What is the best practice for reusing the thread pool here?

Best answer

The best practice is: don't reuse the pool (what would be the point of pooling if you keep creating new pools?).

If you want to be sure to "time" the batches together, I'd suggest using when_all on futures:

Live On Coliru

#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

uint64_t foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return prev[0];
}

void batch(boost::asio::thread_pool &pool, const uint64_t a[2])
{
    using T = boost::packaged_task<uint64_t>;

    T tasks[] {
        T(boost::bind(foo, a[0])),
        T(boost::bind(foo, a[1])),
    };

    auto all = boost::when_all(
        tasks[0].get_future(),
        tasks[1].get_future());

    for (auto& t : tasks)
        post(pool, std::move(t));

    auto [r0, r1] = all.get();
    std::cerr << "foo(" << a[0] << "): " << r0.get() << " foo(" << a[1] << "): " << r1.get() << std::endl;
}

int main() {
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}

Prints
foo(2): 2 foo(4): 4
foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509

I would consider
  • generalizing
  • message queuing

  • Generalized

    Make it more flexible by not hardcoding the batch sizes. After all, the pool size is already fixed, we don't need to "make sure batches fit" or anything:

    Live On Coliru
    #define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
    #include <iostream>
    #include <boost/bind.hpp>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <boost/thread/future.hpp>

    struct Result { uint64_t begin, result; };

    Result foo(uint64_t begin) {
        uint64_t prev[] = {begin, 0};
        for (uint64_t i = 0; i < 1000000000; ++i) {
            const auto tmp = (prev[0] + prev[1]) % 1000;
            prev[1] = prev[0];
            prev[0] = tmp;
        }
        return { begin, prev[0] };
    }

    void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> const a)
    {
        using T = boost::packaged_task<Result>;
        std::vector<T> tasks;
        tasks.reserve(a.size());

        for (auto begin : a)
            tasks.emplace_back(boost::bind(foo, begin));

        std::vector<boost::unique_future<T::result_type> > futures;
        for (auto& t : tasks) {
            futures.push_back(t.get_future());
            post(pool, std::move(t));
        }

        for (auto& fut : boost::when_all(futures.begin(), futures.end()).get()) {
            auto r = fut.get();
            std::cerr << "foo(" << r.begin << "): " << r.result << " ";
        }
        std::cout << std::endl;
    }

    int main() {
        boost::asio::thread_pool pool(2);

        batch(pool, {2});
        batch(pool, {4, 3, 5});
        batch(pool, {7, 9});
    }

    Prints
    foo(2): 2 
    foo(4): 4 foo(3): 503 foo(5): 505
    foo(7): 507 foo(9): 509

    Generalized2: Variadics simplify

    Contrary to popular belief (and honestly, to what usually happens), this time we can leverage variadics to get rid of all the intermediate vectors (every single one of them):

    Live On Coliru
    template <typename... T>
    void batch(boost::asio::thread_pool &pool, T... a)
    {
        auto launch = [&pool](uint64_t begin) {
            boost::packaged_task<Result> pt(boost::bind(foo, begin));
            auto fut = pt.get_future();
            post(pool, std::move(pt));
            return fut;
        };

        for (auto& r : {launch(a).get()...}) {
            std::cerr << "foo(" << r.begin << "): " << r.result << " ";
        }

        std::cout << std::endl;
    }

    If you insist on outputting the results timely, you can still add when_all into the mix (requiring a little more heroics to unpack the tuple):

    Live On Coliru
    template <typename... T>
    void batch(boost::asio::thread_pool &pool, T... a)
    {
        auto launch = [&pool](uint64_t begin) {
            boost::packaged_task<Result> pt(boost::bind(foo, begin));
            auto fut = pt.get_future();
            post(pool, std::move(pt));
            return fut;
        };

        std::apply([](auto&&... rfut) {
            Result results[] {rfut.get()...};
            for (auto& r : results) {
                std::cerr << "foo(" << r.begin << "): " << r.result << " ";
            }
        }, boost::when_all(launch(a)...).get());

        std::cout << std::endl;
    }

    Both still print the same result

    Message queuing

    This is very natural with Boost, and sort of skips most of the complexity. If you also want to report per batch group, you have to coordinate:

    Live On Coliru
    #include <iostream>
    #include <boost/asio.hpp>
    #include <memory>

    struct Result { uint64_t begin, result; };

    Result foo(uint64_t begin) {
        uint64_t prev[] = {begin, 0};
        for (uint64_t i = 0; i < 1000000000; ++i) {
            const auto tmp = (prev[0] + prev[1]) % 1000;
            prev[1] = prev[0];
            prev[0] = tmp;
        }
        return { begin, prev[0] };
    }

    void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> begins) {
        auto group = std::make_shared<std::vector<Result> >(begins.size());

        for (size_t i = 0; i < begins.size(); ++i) {
            post(pool, [i, begin = begins.at(i), group] {
                (*group)[i] = foo(begin);
                if (group.unique()) {
                    for (auto& r : *group) {
                        std::cout << "foo(" << r.begin << "): " << r.result << " ";
                    }
                    std::cout << std::endl;
                }
            });
        }
    }

    int main() {
        boost::asio::thread_pool pool(2);

        batch(pool, {2});
        batch(pool, {4, 3, 5});
        batch(pool, {7, 9});
        pool.join();
    }

    Note that this has concurrent access to group, which is safe due to the limitations on element accesses.



    Prints:
    foo(2): 2 
    foo(4): 4 foo(3): 503 foo(5): 505
    foo(7): 507 foo(9): 509

    Regarding "c++11 - Boost asio thread_pool join does not wait for tasks to be finished", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61328430/
