gpt4 book ai didi

c++ - 查询简单的 C++ 线程池实现

转载 作者:塔克拉玛干 更新时间:2023-11-02 23:46:23 26 4
gpt4 key购买 nike

Stack Overflow 对我帮助很大,我想回馈社区。我一直在使用 TinyThread++(一个可移植的 C++ 线程库)实现一个简单的线程池,其中用到了我从 Stack Overflow 学到的知识。我是线程编程的新手,所以对互斥量等还不太熟悉。在展示代码(在 Linux 下运行良好)之后,我有一个问题想请教:

// ThreadPool.h

// Fixed-size pool of worker threads built on TinyThread++ (tthread).
//
// Intended call sequence, as described by the methods below:
//   CreateThreads() once, then any number of rounds of
//   SubmitJob() (at most one job per thread), StartJobs(),
//   WaitForJobsToComplete().
//
// NOTE(review): the public methods appear to be meant for a single
// controlling thread; nothing in the class enforces that.
class ThreadPool
{
public:

    ThreadPool();
    ~ThreadPool();

    // Creates a pool of threads and gets them ready to be used
    void CreateThreads(int numOfThreads);

    // Assigns a job to a thread in the pool, but doesn't start the job
    // Each SubmitJob call will use up one thread of the pool.
    // This operation can only be undone by calling StartJobs and
    // then waiting for the jobs to complete. On completion,
    // new jobs may be submitted.
    void SubmitJob( void (*workFunc)(void *), void *workData );

    // Begins execution of all the jobs in the pool.
    void StartJobs();

    // Waits until all jobs have completed.
    // The wait will block the caller.
    // On completion, new jobs may be submitted.
    void WaitForJobsToComplete();

private:

    // Kind of request handed to a worker: run its job, or leave its loop.
    enum typeOfWorkEnum { e_work, e_quit };

    // Per-thread mailbox shared between the controlling thread and one worker.
    class ThreadData
    {
    public:

        bool ready; // thread has been created and is ready for work
        bool haveWorkToDo; // set to hand the worker a new request
        typeOfWorkEnum typeOfWork;

        // Pointer to the work function each thread has to call.
        void (*workFunc)(void *);

        // Pointer to work data
        void *workData;

        // Every member gets a defined value so that no field is ever read
        // while indeterminate; the job pointers stay null until SubmitJob()
        // fills them in.
        ThreadData()
            : ready(false),
              haveWorkToDo(false),
              typeOfWork(e_work),
              workFunc(0),
              workData(0)
        {
        }
    };

    // Argument bundle passed through the void* thread entry parameter.
    struct ThreadArgStruct
    {
        ThreadPool *threadPoolInstance;
        int threadId;
    };

    // Data for each thread
    ThreadData *m_ThreadData;

    ThreadPool(ThreadPool const&); // copy ctor hidden
    ThreadPool& operator=(ThreadPool const&); // assign op. hidden

    // Static function that provides the function pointer that a thread can call
    // By including the ThreadPool instance in the void * parameter,
    // we can use it to access other data and methods in the ThreadPool instance.
    static void ThreadFuncWrapper(void *arg)
    {
        ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg);
        threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId);
    }

    // The function each thread calls
    void ThreadFunc( int threadId );

    // Called by the thread pool destructor
    void DestroyThreadPool();

    // Total number of threads available
    // (fixed on creation of thread pool)
    int m_numOfThreads;
    // Number of started jobs that have not yet reported completion
    int m_NumOfThreadsDoingWork;
    // Number of jobs submitted since the last StartJobs()
    int m_NumOfThreadsGivenJobs;

    // List of threads
    std::vector<tthread::thread *> m_ThreadList;

    // Condition variable to signal each thread has been created and executing
    tthread::mutex m_ThreadReady_mutex;
    tthread::condition_variable m_ThreadReady_condvar;

    // Condition variable to signal each thread to start work
    tthread::mutex m_WorkToDo_mutex;
    tthread::condition_variable m_WorkToDo_condvar;

    // Condition variable to signal the main thread that
    // all threads in the pool have completed their work
    tthread::mutex m_WorkCompleted_mutex;
    tthread::condition_variable m_WorkCompleted_condvar;
};

cpp文件:

//
// ThreadPool.cpp
//

