gpt4 book ai didi

c - sem_wait 上的线程阻塞导致其他线程挂起

转载 作者:行者123 更新时间:2023-12-03 09:23:10 26 4
gpt4 key购买 nike

我用 C 编写了一个通用队列,用于各种负载类型。它是一个阻塞队列,因此消费者线程将阻塞等待队列被生产者线程填充。

我已经使用 check 单独测试了队列代码,包括线程阻塞等待将值添加到队列的行为。所有这些测试都通过了,但是,当将队列集成到代码的其余部分时,我遇到了这样一种情况:线程第一次尝试阻塞队列时,所有其他线程都挂起。

具体来说,我正在集成的程序是一个更大生态系统的成员,因此有一个启动脚本来初始化程序,然后进行守护进程。守护线程然后创建几个分离的线程来执行各种功能。其中一个线程调用 sem_wait,所有线程都挂起,包括生成守护进程的线程。

为确认此调用是问题所在,我使用调试器以非守护程序模式运行程序,确认 sem_wait 已挂起。在生成在队列中等待的线程之前,我还添加了一个 sleep。在这种情况下,其他线程继续前进,然后在调用 sem_wait 时挂起。

有问题的队列只对这个程序可见。它的引用存储为全局变量。调用 sem_wait 时,队列肯定是空的。

队列代码如下:

//Queue.h
#include <pthread.h>
#include <semaphore.h>

typedef void (*freeFunction)(void *);

typedef struct _queueNode {
void *data;
struct _queueNode *next;
} queueNode;


typedef struct queue {
sem_t *logicalLength;
size_t elementSize;

queueNode *head;
queueNode *tail;

freeFunction freeFn;
pthread_mutex_t *queueLock;
} queue_t;

void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn);
void queue_destroy(queue_t *queue); // Removes all elements from the queue

int queue_size(queue_t *queue); // Returns the number of elements in the queue

void queue_add(queue_t *queue, void *element); // Adds to tail
int queue_take(queue_t *queue, void *elementBuffer); // Returns/removes head, blocks if empty


//Queue.c
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>

#include "Queue.h"

void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn) {

assert(elementSize > 0);
assert(queue != NULL);

queue->elementSize = elementSize;

queue->head = NULL;
queue->tail = NULL;

queue->freeFn = freeFn;

queue->logicalLength = calloc(1, sizeof(sem_t));
queue->queueLock = calloc(1, sizeof(pthread_mutex_t));

sem_init(queue->logicalLength, 0, 0);

pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(queue->queueLock, &attr);

}

void queue_destroy(queue_t *queue) {

assert(queue != NULL);

queueNode *current;

while(queue->head != NULL) {

current = queue->head;
queue->head = current->next;

if(queue->freeFn != NULL) {

queue->freeFn(current->data);

}

free(current->data);
free(current);

}

queue->head = NULL;
queue->tail = NULL;

pthread_mutex_destroy(queue->queueLock);
sem_destroy(queue->logicalLength);

free(queue->queueLock);
free(queue->logicalLength);

}

void queue_add(queue_t *queue, void *element) {

assert(queue != NULL);
assert(element != NULL);

pthread_mutex_lock(queue->queueLock);

queueNode *node = calloc(1, sizeof(queueNode));
node->data = calloc(1, queue->elementSize);

node->next = NULL;

memcpy(node->data, element, queue->elementSize);

if(queue->head == NULL) {

queue->head = queue->tail = node;

} else {

queue->tail->next = node;
queue->tail = node;

}

sem_post(queue->logicalLength);

pthread_mutex_unlock(queue->queueLock);

}

void queue_removeNode(queue_t *queue, void *elementBuffer) {

pthread_mutex_lock(queue->queueLock);

if( queue->head == NULL ) {

pthread_mutex_unlock(queue->queueLock);
return;
}

queueNode *node = queue->head;
memcpy(elementBuffer, node->data, queue->elementSize);

if(queue->head == queue->tail)
queue->tail = NULL;

queue->head = node->next;

if(queue->freeFn) {

queue->freeFn(node->data);
}

free(node->data);
free(node);

pthread_mutex_unlock(queue->queueLock);

}

int queue_take(queue_t *queue, void *elementBuffer) {

assert(queue != NULL);
assert(elementBuffer != NULL);

int result = EXIT_SUCCESS;

sem_wait(queue->logicalLength);

queue_removeNode(queue, elementBuffer);

return result;
}

下面是暴露问题的代码:

//fei.h
...
#include "Queue.h"
extern queue_t *commandQueue;
...

//fei.c
#include "fei.h"
#include "commandHandler.h"
#include "Queue.h"

queue_t *commandQueue;

int main (int argc, char **argv){

int debugFlag = handleOpts(argc, argv);

if(!debugFlag){
int rc = daemonize();
if(rc != 0){
exit(rc);
}
}

rc = setConfigValues();
if(rc){
exit(rc);
}

queue_t *commandQueue = calloc(1, sizeof(queue_t));
queue_initialize(commandQueue, sizeof(commandPack_t), commandFree);

if(getPortIsock() == 0){ // This is a simple config value
exit(EXIT_FAILURE);
}

signal(SIGPIPE, SIG_IGN);

pthread_t id;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&id, &attr, receiveCommands, NULL);
pthread_create(&id, &attr, processCommands, NULL);

if(!setSocketIsock()){
exit(1);
}
while(!checkIfConnectedToSct())
usleep(50000);

receiveCCSDSPackets();
exit (0);
}

// commandHandler.c
#include "Queue.h"
#include "fei.h"
#include "commandHandler.h"

queue_t *commandQueue;

void *receiveCommands(){

getNewCsockConnection();
connectedToSct = 1;

while(1){
commandPack_t cmd;
int validCommand = getCommand(CSOCKET, &cmd);
if(validCommand == RECEIVE_SUCCESS){

queue_add(commandQueue, &cmd);

} else{
usleep(5000);
}
}
return NULL;
}

void *processCommands(){
while(1){
commandPack_t cmdToProcess;

/* Blocking queue */
queue_take(commandQueue, &cmdToProcess);


switch(cmdToProcess.command){
// Command processing
}

commandFree(&cmdToProcess);
}
return NULL;
}

receiveCommands 函数是生产者线程,processCommands 函数是消费者线程。这些是代码库中唯一引用 commandQueue 的地方。虽然它是可变的,但主线程的执行很少超出 setSocketIsock() 条件检查。

欢迎任何见解。

最佳答案

main() 中,你有这一行:

queue_t *commandQueue = calloc(1, sizeof(queue_t));

这使得 commandQueue 成为 main 的局部变量。您的其他函数使用一个也名为 commandQueue 的全局变量。这使我认为您不打算在 main 中重新声明 commandQueue。因此,将上面的行更改为:

commandQueue = calloc(1, sizeof(queue_t));

关于c - sem_wait 上的线程阻塞导致其他线程挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28418298/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com