gpt4 book ai didi

multithreading - boost::asio 中的动态线程池示例

转载 作者:行者123 更新时间:2023-12-04 18:47:55 27 4
gpt4 key购买 nike

我将使用单个 io_service 实现带有线程池的 boost::asio 服务器(HTTP Server 3example)。 io_service将绑定(bind)到 unix 域套接字并将请求从该套接字上的连接传递到不同的线程。为了减少资源消耗,我想让线程池动态化。

这是一个概念。首先创建一个线程。当请求到达并且服务器发现池中没有空闲线程时,它会创建一个新线程并将请求传递给它。服务器最多可以创建某个最大数量的线程。理想情况下,它应该具有暂停空闲一段时间的线程的功能。

有人做过类似的东西吗?或者也许有人有一个相关的例子?

至于我,我想我应该以某种方式覆盖 io_service.dispatch实现这一目标。

最佳答案

最初的方法可能存在一些挑战:

  • boost::asio::io_service并非旨在派生或重新实现。注意缺少虚函数。
  • 如果您的线程库不提供查询线程状态的能力,则需要单独管理状态信息。

  • 另一种解决方案是将工作发布到 io_service ,然后检查它在 io_service 中的停留时间.如果它准备好运行和实际运行之间的时间差超过某个阈值,那么这表明队列中的作业比服务队列的线程多。这样做的一个主要好处是动态线程池增长逻辑与其他逻辑分离。
    这是一个使用 deadline_timer 完成此操作的示例。 .
  • 设置 deadline_timer到期3从现在开始的几秒钟。
  • 异步等待 deadline_timer .处理程序将准备好运行3deadline_timer 开始的秒数已设置。
  • 在异步处理程序中,检查与计时器设置为过期时间相关的当前时间。如果大于2秒,然后是 io_service队列正在备份,所以向线程池添加一个线程。

  • 例子:
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/thread.hpp>
    #include <iostream>

    class thread_pool_checker
    : private boost::noncopyable
    {
    public:

    thread_pool_checker( boost::asio::io_service& io_service,
    boost::thread_group& threads,
    unsigned int max_threads,
    long threshold_seconds,
    long periodic_seconds )
    : io_service_( io_service ),
    timer_( io_service ),
    threads_( threads ),
    max_threads_( max_threads ),
    threshold_seconds_( threshold_seconds ),
    periodic_seconds_( periodic_seconds )
    {
    schedule_check();
    }

    private:

    void schedule_check();
    void on_check( const boost::system::error_code& error );

    private:

    boost::asio::io_service& io_service_;
    boost::asio::deadline_timer timer_;
    boost::thread_group& threads_;
    unsigned int max_threads_;
    long threshold_seconds_;
    long periodic_seconds_;
    };

    void thread_pool_checker::schedule_check()
    {
    // Thread pool is already at max size.
    if ( max_threads_ <= threads_.size() )
    {
    std::cout << "Thread pool has reached its max. Example will shutdown."
    << std::endl;
    io_service_.stop();
    return;
    }

    // Schedule check to see if pool needs to increase.
    std::cout << "Will check if pool needs to increase in "
    << periodic_seconds_ << " seconds." << std::endl;
    timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
    timer_.async_wait(
    boost::bind( &thread_pool_checker::on_check, this,
    boost::asio::placeholders::error ) );
    }

    void thread_pool_checker::on_check( const boost::system::error_code& error )
    {
    // On error, return early.
    if ( error ) return;

    // Check how long this job was waiting in the service queue. This
    // returns the expiration time relative to now. Thus, if it expired
    // 7 seconds ago, then the delta time is -7 seconds.
    boost::posix_time::time_duration delta = timer_.expires_from_now();
    long wait_in_seconds = -delta.seconds();

    // If the time delta is greater than the threshold, then the job
    // remained in the service queue for too long, so increase the
    // thread pool.
    std::cout << "Job job sat in queue for "
    << wait_in_seconds << " seconds." << std::endl;
    if ( threshold_seconds_ < wait_in_seconds )
    {
    std::cout << "Increasing thread pool." << std::endl;
    threads_.create_thread(
    boost::bind( &boost::asio::io_service::run,
    &io_service_ ) );
    }

    // Otherwise, schedule another pool check.
    schedule_check();
    }

    // Busy work functions.
    void busy_work( boost::asio::io_service&,
    unsigned int );

    void add_busy_work( boost::asio::io_service& io_service,
    unsigned int count )
    {
    io_service.post(
    boost::bind( busy_work,
    boost::ref( io_service ),
    count ) );
    }

    void busy_work( boost::asio::io_service& io_service,
    unsigned int count )
    {
    boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );

    count += 1;

    // When the count is 3, spawn additional busy work.
    if ( 3 == count )
    {
    add_busy_work( io_service, 0 );
    }
    add_busy_work( io_service, count );
    }

    int main()
    {
    using boost::asio::ip::tcp;

    // Create io service.
    boost::asio::io_service io_service;

    // Add some busy work to the service.
    add_busy_work( io_service, 0 );

    // Create thread group and thread_pool_checker.
    boost::thread_group threads;
    thread_pool_checker checker( io_service, threads,
    3, // Max pool size.
    2, // Create thread if job waits for 2 sec.
    3 ); // Check if pool needs to grow every 3 sec.

    // Start running the io service.
    io_service.run();

    threads.join_all();

    return 0;
    }
    输出:
    将检查池是否需要在 3 秒内增加。
    作业作业排队等待 7 秒。
    增加线程池。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 4 秒。
    增加线程池。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 3 秒。
    增加线程池。
    线程池已达到最大值。示例将关闭。

    关于multithreading - boost::asio 中的动态线程池示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11121238/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com