gpt4 book ai didi

c - 生产者消费者信号量线程安全

转载 作者:太空狗 更新时间:2023-10-29 12:41:17 26 4
gpt4 key购买 nike

我有一个关于 Linux 上的 C 的编程问题。此处显示的代码是我尝试编写代码的大型服务器的“快照”。

为了解释,我想:

  1. fork() 服务器中的多个子进程来处理由父进程或客户端或任何人生成的请求
  2. 每个子进程有多个生产者(写入者)线程和1个消费者(读取者)线程
  3. 指针的环形缓冲区保存线程写入和读取的信息
  4. 生产者线程从链表(节点->字符串)中检索字符串并将节点->字符串“推送”到消费者读取的指针环形缓冲区
  5. 消费者线程从环形缓冲区“弹出”并写入一些流(文件、标准输出等)
  6. 我使用 POSIX 信号量来尝试在线程之间同步对 Ring Buffer 的访问(这可能就是问题所在)

我的 Ring Buffer 不工作。我不确定我是否对缓冲区编码不当,或者信号量是否没有像我希望的那样工作。

我包含了 'server.c' 和 'circbuff.c' 还有一个 linkedlist.h 库,但我相信它可以工作

可能的问题?

  1. 我全局声明了 buffer_t 缓冲区(环形缓冲区)。我没有 malloc 它。

  2. ????

任何愿意提供帮助的人都将不胜感激。

关于如何让线程与链表或环形缓冲区对话的建议是最受欢迎的。

输出总是不同的,但它插入环形缓冲区的总是比弹出的多。

OUTPUT: ..format = 'PUSH: child line# thread'

PUSH: 2264 line 0      thread: 139746191480576
PUSH: 2264 line 0 thread: 139746183087872
PUSH: 2264 line 1 thread: 139746183087872
Buffer underflow
pop 0
Buffer underflow
pop 0
Buffer underflow
pop 0
PUSH: 2275 line 0 thread: 139746191480576
PUSH: 2275 line 0 thread: 139746191480576

更多输出

PUSH: 4208 line 0      thread: 140707316717312
PUSH: 4208 line 1 thread: 140707316717312
PUSH: 4208 line 2 thread: 140707316717312
PUSH: 4208 line 3 thread: 140707316717312
PUSH: 4208 line 4 thread: 140707316717312
PUSH: 4208 line 5 thread: 140707316717312
pop 4208 line 0
pop 4208 line 0

服务器.C

    #include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/types.h>
#include <semaphore.h>
#include <dirent.h>
#include <sys/wait.h>
#include <errno.h>

#include "circbuff.h"
#include "server.h"


#define SNAME "/OS"

#define MAXDIRPATH 1024
#define MAXKEYWORD 256
#define MAXLINESIZE 1024
#define MAXOUTSIZE 2048


sem_t * sem_wrt;
sem_t * sem_empty;
sem_t * sem_full;
sem_t * sem_mutex;

buffer_t buffer; //a circular buffer pointer array that reads info from a linked list
void * producer(void *arg); //thread producer
void * consumer(void *arg); //thread consumer

