gpt4 book ai didi

c++ - 如何使 boost::thread_group 更小,并在其线程中运行 boost::asio::io_service::run?

转载 作者:行者123 更新时间:2023-11-30 02:08:09 28 4
gpt4 key购买 nike

我通常看到的创建线程池的常见方式是 "io_service + thread_group"。它非常适合固定大小的线程池,或者只会变大的线程池。但我想知道,如何在不停止整个 io_service 的情况下缩小这样的线程池?

所以我们有如下所示的代码:

// Class member variables: the shared io_service, the group holding the
// pool threads, and the work object created by the init function below.
asio::io_service io_service;
boost::thread_group threads;
asio::io_service::work *work;

// Pool initialisation: create the work object, then start one thread per
// reported hardware core, each executing io_service.run().
work = new asio::io_service::work(io_service);
int cores_number = boost::thread::hardware_concurrency();
for (std::size_t i = 0; i < cores_number; ++i)
threads.create_thread(boost::bind(&asio::io_service::run, &io_service));

// Tasks are now simply posted to the io_service.
io_service.post(boost::bind(&class_name::an_expensive_calculation, this, 42));
io_service.post(boost::bind(&class_name::a_long_running_task, this, 123));

// Growing the pool is easy: start one more thread on the same io_service
// (a mutex may be required around this call).
threads.create_thread(boost::bind(&asio::io_service::run, &io_service));

但是,如果我们想从线程池中移除线程该怎么办?我们不能简单地调用 threads.remove_thread(thread* thrd);,因为(依我看)它并不会停止正在该线程中运行的 asio::io_service::run。所以我想知道:是否有可能、以及如何真正地从这样的线程池中移除线程?(不是简单地中断它们,而是等到线程当前的任务执行完毕、超出作用域之后再移除)

更新:

这里是一些简单的可编译代码:线程池,以及线程所需的生命周期。

#include <stdio.h>
#include <iostream>
#include <fstream>

//Boost
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>

// io_service whose run() loop is executed by every pool thread (see run()).
boost::asio::io_service io_service;
// Work object allocated in main(); this program never deletes it.
boost::asio::io_service::work *work;
// Group that tracks the handles of the pool threads.
boost::thread_group threads;
// Held around the add_thread()/remove_thread() calls made from run() and pool_item().
boost::mutex threads_creation;
// How long pool_item() waits for a result, in milliseconds; assigned in main().
int time_limit;

int calculate_the_answer_to_life_the_universe_and_everything(int i)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(i));
std::cout << i << std::endl;
return i;
}

// Entry point of a pool thread: executes the shared io_service loop.
// When a std::exception escapes io_service.run(), it is reported on stdout
// and the handle passed in `thread_ptr` is taken out of `threads`; the
// function then returns, which ends the thread. A normal return from
// io_service.run() leaves the group untouched.
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
    }
    catch(std::exception &failure)
    {
        std::cout << "exeption: " << failure.what() << std::endl;
        {
            // The nested scope releases the mutex before the final message is printed.
            boost::mutex::scoped_lock guard(threads_creation);
            threads.remove_thread(thread_ptr.get());
        }
        std::cout << "thread removed from group" << std::endl;
    }
}

void pool_item( int i)
{
boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i));
boost::unique_future<int> fi=pt.get_future();

boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread

if(fi.timed_wait(boost::posix_time::milliseconds(time_limit)))
{
std::cout << "sucsess function returned: " << fi.get() << std::endl;
}
else
{
std::cout << "request took way 2 long!" << std::endl;

std::cout << "current group size:" << threads.size() << std::endl;

boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
thread = boost::shared_ptr<boost::thread>( new boost::thread(std::move(pt)));

boost::mutex::scoped_lock lock(threads_creation);
threads.add_thread(thread.get());
lock.unlock();

task->join();

throw std::runtime_error("killed joined thread");

}
}

int main()
{
time_limit = 500;

work = new boost::asio::io_service::work(io_service);
int cores_number = boost::thread::hardware_concurrency();
for (std::size_t i = 0; i < cores_number; ++i)
{

boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
thread = boost::shared_ptr<boost::thread>( new boost::thread(std::move(pt)));
threads.add_thread(thread.get());
}

int i = 800;
io_service.post(boost::bind(pool_item, i));

boost::this_thread::sleep(boost::posix_time::milliseconds(i*2));
std::cout << "thread should be removed by now." << std::endl
<< "group size:" << threads.size() << std::endl;

std::cin.get();
return 0;
}