#include "ThreadPool.h"

// This is the thread function for each thread.
// All threads remain in this function until
// they are asked to quit, which only happens
// when terminating the thread pool.
void ThreadPool::ThreadFunc( int threadId )
{
ThreadData *myThreadData = &m_ThreadData[threadId];
std::cout << "Hello world: Thread " << threadId << std::endl;

// Signal that this thread is ready
m_ThreadReady_mutex.lock();
myThreadData->ready = true;
m_ThreadReady_condvar.notify_one(); // notify the main thread
m_ThreadReady_mutex.unlock();

while(true)
{
//tthread::lock_guard<tthread::mutex> guard(m);
m_WorkToDo_mutex.lock();

while(!myThreadData->haveWorkToDo) // check for work to do
m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here
myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex

m_WorkToDo_mutex.unlock();

// Do the work
switch(myThreadData->typeOfWork)
{
case e_work:
std::cout << "Thread " << threadId << ": Woken with work to do\n";

// Do work
myThreadData->workFunc(myThreadData->workData);

std::cout << "#Thread " << threadId << ": Work is completed\n";
break;

case e_quit:
std::cout << "Thread " << threadId << ": Asked to quit\n";
return; // ends the thread
}

// Now to signal the main thread that my work is completed
m_WorkCompleted_mutex.lock();
m_NumOfThreadsDoingWork--;

// Unsure if this 'if' would make the program more efficient
// if(m_NumOfThreadsDoingWork == 0)
m_WorkCompleted_condvar.notify_one(); // notify the main thread
m_WorkCompleted_mutex.unlock();
}

}


// Constructs an empty pool; no threads exist until CreateThreads() is called.
// m_ThreadData is nulled explicitly so the pointer never holds an
// indeterminate value before CreateThreads() allocates the array.
ThreadPool::ThreadPool()
    : m_ThreadData(0),
      m_numOfThreads(0),
      m_NumOfThreadsDoingWork(0),
      m_NumOfThreadsGivenJobs(0)
{
}


// Shuts the pool down. When no threads were ever created there is
// nothing to stop and nothing to free.
ThreadPool::~ThreadPool()
{
    if(m_numOfThreads == 0)
        return;

    // Stop and join the workers first, then release their per-thread data.
    DestroyThreadPool();
    delete [] m_ThreadData;
}


// Creates numOfThreads worker threads and blocks until each one has
// reported itself ready.
//
// The call is ignored when a pool already exists, and also when
// numOfThreads is not positive: zero would allocate an array that the
// destructor never frees, and a negative count is not a valid array size.
void ThreadPool::CreateThreads(int numOfThreads)
{
    // Check if a thread pool has already been created, or the request is empty/invalid
    if(m_numOfThreads > 0 || numOfThreads <= 0)
        return;

    m_NumOfThreadsGivenJobs = 0;
    m_NumOfThreadsDoingWork = 0;
    m_numOfThreads = numOfThreads;
    m_ThreadData = new ThreadData[m_numOfThreads];

    // One argument struct on this stack frame is reused for every thread.
    // That is safe only because ThreadFuncWrapper reads both fields before
    // ThreadFunc sets 'ready', and the loop below waits for 'ready' before
    // touching the struct again or returning.
    ThreadArgStruct threadArg;

    for(int i=0; i<m_numOfThreads; ++i)
    {
        threadArg.threadId = i;
        threadArg.threadPoolInstance = this;

        // Creates the thread and saves it in a list so we can destroy it later
        m_ThreadList.push_back( new tthread::thread( ThreadFuncWrapper, static_cast<void *>(&threadArg) ) );

        // It takes a little time for a thread to get established.
        // Best wait until it gets established before creating the next thread.
        m_ThreadReady_mutex.lock();
        while(!m_ThreadData[i].ready) // Check if thread is ready
            m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here
        m_ThreadReady_mutex.unlock();
    }
}


// Assigns a job to a thread, but doesn't start the job
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData)
{
// Check if the thread pool has been created
if(!m_numOfThreads)
return;

if(m_NumOfThreadsGivenJobs >= m_numOfThreads)
return;

m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc;
m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData;

std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl;

m_NumOfThreadsGivenJobs++;
}

void ThreadPool::StartJobs()
{
// Check that the thread pool has been created
// and some jobs have been assigned
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs)
return;