int main( int argc, char *argv[]){

if (argc != 3){
printf("Three arguments required .../client <arbitrary> <**buffer_size**>\n");
exit(0);
}

int reqsize= atoi(argv[1]);
int bbuff= atoi(argv[2]);
//buffer_t *buf;
int qbuffsize= reqsize*(MAXDIRPATH+MAXKEYWORD);

//char tok1[MAXDIRPATH];
//char tok2[MAXKEYWORD];
int child_num, status;

/*** Semaphore open */
sem_wrt=sem_open("/sem_wrt", O_CREAT, S_IRUSR | S_IWUSR, 1);
if (sem_wrt == SEM_FAILED) {
perror("server: sem_open failed for semaphone sem_wrt");
return EXIT_FAILURE;
}
sem_empty=sem_open("/sem_empty", O_CREAT, S_IRUSR | S_IWUSR, bbuff); //semaphore waits for bbuff number of things to stop process
if (sem_empty == SEM_FAILED) {
perror("server: sem_open failed for semaphone sem_empty");
return EXIT_FAILURE;
}
sem_full=sem_open("/sem_full", O_CREAT, S_IRUSR | S_IWUSR, 0);
if (sem_full == SEM_FAILED) {
perror("server: sem_open failed for semaphone sem_full");
return EXIT_FAILURE;
}
sem_mutex=sem_open("/sem_mutex", O_CREAT, S_IRUSR | S_IWUSR, 1);
if (sem_mutex == SEM_FAILED) {
perror("server: sem_poen failed for semaphone sem_mutex");
return EXIT_FAILURE;
}


/* Create 4 child processes (each will have 11 threads per below, 1 producer, 10 consumers) */
pid_t pid;
int ct;
for(ct=0; ct<4; ct++){

pid=fork(); //make 5 children
child_num++;
}




if(pid==0){//IF CHILD PROCESS

struct ThreadList* t_list=NULL; //delcare a list of threads
t_list= create_list(); //create the list of thread_info_nodes

init(&buffer, bbuff); //initial circular buffer (declared GLOBALLY), which to multiple threads write, 1 thread reads

pthread_t ctid; //consumer thread id
pthread_create(&ctid, NULL, consumer, t_list); //create one consumer/writer/pop-list thread


/* make n_cnt producer threads (10 threads) that take node->string from a linked list and push to a ring buffer of pointers */
int n_cnt;
for(n_cnt=0; n_cnt<10; n_cnt++){
struct thread_info_node *tnode = malloc(sizeof(struct thread_info_node));//make a node on the heap
init_tnode(tnode, n_cnt, NULL, " line "); //initialize node info ..a string
insert_tail( tnode, t_list); //insert node to tail of list
pthread_create( &(tnode->tid), NULL, producer, t_list); //create a few producer threads that read list
}



//When child process is done, close semaphores to release resources used by child
sem_close(sem_wrt);
sem_close(sem_empty);
sem_close(sem_full);
sem_close(sem_mutex);
}

else{// if the PARENT
/*wait for child processes to finish*/
int cnt;
for(cnt=0; cnt < child_num ; cnt++){
waitpid(-1, &status, 0);}


sem_close(sem_wrt);
sem_close(sem_empty);
sem_close(sem_full); //printf("close sem_full= %d\n", sem_close(sem_full) );
sem_close(sem_mutex);


sem_unlink("/sem_wrt");
sem_unlink("/sem_empty");
sem_unlink("/sem_full"); //printf("unlink sem_full= %d\n", sem_unlink("/sem_full") );
sem_unlink("/sem_mutex");

}


return 0;
}


void * producer(void *arg){

struct ThreadList *list = arg;
struct thread_info_node *tmp = list->head;
pthread_t tid= pthread_self();
char thread_id[MAXLINESIZE];

/* push all but last string from linked list*/
// I AM PUSHING ONLY THE 'node->string' value, not the whole node. IS THIS THE WR0NG WAY TO DO IT???
while(tmp->next != NULL){

sem_wait(sem_empty); //decrement empty. if 0 no more empty, stop process.
sem_wait(sem_mutex);

/*** CRITICAL SECTION ***/
snprintf(thread_id, 10, "%lu", tid);
//strcat(tmp->filename, thread_id);
printf("PUSH: %s thread: %lu\n", tmp->filename, tid);
push( &buffer, tmp->filename );
///--------------------//

sem_post(sem_mutex);
sem_post(sem_full);

tmp= tmp->next;
}

/* push last string left in linked list, and an exit message for the consumer */
sem_wait(sem_empty); //decrement empty. if 0 no more slots empty, stop process.
sem_wait(sem_mutex);

/*** CRITICAL SECTION ***/
snprintf(thread_id, 10, "%lu", tid);
strcat(tmp->filename, thread_id);
printf("PUSH: %s\n", tmp->filename);
push( &buffer, tmp->filename );

printf("SPECIAL_EXIT_CODE\n");
push( &buffer, "SPECIAL_EXIT_CODE");
///--------------------//

sem_post(sem_mutex);
sem_post(sem_full);

return NULL;
}


