gpt4 book ai didi

c - 如何通过pthread管理两个或更多的使用者?

转载 作者:行者123 更新时间:2023-12-04 11:36:13 25 4
gpt4 key购买 nike

我要解决一个通用的问题,即从标准输入或常规文件流发送到应用程序的二进制数据块会依次将二进制数据转换为文本。使用线程,我想先处理文本,然后再将其传递到下一个应用程序,该应用程序将进一步修改该文本,依此类推。

作为一个简单的测试用例,我想通过gunzip提取压缩数据。具体来说,我正在研究使用gunzip -c -提取通过其(重新分配的)stdin文件描述符发送给它的二进制数据块,然后从其(重新分配的)stdout文件描述符中提取文本块。然后,我可以将这些文本大块打印到实际的stdoutstderr(或稍后做其他事情)。

(我意识到我可以在命令行上执行基于gzip的压缩和提取。我的目标是使用此测试用例来学习如何在线程之间正确地传递二进制和文本数据的通用块,或者通过二进制运行该数据,或进一步处理。)

就我的测试程序而言,我设置了三个pthread_t线程:

  • produce_gzip_chunk_thread
  • consume_gzip_chunk_thread
  • consume_gunzip_chunk_thread

  • 我为每个线程传递了一个共享数据实例 thread_data,它包含一个线程锁,两个条件以及一些缓冲区和计数器变量。我还为 gunzip打开的 popen3()进程提供了一组文件描述符:
    typedef struct pthread_data pthread_data_t;
    typedef struct popen3_desc popen3_desc_t;

    struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    FILE *in_file_ptr;
    boolean in_eof;
    char in_line[LINE_LENGTH_VALUE];
    popen3_desc_t *gunzip_ptr;
    };

    struct popen3_desc {
    int in;
    int out;
    int err;
    };
    produce_gzip_chunk_thread从一个名为 gzip的常规文件中读取一块1024字节的 foo.gz压缩字节。

    这些字节被写入称为 unsigned charin_buf缓冲区,这是我传递给每个线程的共享数据结构的一部分:
    void * produce_gzip_chunk(void *t_data)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
    #endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
    n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
    if (n_in_bytes > 0) {
    while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
    pthread_cond_wait(&d->in_cond, &d->in_lock);
    memcpy(d->in_buf, in_buf, n_in_bytes);
    d->n_in_bytes = n_in_bytes;
    #ifdef DEBUG
    fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
    #endif
    pthread_cond_signal(&d->in_cond);
    }
    else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
    break;
    }
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> produce_gzip_chunk()\n");
    #endif
    return NULL;
    }

    一旦在 n_bytes中存储了正数的字节-也就是说,我们从输入的 gzip文件中提取了需要使用 gunzip处理的数据-这将触发一个条件,该条件允许第二个线程 consume_gzip_chunk_thread运行:
    void * consume_gzip_chunk(void *t_data)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
    #endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
    while (d->n_in_bytes == 0 && !d->in_eof)
    pthread_cond_wait(&d->in_cond, &d->in_lock);
    if (d->n_in_bytes) {
    #ifdef DEBUG
    fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
    #endif
    if (!d->gunzip_ptr) {
    #ifdef DEBUG
    fprintf(stderr, "Debug: * setting up gunzip ptr\n");
    #endif
    d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
    if (!d->gunzip_ptr) {
    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
    exit(EXIT_FAILURE);
    }

    popen3("gunzip -c -",
    &(d->gunzip_ptr->in),
    &(d->gunzip_ptr->out),
    &(d->gunzip_ptr->err),
    kTrue,
    kTrue);
    memset(d->in_line, 0, LINE_LENGTH_VALUE);
    }
    n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
    #ifdef DEBUG
    fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
    #endif
    if (n_in_bytes_written_to_gunzip > 0)
    d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

    d->n_in_bytes = 0;
    pthread_cond_signal(&d->out_cond);
    }
    if (d->in_eof)
    break;
    }
    pthread_mutex_unlock(&d->in_lock);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gzip_chunk()\n");
    #endif
    return NULL;
    }

    当使用 gzip数据块时,我们使用 write函数将 n_bytesin_buf发送到 gunzip进程的输入文件描述符。最后,我们向 out_cond发送另一个线程信号,但这一次是为了帮助重新唤醒 consume_gunzip_chunk_thread,后者从 gunzip的输出中读取内容以做更多工作:
    void * consume_gunzip_chunk(void *t_data)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
    #endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
    while (d->n_in_bytes_written_to_gunzip == 0) {
    pthread_cond_wait(&d->out_cond, &d->in_lock);
    }
    if (d->n_in_bytes_written_to_gunzip) {
    sleep(1);
    n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
    #ifdef DEBUG
    fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
    fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
    #endif
    memset(d->in_line, 0, strlen(d->in_line));
    if (n_out_bytes_read_from_gunzip > 0)
    d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
    d->n_in_bytes_written_to_gunzip = 0;
    pthread_cond_signal(&d->in_cond);
    }
    if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
    break;
    }
    pthread_mutex_unlock(&d->in_lock);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gunzip_chunk()\n");
    #endif
    return NULL;
    }

    这将尝试对 read进程的输出文件描述符中的所有可用字节进行 gunzip。为了调试,我现在只想将它们打印到 stderr

    我面临的问题是,在执行 sleep(1)之前,我需要在 consume_gunzip_chunk中添加 read语句,以使事情正常进行。

    如果没有此 sleep(1)语句,我的测试程序通常将不会输出任何内容-除了每8-10次尝试一次,当正确提取压缩数据时。

    问题-我在条件安排方面做错了什么,因此需要 sleep(1)调用才能使 gzip -extraction正常工作?在生产方案中,使用大得多的输入文件,每1kB强制等待一秒钟似乎是个坏主意。

    为了具有完整源代码的可重复性,以下是两个相关文件。这是标题:
    /*
    * convert.h
    */

    #ifndef CONVERT_H
    #define CONVERT_H

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>
    #include <getopt.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <errno.h>

    #define CB_VERSION "1.0"
    #define LINE_LENGTH_VALUE 65536
    #define BUF_LENGTH_VALUE 1024
    #define POPEN3_READ 0
    #define POPEN3_WRITE 1

    typedef int boolean;
    extern const boolean kTrue;
    extern const boolean kFalse;
    const boolean kTrue = 1;
    const boolean kFalse = 0;

    typedef enum {
    kGzip,
    kUnknown
    } format_t;

    typedef struct pthread_data pthread_data_t;
    typedef struct popen3_desc popen3_desc_t;

    struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    boolean in_eof;
    FILE *in_file_ptr;
    popen3_desc_t *gunzip_ptr;
    char in_line[LINE_LENGTH_VALUE];
    };

    struct popen3_desc {
    int in;
    int out;
    int err;
    };

    static const char *name = "convert";
    static const char *version = CB_VERSION;
    static const char *authors = "Alex Reynolds";
    static const char *usage = "\n" \
    "Usage: convert --input-format=str <input-file>\n" \
    " Process Flags:\n\n" \
    " --input-format=str | -f str Input format (str = [ gzip ]; required)\n" \
    " --help | -h Show this usage message\n";

    static struct convert_globals_t {
    char *input_format_str;
    format_t input_format;
    char **filenames;
    int num_filenames;
    } convert_globals;

    static struct option convert_client_long_options[] = {
    { "input-format", required_argument, NULL, 'f' },
    { "help", no_argument, NULL, 'h' },
    { NULL, no_argument, NULL, 0 }
    };

    static const char *convert_client_opt_string = "f:h?";

    void * consume_gunzip_chunk (void *t_data);
    void * consume_gzip_chunk (void *t_data);
    void * produce_gzip_chunk (void *t_data);
    FILE * new_file_ptr (const char *in_fn);
    void delete_file_ptr (FILE **file_ptr);
    pid_t popen3 (const char *command,
    int *in_desc,
    int *out_desc,
    int *err_desc,
    boolean nonblock_in,
    boolean nonblock_outerr);
    off_t fsize (const char *fn);
    void initialize_globals ();
    void parse_command_line_options (int argc,
    char **argv);
    void print_usage (FILE *stream);

    #endif

    这是实现:
    /*
    * convert.c
    */

    #include "convert.h"

    int main(int argc, char **argv)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> main()\n");
    #endif

    pthread_t produce_gzip_chunk_thread = NULL;
    pthread_t consume_gzip_chunk_thread = NULL;
    pthread_t consume_gunzip_chunk_thread = NULL;
    pthread_data_t *thread_data = NULL;

    parse_command_line_options(argc, argv);

    /* initialize thread data */
    thread_data = malloc(sizeof(pthread_data_t));
    thread_data->n_in_bytes = 0;
    thread_data->n_in_bytes_written_to_gunzip = 0;
    thread_data->n_out_bytes_read_from_gunzip = 0;
    thread_data->in_eof = kFalse;
    thread_data->in_file_ptr = new_file_ptr(convert_globals.filenames[0]);
    pthread_mutex_init(&(thread_data->in_lock), NULL);
    pthread_cond_init(&(thread_data->in_cond), NULL);
    pthread_cond_init(&(thread_data->out_cond), NULL);

    /* parse input */
    if (convert_globals.input_format == kGzip)
    {
    if (pthread_create(&produce_gzip_chunk_thread, NULL, produce_gzip_chunk, (void *) thread_data) != 0) {
    fprintf(stderr, "Error: Could not create gzip chunk production thread\n");
    return EXIT_FAILURE;
    }
    if (pthread_create(&consume_gzip_chunk_thread, NULL, consume_gzip_chunk, (void *) thread_data) != 0) {
    fprintf(stderr, "Error: Could not create gzip chunk consumption thread\n");
    return EXIT_FAILURE;
    }
    if (pthread_create(&consume_gunzip_chunk_thread, NULL, consume_gunzip_chunk, (void *) thread_data) != 0) {
    fprintf(stderr, "Error: Could not create gunzip chunk consumption thread\n");
    return EXIT_FAILURE;
    }
    if (pthread_join(produce_gzip_chunk_thread, NULL) != 0) {
    fprintf(stderr, "Error: Could not join gzip chunk production thread\n");
    return EXIT_FAILURE;
    }
    if (pthread_join(consume_gzip_chunk_thread, NULL) != 0) {
    fprintf(stderr, "Error: Could not join gzip chunk consumption thread\n");
    return EXIT_FAILURE;
    }
    if (pthread_join(consume_gunzip_chunk_thread, NULL) != 0) {
    fprintf(stderr, "Error: Could not join gunzip chunk consumption thread\n");
    return EXIT_FAILURE;
    }
    }
    else
    {
    /*
    handle text formats
    */
    }

    /* cleanup */
    delete_file_ptr(&thread_data->in_file_ptr);
    pthread_mutex_destroy(&(thread_data->in_lock));
    pthread_cond_destroy(&(thread_data->in_cond));
    pthread_cond_destroy(&(thread_data->out_cond));
    free(thread_data);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> main()\n");
    #endif
    return EXIT_SUCCESS;
    }

    void * consume_gunzip_chunk(void *t_data)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
    #endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
    while (d->n_in_bytes_written_to_gunzip == 0) {
    pthread_cond_wait(&d->out_cond, &d->in_lock);
    }
    if (d->n_in_bytes_written_to_gunzip) {
    sleep(1);
    n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
    #ifdef DEBUG
    fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
    fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
    #endif
    memset(d->in_line, 0, strlen(d->in_line));
    if (n_out_bytes_read_from_gunzip > 0)
    d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
    d->n_in_bytes_written_to_gunzip = 0;
    pthread_cond_signal(&d->in_cond);
    }
    if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
    break;
    }
    pthread_mutex_unlock(&d->in_lock);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gunzip_chunk()\n");
    #endif
    return NULL;
    }

    void * consume_gzip_chunk(void *t_data)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
    #endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
    while (d->n_in_bytes == 0 && !d->in_eof)
    pthread_cond_wait(&d->in_cond, &d->in_lock);
    if (d->n_in_bytes) {
    #ifdef DEBUG
    fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
    #endif
    if (!d->gunzip_ptr) {
    #ifdef DEBUG
    fprintf(stderr, "Debug: * setting up gunzip ptr\n");
    #endif
    d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
    if (!d->gunzip_ptr) {
    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
    exit(EXIT_FAILURE);
    }

    popen3("gunzip -c -",
    &(d->gunzip_ptr->in),
    &(d->gunzip_ptr->out),
    &(d->gunzip_ptr->err),
    kTrue,
    kTrue);
    memset(d->in_line, 0, LINE_LENGTH_VALUE);
    }
    n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
    #ifdef DEBUG
    fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
    #endif
    if (n_in_bytes_written_to_gunzip > 0)
    d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

    d->n_in_bytes = 0;
    /* pthread_cond_signal(&d->in_cond); */
    pthread_cond_signal(&d->out_cond);
    }
    if (d->in_eof)
    break;
    }
    pthread_mutex_unlock(&d->in_lock);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> consume_gzip_chunk()\n");
    #endif
    return NULL;
    }

    void * produce_gzip_chunk(void *t_data)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
    #endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
    n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
    if (n_in_bytes > 0) {
    while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
    pthread_cond_wait(&d->in_cond, &d->in_lock);
    memcpy(d->in_buf, in_buf, n_in_bytes);
    d->n_in_bytes = n_in_bytes;
    #ifdef DEBUG
    fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
    #endif
    pthread_cond_signal(&d->in_cond);
    }
    else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
    break;
    }
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> produce_gzip_chunk()\n");
    #endif
    return NULL;
    }

    FILE * new_file_ptr(const char *in_fn)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> new_file_ptr()\n");
    #endif

    FILE *file_ptr = NULL;
    boolean not_stdin = kTrue;

    not_stdin = strcmp(in_fn, "-");
    file_ptr = (not_stdin) ? fopen(in_fn, "r") : stdin;

    if (!file_ptr) {
    fprintf(stderr, "Error: Could not open input stream\n");
    exit(EXIT_FAILURE);
    }

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> new_file_ptr()\n");
    #endif
    return file_ptr;
    }

    void delete_file_ptr(FILE **file_ptr)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> delete_file_ptr()\n");
    #endif

    fclose(*file_ptr);
    *file_ptr = NULL;

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> delete_file_ptr()\n");
    #endif
    }

    pid_t popen3(const char *command, int *in_desc, int *out_desc, int *err_desc, boolean nonblock_in, boolean nonblock_outerr)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> popen3()\n");
    #endif

    int p_stdin[2], p_stdout[2], p_stderr[2];
    pid_t pid;

    if (pipe(p_stdin) != 0 || pipe(p_stdout) != 0 || pipe(p_stderr) != 0)
    return -1;

    if (nonblock_in) {
    fcntl(p_stdin[POPEN3_WRITE], F_SETFL, fcntl(p_stdin[POPEN3_WRITE], F_GETFL) | O_NONBLOCK);
    }

    if (nonblock_outerr) {
    fcntl(p_stdout[POPEN3_READ], F_SETFL, fcntl(p_stdout[POPEN3_READ], F_GETFL) | O_NONBLOCK);
    fcntl(p_stderr[POPEN3_READ], F_SETFL, fcntl(p_stderr[POPEN3_READ], F_GETFL) | O_NONBLOCK);
    }

    pid = fork();
    if (pid < 0)
    return pid; /* error */

    if (pid == 0) {
    close(p_stdin[POPEN3_WRITE]);
    close(p_stdout[POPEN3_READ]);
    close(p_stderr[POPEN3_READ]);
    dup2(p_stdin[POPEN3_READ], fileno(stdin));
    dup2(p_stdout[POPEN3_WRITE], fileno(stderr));
    dup2(p_stdout[POPEN3_WRITE], fileno(stdout));
    execl("/bin/sh", "sh", "-c", command, NULL);
    fprintf(stderr, "Error: Could not execl [%s]\n", command);
    exit(EXIT_FAILURE);
    }

    if (in_desc == NULL)
    close(p_stdin[POPEN3_WRITE]);
    else
    *in_desc = p_stdin[POPEN3_WRITE];

    if (out_desc == NULL)
    close(p_stdout[POPEN3_READ]);
    else
    *out_desc = p_stdout[POPEN3_READ];

    if (err_desc == NULL)
    close(p_stderr[POPEN3_READ]);
    else
    *err_desc = p_stderr[POPEN3_READ];

    #ifdef DEBUG
    fprintf(stderr, "Debug: New *in_desc = %d\n", *in_desc);
    fprintf(stderr, "Debug: New *out_desc = %d\n", *out_desc);
    fprintf(stderr, "Debug: New *err_desc = %d\n", *err_desc);
    #endif

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> popen3()\n");
    #endif
    return pid;
    }

    off_t fsize(const char *fn)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> fsize()\n");
    #endif

    struct stat st;

    if (stat(fn, &st) == 0)
    return st.st_size;

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> fsize()\n");
    #endif
    return EXIT_FAILURE;
    }

    void initialize_globals()
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> initialize_globals()\n");
    #endif

    convert_globals.input_format = kUnknown;
    convert_globals.filenames = NULL;
    convert_globals.num_filenames = 0;

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> initialize_globals()\n");
    #endif
    }

    void parse_command_line_options(int argc, char **argv)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> parse_command_line_options()\n");
    #endif

    int client_long_index;
    int client_opt = getopt_long(argc,
    argv,
    convert_client_opt_string,
    convert_client_long_options,
    &client_long_index);
    char *in_format_str = NULL;

    opterr = 0; /* disable error reporting by GNU getopt */
    initialize_globals();

    while (client_opt != -1)
    {
    switch (client_opt)
    {
    case 'f':
    in_format_str = optarg;
    break;
    case 'h':
    print_usage(stdout);
    exit(EXIT_SUCCESS);
    case '?':
    print_usage(stdout);
    exit(EXIT_SUCCESS);
    default:
    break;
    }
    client_opt = getopt_long(argc,
    argv,
    convert_client_opt_string,
    convert_client_long_options,
    &client_long_index);
    }

    convert_globals.filenames = argv + optind;
    convert_globals.num_filenames = argc - optind;

    if (!in_format_str) {
    fprintf(stderr, "Error: Specified input format was omitted; please specify one of required input formats\n");
    print_usage(stderr);
    exit(EXIT_FAILURE);
    }
    else if (convert_globals.num_filenames != 1) {
    fprintf(stderr, "Error: Please specify an input file (either a regular file or '-' for stdin\n");
    print_usage(stderr);
    exit(EXIT_FAILURE);
    }

    /* map format string to setting */
    if (strcmp(in_format_str, "gzip") == 0)
    convert_globals.input_format = kGzip;
    else {
    fprintf(stderr, "Error: Specified input format is unknown; please specify one of required input formats\n");
    print_usage(stderr);
    exit(EXIT_FAILURE);
    }

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> parse_command_line_options()\n");
    #endif
    }

    void print_usage(FILE *stream)
    {
    #ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> print_usage()\n");
    #endif

    fprintf(stream,
    "%s\n" \
    " version: %s\n" \
    " author: %s\n" \
    "%s\n",
    name,
    version,
    authors,
    usage);

    #ifdef DEBUG
    fprintf(stderr, "Debug: Leaving --> print_usage()\n");
    #endif
    }

    这是构建过程:
    $ mkdir -p objects
    $ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline -c convert.c -o objects/convert.o -iquote${PWD}
    $ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline objects/convert.o -o convert -lpthread

    我已经能够在具有相当现代的编译环境的OS X和Linux主机上构建此测试代码。

    在此先感谢您提供任何有用的建议!

    最佳答案

    我首先要说的是,我觉得在这里并不需要pthread条件和互斥锁,对您描述的问题,非阻塞I/O也不是最好的 react 。

    我认为,您使用无条件和互斥锁的版本所描述的问题是忘记刻苦地在管道的末端close()的症状,结果是该管道的写入端文件描述符的副本为子进程的输出提供了帮助。 stdin活着泄漏(漏入那个 child 或其他人)。

    然后,假设仍然存在与stdin的读取端相对应的写入端,则系统不会给出EOF,而是会无限期地阻塞。

    在您的情况下,您确实防止了管道末端文件描述符泄露给生成的子对象(尽管您在close()fork()的子对象侧调用了正确的popen3(),但您忘记了close()却错误地将管道末端的管道末端家长一方)。但是,您并没有阻止这种泄漏给所有其他 child !如果您两次调用popen3(),则可以防止将这三个描述符的集合泄漏到子级中,但是由于父级仍然拥有它们,因此当下一次调用popen3()时,在fork()之后,现在有 6个文件描述符可以关闭(您刚刚创建的旧的三个和一组新的三个)。

    因此,在您的情况下,应在这些管道末端上设置close-on-exec标志,因此:

    fcntl(fdIn [PIPEWR], F_SETFD, fcntl(fdIn [PIPEWR], F_GETFD) | FD_CLOEXEC);
    fcntl(fdOut[PIPERD], F_SETFD, fcntl(fdOut[PIPERD], F_GETFD) | FD_CLOEXEC);
    fcntl(fdErr[PIPERD], F_SETFD, fcntl(fdErr[PIPERD], F_GETFD) | FD_CLOEXEC);

    这是在内部压缩然后解压缩后产生6个线程和3个进程,并将其输入未经修改地传递到输出的代码。它有效地实现了 gzip -c - | XOR 0x55 | XOR 0x55 | gunzip -c - | cat,其中:
  • 通过线程gzip将标准输入馈送到srcThrd
  • gzip的输出由a2xor0Thrd线程读取,并馈送到xor0Thrd线程。
  • 线程xor0Thrd将其输入与0x55进行异或,然后再将其传递到线程xor1Thrd上。
  • 线程xor1Thrd将其输入与0x55进行异或,然后再将其传递到线程xor22BThrd上。
  • 线程xor22BThrd馈送其输入以处理gunzip
  • 进程gunzip将其输出直接输入(无需通过线程)到cat
  • 进程cat的输出由dstThrd线程读取,并打印到标准输出。

  • 压缩是通过进程间管道通信完成的,而异或是通过进程内管道通信完成的。不使用互斥或​​条件变量。 main()非常容易理解。此代码应易于扩展到您的情况。
    /* Includes */
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <fcntl.h>



    /* Defines */
    #define PIPERD 0
    #define PIPEWR 1




    /* Data structures */
    typedef struct PIPESET{
    int Ain[2];
    int Aout[2];
    int Aerr[2];
    int xor0[2];
    int xor1[2];
    int xor2[2];
    int Bin[2];
    int BoutCin[2];
    int Berr[2];
    int Cout[2];
    int Cerr[2];
    } PIPESET;




    /* Function Implementations */

    /**
    * Source thread main method.
    *
    * Slurps from standard input and feeds process A.
    */

    void* srcThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(0, &c, 1) > 0){
    write(pipeset->Ain[PIPEWR], &c, 1);
    }

    close(pipeset->Ain[PIPEWR]);

    pthread_exit(NULL);
    }

    /**
    * A to XOR0 thread main method.
    *
    * Manually pipes from standard output of process A to input of thread XOR0.
    */

    void* a2xor0ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char buf[65536];
    ssize_t bytesRead;

    while((bytesRead = read(pipeset->Aout[PIPERD], buf, 65536)) > 0){
    write(pipeset->xor0[PIPEWR], buf, bytesRead);
    }

    close(pipeset->xor0[PIPEWR]);

    pthread_exit(NULL);
    }

    /**
    * XOR0 thread main method.
    *
    * XORs input with 0x55 and outputs to input of XOR1.
    */

    void* xor0ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(pipeset->xor0[PIPERD], &c, 1) > 0){
    c ^= 0x55;
    write(pipeset->xor1[PIPEWR], &c, 1);
    }

    close(pipeset->xor1[PIPEWR]);

    pthread_exit(NULL);
    }

    /**
    * XOR1 thread main method.
    *
    * XORs input with 0x55 and outputs to input of process B.
    */

    void* xor1ThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(pipeset->xor1[PIPERD], &c, 1) > 0){
    c ^= 0x55;
    write(pipeset->xor2[PIPEWR], &c, 1);
    }

    close(pipeset->xor2[PIPEWR]);

    pthread_exit(NULL);
    }

    /**
    * XOR2 to B thread main method.
    *
    * Manually pipes from input (output of XOR1) to input of process B.
    */

    void* xor22BThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char buf[65536];
    ssize_t bytesRead;

    while((bytesRead = read(pipeset->xor2[PIPERD], buf, 65536)) > 0){
    write(pipeset->Bin[PIPEWR], buf, bytesRead);
    }

    close(pipeset->Bin[PIPEWR]);

    pthread_exit(NULL);
    }

    /**
    * Destination thread main method.
    *
    * Manually copies the standard output of process C to the standard output.
    */

    void* dstThrdMain(void* arg){
    PIPESET* pipeset = (PIPESET*)arg;

    char c;
    while(read(pipeset->Cout[PIPERD], &c, 1) > 0){
    write(1, &c, 1);
    }

    pthread_exit(NULL);
    }

    /**
    * Set close on exec flag on given descriptor.
    */

    void setCloExecFlag(int fd){
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
    }

    /**
    * Set close on exec flag on given descriptor.
    */

    void unsetCloExecFlag(int fd){
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) & ~FD_CLOEXEC);
    }

    /**
    * Pipe4.
    *
    * Create a pipe with some ends possibly marked close-on-exec.
    */

    #define PIPE4_FLAG_NONE (0U)
    #define PIPE4_FLAG_RD_CLOEXEC (1U << 0)
    #define PIPE4_FLAG_WR_CLOEXEC (1U << 1)

    int pipe4(int fd[2], int flags){
    int ret = pipe(fd);

    if(flags&PIPE4_FLAG_RD_CLOEXEC){setCloExecFlag(fd[PIPERD]);}
    if(flags&PIPE4_FLAG_WR_CLOEXEC){setCloExecFlag(fd[PIPEWR]);}

    return ret;
    }

    /**
    * Pipe4 explicit derivatives.
    */

    #define pipe4_cloexec(fd) pipe4((fd), PIPE4_FLAG_RD_CLOEXEC|PIPE4_FLAG_WR_CLOEXEC)

    /**
    * Popen4.
    *
    * General-case for spawning a process and tethering it with cloexec pipes on stdin,
    * stdout and stderr.
    *
    * @param [in] cmd The command to execute.
    * @param [in/out] pin The pointer to the cloexec pipe for stdin.
    * @param [in/out] pout The pointer to the cloexec pipe for stdout.
    * @param [in/out] perr The pointer to the cloexec pipe for stderr.
    * @param [in] flags A bitwise OR of flags to this function. Available
    * flags are:
    *
    * POPEN4_FLAG_NONE:
    * Explicitly specify no flags.
    * POPEN4_FLAG_NOCLOSE_PARENT_STDIN,
    * POPEN4_FLAG_NOCLOSE_PARENT_STDOUT,
    * POPEN4_FLAG_NOCLOSE_PARENT_STDERR:
    * Don't close pin[PIPERD], pout[PIPEWR] and perr[PIPEWR] in the parent,
    * respectively.
    * POPEN4_FLAG_CLOSE_CHILD_STDIN,
    * POPEN4_FLAG_CLOSE_CHILD_STDOUT,
    * POPEN4_FLAG_CLOSE_CHILD_STDERR:
    * Close the respective streams in the child. Ignores pin, pout and perr
    * entirely. Overrides a NOCLOSE_PARENT flag for the same stream.
    */

    #define POPEN4_FLAG_NONE (0U)
    #define POPEN4_FLAG_NOCLOSE_PARENT_STDIN (1U << 0)
    #define POPEN4_FLAG_NOCLOSE_PARENT_STDOUT (1U << 1)
    #define POPEN4_FLAG_NOCLOSE_PARENT_STDERR (1U << 2)
    #define POPEN4_FLAG_CLOSE_CHILD_STDIN (1U << 3)
    #define POPEN4_FLAG_CLOSE_CHILD_STDOUT (1U << 4)
    #define POPEN4_FLAG_CLOSE_CHILD_STDERR (1U << 5)

    pid_t popen4(const char* cmd, int pin[2], int pout[2], int perr[2], int flags){
    /********************
    ** FORK PROCESS **
    ********************/
    pid_t ret = fork();

    if(ret < 0){
    /**
    * Error in fork(), still in parent.
    */

    fprintf(stderr, "fork() failed!\n");
    return ret;
    }else if(ret == 0){
    /**
    * Child-side of fork
    */

    if(flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
    close(0);
    }else{
    unsetCloExecFlag(pin [PIPERD]);
    dup2(pin [PIPERD], 0);
    }
    if(flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
    close(1);
    }else{
    unsetCloExecFlag(pout[PIPEWR]);
    dup2(pout[PIPEWR], 1);
    }
    if(flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
    close(2);
    }else{
    unsetCloExecFlag(perr[PIPEWR]);
    dup2(perr[PIPEWR], 2);
    }

    execl("/bin/sh", "sh", "-c", cmd, NULL);

    fprintf(stderr, "exec() failed!\n");
    exit(-1);
    }else{
    /**
    * Parent-side of fork
    */

    if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDIN &&
    ~flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
    close(pin [PIPERD]);
    }
    if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDOUT &&
    ~flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
    close(pout[PIPEWR]);
    }
    if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDERR &&
    ~flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
    close(perr[PIPEWR]);
    }

    return ret;
    }

    /* Unreachable */
    return ret;
    }

    /**
    * Main Function.
    *
    * Sets up the whole piping scheme.
    */

    int main(int argc, char* argv[]){
    pthread_t srcThrd, a2xor0Thrd, xor0Thrd, xor1Thrd, xor22BThrd, dstThrd;
    pid_t gzip, gunzip, cat;
    PIPESET pipeset;

    pipe4_cloexec(pipeset.Ain);
    pipe4_cloexec(pipeset.Aout);
    pipe4_cloexec(pipeset.Aerr);
    pipe4_cloexec(pipeset.Bin);
    pipe4_cloexec(pipeset.BoutCin);
    pipe4_cloexec(pipeset.Berr);
    pipe4_cloexec(pipeset.Cout);
    pipe4_cloexec(pipeset.Cerr);
    pipe4_cloexec(pipeset.xor0);
    pipe4_cloexec(pipeset.xor1);
    pipe4_cloexec(pipeset.xor2);

    /* Spawn processes */
    gzip = popen4("gzip -c -", pipeset.Ain, pipeset.Aout, pipeset.Aerr, POPEN4_FLAG_NONE);
    gunzip = popen4("gunzip -c -", pipeset.Bin, pipeset.BoutCin, pipeset.Berr, POPEN4_FLAG_NONE);
    cat = popen4("cat", pipeset.BoutCin, pipeset.Cout, pipeset.Cerr, POPEN4_FLAG_NONE);


    /* Spawn threads */
    pthread_create(&srcThrd, NULL, srcThrdMain, &pipeset);
    pthread_create(&a2xor0Thrd, NULL, a2xor0ThrdMain, &pipeset);
    pthread_create(&xor0Thrd, NULL, xor0ThrdMain, &pipeset);
    pthread_create(&xor1Thrd, NULL, xor1ThrdMain, &pipeset);
    pthread_create(&xor22BThrd, NULL, xor22BThrdMain, &pipeset);
    pthread_create(&dstThrd, NULL, dstThrdMain, &pipeset);
    pthread_join(srcThrd, (void**)NULL);
    pthread_join(a2xor0Thrd, (void**)NULL);
    pthread_join(xor0Thrd, (void**)NULL);
    pthread_join(xor1Thrd, (void**)NULL);
    pthread_join(xor22BThrd, (void**)NULL);
    pthread_join(dstThrd, (void**)NULL);
    return 0;
    }

    您自己的代码的注释

    您的代码有很多问题,其中大多数与线程无关。
  • 您不close()文件描述符d->gunzip_ptr->in。这意味着gunzip永远不会知道stdin上将不再有输入,因此它将永远不会退出。
  • 由于gunzip永远不会退出,因此它永远也不会对其标准输出进行close(),因此在另一端阻塞的read()将永远不会解除阻塞。相反,非阻塞读取将始终提供-1errno == EAGAIN
  • 您的popen3()close()的父级上不是p_stdin[POPEN3_READ] p_stdout[POPEN3_WRITE]p_stderr[POPEN3_WRITE]fork()。只有 child 应该具有这些描述符。未能关闭这些意味着当父级本身尝试读取子级的stdout和stderr时,由于与上述相同的原因,它将再也看不到EOF:因为它本身仍然拥有一个可以在其中写入的写端管道。 ,使新数据显示在读取端。
  • 您的代码隐式依赖gunzip每写入1024个字节至少写出一个字节。不能保证会出现这种情况,因为gunzip可能会在内部进行缓冲。
  • 这是因为您的代码读取然后将最多BUF_LENGTH_VALUE个字节的块复制到d->in_buf中。然后,您将通过fread()读取的字节数分配给d->n_in_bytes。在您的d->n_in_bytes调用中使用了相同的write()来写入gunzip的stdin。然后,您发出信号consume_gunzip_chunk()唤醒,然后发出pthread_cond_wait()表示下一个gzip压缩的块。但是这个gzip压缩的块可能永远不会出现,因为不能保证gunzip能够仅从输入的前1024个字节中解压缩有用的输出,甚至不能保证它将write()出来而不是对其进行缓冲,直到它被缓存为止。 ,例如4096字节(整页)的输出。因此,read()中的consume_gunzip_chunk()调用可能永远不会成功(如果read()被阻止,甚至会返回)。如果read()从不返回,则consume_gunzip_chunk()不会表示d->in_cond,因此所有三个线程都被卡住了。而且即使read()是非阻塞的,gzip的输出的最后一块也可能永远不会到来,因为gzip的输入从未关闭,因此它不会通过write()清空它们的缓冲区来冲洗缓冲区,因此read()在另一端如果没有close(),end将永远不会得到有用的数据,没有任何恳求会引起它。
  • (可能吗?)错误的原因:d->n_out_bytes_read_from_gunzip一旦变成非0,就再也不会成为0。这意味着极其莫名其妙
    while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
    pthread_cond_wait(&d->in_cond, &d->in_lock);

    一旦在produce_gzip_chunk()中输入d->n_out_bytes_read_from_gunzip != 0,它将永远保持原状。通过在设置sleep(1)consume_gunzip_chunk()中调用d->n_out_bytes_read_from_gunzip,您可以通过在consume_gunzip_chunk()通过将d->n_out_bytes_read_from_gunzip设置为非零值来锁定系统之前读取所有输入来解决此问题。
  • 有两个调用pthread_cond_wait(&d->in_cond, &d->in_lock);的线程,分别是produce_gzip_chunk()consume_gzip_chunk()。绝对不能保证当consume_gunzip_chunk()调用pthread_cond_signal(&d->in_cond);时,“正确的”线程(无论您使用的是设计中的线程)都将接收到该信号。为了确保所有人都能使用,请使用pthread_cond_broadcast(),然后将自己暴露给thundering herd problem。在我看来,在这种情况下需要使用pthread_cond_broadcast()再次是不良设计的征兆。
  • 相关,您在调用pthread_cond_signal(&d->in_cond)的线程(实际上是一个函数)中调用pthread_cond_wait(&d->in_cond, &d->in_lock)。这有什么目的?
  • 您将d->in_lock用于太多不同的目的,使自己暴露于死锁的可能性,或由于过度保护而导致的性能低下。特别是,您可以将其用作d->in_condd->out_cond的保护。这太过强大的保护作用– gunzipd->in_line的输出应该能够与gunzip的输入写入d->in_buf和从consume_gunzip_chunk()写入的同时发生。
  • if中,您有
    while (d->n_in_bytes_written_to_gunzip == 0) {
    pthread_cond_wait(&d->out_cond, &d->in_lock);
    }
    if (d->n_in_bytes_written_to_gunzip) {
    ...

    struct pthread_data永远不会失败!您可能会想到一种情况吗?
  • 考虑使整个close()易失(或至少使多个线程使用的那些整数元素)易失,因为编译器可能会决定优化实际上应该保留的负载和存储。

  • 表扬

    为了避免听起来太消极,我想说的是,您的问题通常不是由于错误地使用了pthreads API,而是由于错误的使用者生产者逻辑和缺少 pthread_cond_wait()。另外,您似乎了解 pipe()可能会唤醒 spuriously,因此您将其包装在检查不变式的循环中。

    将来:

    我会使用管道,即使在线程之间也是如此。这使您无需实现自己的消费者生产者方案;内核已经为您解决了该问题,并为您提供了 read()write()fork()原语,而您需要利用这些现成的解决方案。它还使代码更整洁,并且互斥锁和条件变量无效。一个人必须简单地勤奋地结束两端,并且一个人必须在存在 read()的情况下在管道周围格外小心。规则很简单:
  • 如果管道的写端存在,则打开的读取端上的EAGAIN不会给出EOF,但会阻塞或read()
  • 如果所有管道的写端都已关闭,则打开的读端上的write()将给出EOF。
  • 如果管道的读取端已全部关闭,则任何写入端的SIGPIPE将导致fork()
  • pthread_atfork()复制了整个过程,包括所有描述符(ojit_code中可能是疯狂的东西)!
  • 关于c - 如何通过pthread管理两个或更多的使用者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24427599/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com