// Set 'haveworkToDo' flag for all threads
m_WorkToDo_mutex.lock();
for(int i=0; i<m_NumOfThreadsGivenJobs; ++i)
{
m_ThreadData[i].typeOfWork = e_work; // forgot to do this !
m_ThreadData[i].haveWorkToDo = true;
}
m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs;

// Reset this counter so we can resubmit jobs later
m_NumOfThreadsGivenJobs = 0;

// Notify all threads they have work to do
m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();
}


// Blocks the caller until the count of threads still doing work drops
// to zero. Returns immediately when no pool has been created.
void ThreadPool::WaitForJobsToComplete()
{
    if(m_numOfThreads == 0)
        return;

    m_WorkCompleted_mutex.lock();
    for(;;)
    {
        // Leave once no started job is outstanding
        if(m_NumOfThreadsDoingWork <= 0)
            break;

        // Otherwise sleep until a worker signals a completion, then re-check
        m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex);
    }
    m_WorkCompleted_mutex.unlock();
}


void ThreadPool::DestroyThreadPool()
{
std::cout << "Ask threads to quit\n";
m_WorkToDo_mutex.lock();
for(int i=0; i<m_numOfThreads; ++i)
{
m_ThreadData[i].haveWorkToDo = true;
m_ThreadData[i].typeOfWork = e_quit;
}
m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();

// As each thread terminates, catch them here
for(int i=0; i<m_numOfThreads; ++i)
{
tthread::thread *t = m_ThreadList[i];

// Wait for thread to complete
t->join();
}
m_numOfThreads = 0;
}

使用示例:(通过对平方的倒数求和来计算 pi-squared/6)实际上,此用法示例并行运行相同的计算 10 次。更实际的用法是让每个线程计算一组不同的求和项。池作业完成后,将所有线程结果相加即可获得最终结果。

// Input/output record for one LongCalculation job.
struct CalculationDataStruct
{
    int inputVal;     // loop bound: i runs from 1 up to (but excluding) this value
    double outputVal; // receives the computed partial sum
};

// Job function: computes the partial sum of 1/i^2 for i = 1 .. inputVal-1
// and stores it in outputVal.
//
// theSums: pointer to a CalculationDataStruct, passed as void* to match
//          the work-function signature expected by the thread pool.
//
// Note that the loop bound is exclusive, so inputVal == 1 (or less)
// produces 0.0.
void LongCalculation( void *theSums )
{
    CalculationDataStruct *sums = static_cast<CalculationDataStruct *>(theSums);

    const int terms = sums->inputVal;

    // The accumulator must start at zero; adding to an uninitialized
    // double yields an indeterminate result.
    double sum = 0.0;
    for(int i=1; i<terms; i++)
        sum += 1.0/( double(i)*double(i) );
    sums->outputVal = sum;
}


// Example driver: runs LongCalculation on ten pool threads, each with a
// different inputVal, and prints every result once all jobs have finished.
int main(int argc, char** argv)
{
    // const makes this a constant expression, so the array below is an
    // ordinary fixed-size array rather than a non-standard variable-length one.
    const int numThreads = 10;

    // Create pool
    ThreadPool threadPool;
    threadPool.CreateThreads(numThreads);

    // Create thread workspace
    CalculationDataStruct sums[numThreads];

    // Set up jobs
    for(int i=0; i<numThreads; i++)
    {
        sums[i].inputVal = 3000*(i+1);
        threadPool.SubmitJob(LongCalculation, &sums[i]);
    }

    // Run the jobs
    threadPool.StartJobs();
    threadPool.WaitForJobsToComplete();

    // Print results (outputVal is only read after the wait above has returned)
    for(int i=0; i<numThreads; i++)
        std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl;

    return 0;
}

问题:在 ThreadPool::ThreadFunc 方法中,如果把下面这条(目前被注释掉的)if 语句

if(m_NumOfThreadsDoingWork == 0)

加进去,性能会不会更好?此外,欢迎大家对代码提出批评和改进建议。同时,希望这段代码对其他人有用。

最佳答案