如您所见,即使在 .remove_thread(ptr); 调用之后,线程也不会从线程池中移除。=( 为什么?

更新 #2:

好吧,无论如何,我最终还是自己写了一个自定义的线程组……

#include <stdio.h>
#include <iostream>
#include <fstream>
#include <set>

//Boost
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/locks.hpp>

//cf service interface
//#include <service.hpp>

//cf-server
//#include <server.h>

#include <boost/foreach.hpp>

class thread_group
{
public:
void add( boost::shared_ptr<boost::thread> to_add)
{
boost::mutex::scoped_lock lock(m);
ds_.insert(to_add);
}
void remove( boost::shared_ptr<boost::thread> to_remove)
{
boost::mutex::scoped_lock lock(m);
ds_.erase(to_remove);
}

int size()
{
boost::mutex::scoped_lock lock(m);
return ds_.size();
}

void join_all(boost::posix_time::milliseconds interuption_time=boost::posix_time::milliseconds(1000))
{
boost::mutex::scoped_lock lock(m);
BOOST_FOREACH(boost::shared_ptr<boost::thread> t, ds_)
{
boost::thread interrupter(boost::bind(&thread_group::interupt_thread, this, t, interuption_time));
}
}

private:
std::set< boost::shared_ptr<boost::thread> > ds_;
boost::mutex m;
void interupt_thread(boost::shared_ptr<boost::thread> t, boost::posix_time::milliseconds interuption_time)
{
try
{
if(!t->timed_join(interuption_time))
t->interrupt();

}
catch(std::exception &e)
{
}
}
};

// io_service whose run() loop is executed by every pool thread (see run()).
boost::asio::io_service io_service;
// Work object allocated in main(); this program never deletes it.
boost::asio::io_service::work *work;
// Custom registry (class thread_group above) holding the pool threads.
thread_group threads;
// How long pool_item() waits for a result, in milliseconds; assigned in main().
int time_limit;



int calculate_the_answer_to_life_the_universe_and_everything(int i)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(i));
std::cout << i << std::endl;
return i;
}

// Entry point of a pool thread: executes the shared io_service loop.
// When a std::exception escapes io_service.run(), it is reported on stdout
// and `thread_ptr` is unregistered from `threads`; the function then
// returns, which ends the thread. A normal return from io_service.run()
// leaves the registry untouched.
void run(boost::shared_ptr<boost::thread> thread_ptr)
{
    try
    {
        io_service.run();
    }
    catch(std::exception &failure)
    {
        std::cout << "exeption: " << failure.what() << std::endl;
        threads.remove(thread_ptr);
        std::cout << "thread removed from group" << std::endl;
    }
}

void pool_item( int i)
{
boost::packaged_task<int> pt(boost::bind(calculate_the_answer_to_life_the_universe_and_everything, i));
boost::unique_future<int> fi=pt.get_future();

boost::thread *task = new boost::thread(std::move(pt)); // launch task on a thread

if(fi.timed_wait(boost::posix_time::milliseconds(time_limit)))
{
std::cout << "sucsess function returned: " << fi.get() << std::endl;
}
else
{
std::cout << "request took way 2 long!" << std::endl;

std::cout << "current group size:" << threads.size() << std::endl;
std::cout << "we want to add thread!" << std::endl;
boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
threads.add(thread);
std::cout << "thread added" << std::endl
<< "current group size:" << threads.size() << std::endl;
task->join();

throw std::runtime_error("killed joined thread");

}
}

int main()
{
time_limit = 500;

work = new boost::asio::io_service::work(io_service);
int cores_number = boost::thread::hardware_concurrency();
for (std::size_t i = 0; i < cores_number; ++i)
{

boost::shared_ptr<boost::thread> thread;
boost::packaged_task<void> pt(boost::bind(run, thread));
thread = boost::shared_ptr<boost::thread>( new boost::thread(std::move(pt)));
threads.add(thread);
}

int i = 800;
io_service.post(boost::bind(pool_item, i));

boost::this_thread::sleep(boost::posix_time::milliseconds(i*2));
std::cout << "thread should be removed by now." << std::endl
<< "group size:" << threads.size() << std::endl;

std::cin.get();
return 0;
}

最佳答案

我过去已经能够通过利用 run() 将在回调抛出异常时退出这一事实来实现这一点。我没有直接在线程中启动 run(),而是调用一个实用函数,如果抛出适当的异常,该函数将退出线程:

// Thread entry point that wraps io_service.run(). An exception thrown by a
// handler propagates out of run(); catching it here lets this function
// return, which ends the calling thread and shrinks the pool by one.
void RunIOService()
{
    try
    {
        io_service.run();
    }
    catch(const std::exception&)
    {
        // Caught by const reference (catching by value would slice derived
        // exception types). Swallowing is deliberate: the exception is only
        // the signal for this thread to leave the pool.
    }
}

然后您所要做的就是安排一个会抛出异常的回调:

// Handler to post when the pool should lose one thread. As written the body
// is only a placeholder and throws nothing: it has to throw an exception
// type that RunIOService() catches for the executing thread to leave run().
static void KillThreadCallback() 
{
// throw some exception that you catch above
}

// Queue the callback on the io_service; the pool thread that executes it is
// the one that exits (once the callback actually throws).
io_service.post(&KillThreadCallback);

这将导致执行此回调的线程退出,实质上将线程池计数大小减 1。使用它,您可以非常轻松地扩展和收缩 io_service 线程池。

关于c++ - 如何使 boost::thread_group 更小,并在其线程中运行 boost::asio::io_service::run?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7218884/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com