void * consumer(void *arg){

//struct ThreadList *list = arg;

char outline[MAXOUTSIZE];

while(1){

sem_wait(sem_full);
sem_wait(sem_mutex);
/******* CRITICAL SECTION ******/

//printf("==>CONSUMER THREAD\n");
strcpy(outline, popqueue(&buffer));
printf("pop %s \n", outline );

if (strcmp(outline,"SPECIAL_EXIT_CODE")==0) break;


/********************************/
sem_post(sem_mutex);
sem_post(sem_empty);


}//end WHILE*/

return NULL;
}

CIRCBUFF.H

#ifndef __CIRCBUFF_h__
#define __CIRCBUFF_h__


#include <stdio.h>
#include <stdlib.h>

#define MAXDIRPATH 1024
#define MAXKEYWORD 256
#define MAXLINESIZE 1024
#define MAXOUTSIZE 2048


struct buffer {
int size;
int start;
//int end; // position of last element
/* Tracking start and end of buffer would waste
* one position. A full buffer would always have
* to leave last position empty or otherwise
* it would look empty. Instead this buffer uses
* count to track if buffer is empty or full
*/
int count; // number of elements in buffer
/* Two ways to make buffer element type opaque
* First is by using typedef for the element
* pointer. Second is by using void pointer.
*/
/* different types of buffer:
int *element; // array of integers
char *element; // array of characters
void *element; // array of void type (could cast to int, char, etc)
char **element; //array of char pointers (array of strings)
void **element; // array of void pointers
Choosing array of void pointers since it's the most flexible */
void **element;

};

typedef struct buffer buffer_t;

void init(buffer_t *buffer, int size) {
buffer->size = size;
buffer->start = 0;
buffer->count = 0;
//buffer->element = malloc(sizeof(buffer->element)*size);
/* allocated array of void pointers. Same as below */
//buffer->element = malloc(sizeof(void *) * size);

/* Allocate array of char poitners*/
buffer->element = malloc(sizeof(char **) * size);

}

int full(buffer_t *buffer) {
if (buffer->count == buffer->size) {
return 1;
} else {
return 0;
}
}

int empty(buffer_t *buffer) {
if (buffer->count == 0) {
return 1;
} else {
return 0;
}
}

void push(buffer_t *buffer, void *data) {
int index;
if (full(buffer)) {
printf("Buffer overflow\n");
} else {
index = buffer->start + buffer->count;
if (index >= buffer->size) {
index = 0;
//index= index - buffer->size;//6
}
buffer->element[index] = data;
buffer->count++;
}
}


void * popqueue(buffer_t *buffer) {
void * element;
if (empty(buffer)) {
printf("Buffer underflow\n");
return "0";
} else {
/* FIFO implementation */
element = buffer->element[buffer->start];
buffer->start++;
buffer->count--;
if (buffer->start == buffer->size) {
buffer->start = 0;
}

return element;
}
}

void * popstack(buffer_t *buffer) {
int index;
if (empty(buffer)) {
printf("Buffer underflow\n");
return "0";
} else {
/* LIFO implementation */
index = buffer->start + buffer->count - 1;
if (index >= buffer->size) {
index = buffer->count - buffer->size - 1;
//index= index-buffer->size
//index = buffer->count – buffer->size – 1;
}
buffer->count--;
return buffer->element[index];
}
}

#endif

最佳答案

问题是 fork() 不共享全局变量。子进程和父进程都有自己的缓冲区副本。为此,您应该使用共享内存。

关于c - 生产者消费者信号量线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43221532/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com