gpt4 book ai didi

c - 线程池 - 处理任务多于线程的情况

转载 作者:行者123 更新时间:2023-11-30 15:19:06 25 4
gpt4 key购买 nike

我刚刚进入多线程编程,并作为尝试使用 pthread 实现简单线程池的练习的一部分。

我尝试使用条件变量向工作线程发出信号,表明队列中有作业在等待。但由于某种原因,我无法弄清楚该机制不起作用。

以下是相关代码片段:

<小时/>
typedef struct thread_pool_task
{
void (*computeFunc)(void *);
void *param;
} ThreadPoolTask;

typedef enum thread_pool_state
{
RUNNING = 0,
SOFT_SHUTDOWN = 1,
HARD_SHUTDOWN = 2
} ThreadPoolState;

typedef struct thread_pool
{
ThreadPoolState poolState;
unsigned int poolSize;
unsigned int queueSize;
OSQueue* poolQueue;
pthread_t* threads;
pthread_mutex_t q_mtx;
pthread_cond_t q_cnd;
} ThreadPool;
<小时/>
static void* threadPoolThread(void* threadPool){

ThreadPool* pool = (ThreadPool*)(threadPool);
for(;;)
{
/* Lock must be taken to wait on conditional variable */
pthread_mutex_lock(&(pool->q_mtx));

/* Wait on condition variable, check for spurious wakeups.
When returning from pthread_cond_wait(), we own the lock. */
while( (pool->queueSize == 0) && (pool->poolState == RUNNING) )
{
pthread_cond_wait(&(pool->q_cnd), &(pool->q_mtx));
}

printf("Queue size: %d\n", pool->queueSize);

/* --- */
if (pool->poolState != RUNNING){
break;
}

/* Grab our task */
ThreadPoolTask* task = osDequeue(pool->poolQueue);
pool->queueSize--;


/* Unlock */
pthread_mutex_unlock(&(pool->q_mtx));

/* Get to work */
(*(task->computeFunc))(task->param);
free(task);
}


pthread_mutex_unlock(&(pool->q_mtx));
pthread_exit(NULL);
return(NULL);
}
<小时/>
ThreadPool* tpCreate(int numOfThreads)
{
ThreadPool* threadPool = malloc(sizeof(ThreadPool));
if(threadPool == NULL) return NULL;


/* Initialize */
threadPool->poolState = RUNNING;
threadPool->poolSize = numOfThreads;
threadPool->queueSize = 0;

/* Allocate OSQueue and threads */
threadPool->poolQueue = osCreateQueue();
if (threadPool->poolQueue == NULL)
{

}
threadPool->threads = malloc(sizeof(pthread_t) * numOfThreads);
if (threadPool->threads == NULL)
{

}

/* Initialize mutex and conditional variable */
pthread_mutex_init(&(threadPool->q_mtx), NULL);
pthread_cond_init(&(threadPool->q_cnd), NULL);

/* Start worker threads */
for(int i = 0; i < threadPool->poolSize; i++)
{
pthread_create(&(threadPool->threads[i]), NULL, threadPoolThread, threadPool);
}

return threadPool;
}
<小时/>
int tpInsertTask(ThreadPool* threadPool, void (*computeFunc) (void *), void* param)
{
if(threadPool == NULL || computeFunc == NULL) {
return -1;
}

/* Check state and create ThreadPoolTask */
if (threadPool->poolState != RUNNING) return -1;
ThreadPoolTask* newTask = malloc(sizeof(ThreadPoolTask));
if (newTask == NULL) return -1;
newTask->computeFunc = computeFunc;
newTask->param = param;

/* Add task to queue */
pthread_mutex_lock(&(threadPool->q_mtx));
osEnqueue(threadPool->poolQueue, newTask);
threadPool->queueSize++;
pthread_cond_signal(&(threadPool->q_cnd));
pthread_mutex_unlock(&threadPool->q_mtx);

return 0;
}
<小时/>

问题是,当我创建一个包含 1 个线程的池并向其中添加大量作业时,它不会执行所有作业。

[编辑:]我尝试运行以下代码来测试基本功能:

void hello (void* a)
{
int i = *((int*)a);
printf("hello: %d\n", i);
}
void test_thread_pool_sanity()
{
int i;

ThreadPool* tp = tpCreate(1);

for(i=0; i<10; ++i)
{
tpInsertTask(tp,hello,(void*)(&i));
}
}

我希望输入如下内容:

hello: 0
hello: 1
hello: 2
hello: 3
hello: 4
hello: 5
hello: 6
hello: 7
hello: 8
hello: 9

相反,有时我会得到以下输出:

Queue size: 9 //printf added for debugging within threadPoolThread
hello: 9
Queue size: 9 //printf added for debugging within threadPoolThread
hello: 0

有时我根本没有得到任何输出。我缺少什么?

最佳答案

当您调用tpInsertTask(tp,hello,(void*)(&i));时您正在传递堆栈上 i 的地址。这有多个问题:

  1. 每个线程都获取相同的地址。我猜测 hello 函数会获取该地址并打印出 *param,它们都指向堆栈上的同一位置。
  2. 由于一旦 test_thread_pool_sanity 返回,i 就在堆栈上,最后一个值就会丢失,并将被其他代码覆盖,因此该值未定义。

根据工作线程完成任务的情况与主测试线程调度任务的情况,您将得到不同的结果。

您需要将传递的参数保存为任务的一部分,以保证每个任务的参数都是唯一的。

编辑:您还应该检查 pthread_create 的返回代码以查看它是否失败。

关于c - 线程池 - 处理任务多于线程的情况,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30854862/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com