gpt4 book ai didi

两端都有等待的消费者/生产者

转载 作者:行者123 更新时间:2023-11-30 20:25:02 25 4
gpt4 key购买 nike

我使用互斥体条件编写了一个生产者/消费者程序。它使用全局 int 来生成和使用值。有 1 个消费者线程和多个生产者线程。

规则:

  1. 当值太小时,消费者会等待。

  2. 当值太大时,生产者就会等待。

我的问题是:

我们知道消费者通常需要等待,但生产者则取决于。
在我的示例中,他们都需要检查条件,并且可能互相等待,这是一个好的做法吗?
我的以下实现可能会导致死锁吗?

代码:

// condition test, a producer/consumer program,

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

static int glob = 0; // global variable, shared by threads,
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/**
* increase once, with lock, it's producer,
*
* @param arg
* {max, point}
* where:
* max, max value that would increase to,
* point, is min value that would trigger consume,
*
* @return
* 0, not changed,
* >0, increased,
* <0, error,
*/
static void *inc(void *arg) {
int *args = (int *)arg;
int max = args[0];
int point = args[1];

int result;
int n = 0;

if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
printf("error to get lock: %d\n", result);
pthread_exit(NULL); // terminate if error,
} else {
while(glob >= max) {
if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
printf("failed to wait for condition: %d\n", result);
return (void *)-1;
}
}

// do jobs,
glob++; // this will be compiled into multiple lines in machine code, so it's not automic,
n = 1;
/*
printf("inc by 1, %d\n", glob);
fflush(stdout);
*/

if(glob >= point) { // condition signal
if((result = pthread_cond_signal(&cond)) !=0 ) {
printf("error to condition signal: %d\n", result);
return (void *)-1;
} else {
// printf("condition signal, from thread [%d], value: %d\n", (int)pthread_self(), glob);
}
}

if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
printf("error to unlock: %d\n", result);
return (void *)-1;
}
}

return (void *)n;
}

// increase loop,
static void *inc_loop(void *arg) {
int result;
while(1) {
if((result = (int)inc(arg)) < 0) {
return (void *)result;
}
}
}

/**
* consumer, with lock,
*
* @param arg
* {point, steps}
* where:
* point, is min value that would trigger consume,
* steps, is the count each consume would take,
*
* @return
* 0, not consumed,
* >0, consumed,
* <0, error,
*/
static void *consume(void *arg) {
int *args = (int *)arg;
int point = args[0];
int step = args[1];
int result;
int n = 0;

if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
printf("error to get lock: %d\n", result);
pthread_exit(NULL); // terminate if error,
} else {
while(glob < point) {
pthread_cond_broadcast(&cond); // broadcast
printf("broadcast, and sleep,\n");

if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
printf("failed to wait for condition: %d\n", result);
return (void *)-1;
}
}

// do job
printf("going to perform consume: %d -> ", glob);
glob-=(glob>=step?step:glob);
printf("%d\n", glob);
n = 1;

if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
printf("error to unlock: %d\n", result);
}
}

return (void *)n;
}

// condition test
int condition_test(void *(*func_inc_loop) (void *), void *(*func_consume) (void *), int thread_count, int max, int point, int consume_count, int step) {
pthread_t threads[thread_count];

int result, i;
int inc_args[] = {
max, // max value that would increase to,
point // min value that would trigger consume,
};

// start threads
for(i=0; i<thread_count; i++) {
if((result = pthread_create(threads+i, NULL, func_inc_loop, inc_args)) != 0) {
printf("error create thread [%d]: %d\n", i, result);
}
}

int loops = 0;
int consume_args[] = {
point, // min point to trigger consume,
step // consume steps
};

// begin consume loop,
while(loops < consume_count) {
if(func_consume(consume_args) > 0) {
loops++;
}
}

printf("\nDone.\n");

return 0;
}

/**
* command line:
* ./a.out <[thread_count]> <[max]> <[point]> <[consume_count]>
*/
int main(int argc, char *argv[]) {
int thread_count = 3;
int max = 1000;
int point = 100;
int consume_count = 10; // how many times consume execute,
int step = 200; // max count in each consume,

if(argc >= 2) {
thread_count = atoi(argv[1]);
}
if(argc >= 3) {
max = atoi(argv[2]);
}
if(argc >= 4) {
point = atoi(argv[3]);
}
if(argc >= 5) {
consume_count = atoi(argv[4]);
}
if(argc >= 6) {
step = atoi(argv[5]);
}

condition_test(&inc_loop, &consume, thread_count, max, point, consume_count, step);

return 0;
}

编译:

gcc -pthread xxx.c

执行:

./a.out

最佳答案

在实践中,您不应该使用互斥锁来解决生产者/消费者或读写器问题。它不一定会引起僵局,但可能会导致生产者或消费者挨饿。

我使用类似的方法来编写读取器/写入器锁。

你可以检查一下: https://github.com/prathammalik/OS161/blob/master/kern/thread/synch.c

关于两端都有等待的消费者/生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31078790/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com