首先,您可能想了解一下 C++11 的 “std::thread” 和 “std::mutex”。您可能还想研究英特尔的 “Threading Building Blocks”,它提供了多种工作分配模式。对于可移植、跨平台、以 C++ 封装的 API,我通常使用 OpenThreads 库。最后,您可以使用消息传递库(例如 ZeroMQ)在不使用互斥体的情况下构建可扩展的分布式工作负载。

查看您当前的代码,我最担心的是您似乎没有对用于向线程分配工作的变量加锁;我猜这是因为您把 SubmitJob 和 StartJobs 分开了。

但归根结底,您的 ThreadPool 不是线程安全的。

它的 API 也有些复杂,包括工作类型等。您可能需要把“工作”这一概念抽象出来。下面是我这样做的一个示例,您可能希望把其中大部分代码封装回您的 ThreadPool 类中;终止方式(NULL 作业)也有点人为,您可能想改用 pthread_cancel,但它用于演示已经足够了。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <queue>

// Count of Job objects constructed so far; each new Job takes the next value as its id.
static int jobNo = 0;

// A demo unit of work identified by a sequential id.
class Job {
public:
    int m_i; // id assigned at construction (the first Job gets 1)

    // Takes the next id from jobNo and announces the new job.
    Job() : m_i(++jobNo)
    {
        printf("Created job %d.\n", m_i);
    }

    // Announces the job, then sleeps for 500 * 1000 microseconds.
    void Execute()
    {
        printf("Job %d executing.\n", m_i);
        usleep(500 * 1000);
    }
};

// Shared FIFO of pending jobs: AddJob() pushes onto it and QueueWorker() pops from it.
std::queue<Job*> queue;
// Locked around every queue access in AddJob() and QueueWorker().
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Signalled by AddJob() after each push; QueueWorker() waits on it while the queue is empty.
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// Appends `job` to the shared queue and signals the condition variable.
// Both the push and the signal happen while `mutex` is held.
// A NULL job is the end-of-run marker that QueueWorker() reacts to.
void AddJob(Job* job) {
pthread_mutex_lock(&mutex);
queue.push(job);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}

void* QueueWorker(void* /*threadInfo*/) {
Job* job = NULL;
for (;;) {
pthread_mutex_lock(&mutex);
while ( queue.empty() ) {
// unlock the mutex until the cond is signal()d or broadcast() to.
// if this call succeeds, we will have the mutex locked again on the other side.
pthread_cond_wait(&cond, &mutex);
}
// take the first task and then release the lock.
job = queue.front();
queue.pop();
pthread_mutex_unlock(&mutex);

if ( job == NULL ) {
// in this demonstration, NULL ends the run, so forward to any other threads.
AddJob(NULL);
break;
}
job->Execute();
delete job;
}
return NULL;
}

// Demo driver: starts two queue workers, feeds them five jobs at random
// intervals and five more back-to-back, then queues the NULL end marker
// and waits for both workers to finish.
// Returns 0 on success, 1 if a worker thread could not be created.
int main(int argc, const char* argv[]) {
    pthread_t worker1, worker2;

    // pthread_create returns an error number on failure; joining a thread
    // that was never created would use an uninitialized pthread_t.
    int rc = pthread_create(&worker1, NULL, &QueueWorker, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for worker 1 (error %d)\n", rc);
        return 1;
    }
    rc = pthread_create(&worker2, NULL, &QueueWorker, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for worker 2 (error %d)\n", rc);
        // Worker 1 is already running: send it the end marker and reap it.
        AddJob(NULL);
        pthread_join(worker1, NULL);
        return 1;
    }

    srand(static_cast<unsigned int>(time(NULL)));

    // queue 5 jobs with delays.
    for ( size_t i = 0; i < 5; ++i ) {
        // rand() % 800 milliseconds, expressed in microseconds for usleep
        long delay = (rand() % 800) * 1000;
        printf("Producer sleeping %fs\n", static_cast<float>(delay) / (1000*1000));
        usleep(delay);
        Job* job = new Job();
        AddJob(job);
    }
    // 5 more without delays.
    for ( size_t i = 0; i < 5; ++i ) {
        AddJob(new Job);
    }
    // null to end the run.
    AddJob(NULL);

    printf("Done with jobs.\n");
    pthread_join(worker1, NULL);
    pthread_join(worker2, NULL);

    return 0;
}

关于c++ - 查询简单的 C++ 线程池实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11234894/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com