父进程无法读取 C 中 4 个不同管道的所有消息

Parent process can't read all messages from 4 different pipes in C

最后在此处查看并提出问题后,我能够编写生成 4 个子进程并通过 4 个不同管道进行通信的代码。 4 个子进程中的每一个都写入一个管道 (fd_log),但是当父进程从管道读取时,它只读取来自进程 A 的第一条消息,来自进程 [=13] 的第一条消息=],来自进程 B 的所有消息和来自进程 D 的 none。这是代码:

int main(int argc, char *argv[]) {

    printf("\nWritten by Nawar Youssef\n");
    int i, x=0, fd_log[2], fd_A_B[2], fd_B_C[2], fd_B_D[2], pipe_size=500;
    char ch, msg_from_A[pipe_size], msg_to_B[pipe_size], msg_to_log[pipe_size],
        msg_to_C[pipe_size], msg_to_D[pipe_size], msg_B_C[pipe_size], msg_B_D[pipe_size],
        msg_from_log[pipe_size], temp_char[pipe_size], msg_C_log[pipe_size], msg_D_log[pipe_size];

    pipe(fd_log);
    pipe(fd_A_B);
    pipe(fd_B_C);
    pipe(fd_B_D);

    if (fork()==0) { //child A
        for (i=0; i < 10; i++) {
            x = (rand() % 2);
            if (x == 1)
                ch='C';
            else
                ch='D';

            //write records to A-B pipe
            close(fd_A_B[READ]);
            sprintf(msg_to_B, "%c %d", ch, i);
            write(fd_A_B[WRITE], msg_to_B, strlen(msg_to_B)+1);

            //write records to log pipe
            close(fd_log[READ]);
            sprintf(msg_to_log, "A sent to B: %c %d", ch, i);
            write(fd_log[WRITE], msg_to_log, strlen(msg_to_log)+1);
        }//for
        close(fd_A_B[WRITE]);
        close(fd_log[WRITE]);
        _exit(1);
    }//if

    if (fork()==0) { //child B
        //read A-B pipe
        close(fd_A_B[WRITE]);
        int n_bytes = read(fd_A_B[READ], msg_from_A, sizeof(msg_from_A));
        for(i=0; i < pipe_size; i++) {

            if ( msg_from_A[i] == 'C') {

                //write the message from B to C pipe
                sprintf(msg_to_C, "%c %c", msg_from_A[i], msg_from_A[i+2]);
                //printf("--%s\n", msg_to_C);
                close(fd_B_C[READ]);
                write(fd_B_C[WRITE], msg_to_C, strlen(msg_to_C)+1);

                //write C message to log pipe
                close(fd_log[READ]);
                sprintf(msg_to_log, "B sent to C: %s", msg_to_C);
                write(fd_log[WRITE], msg_to_log, strlen(msg_to_log)+1);
                sleep(1);

            }else if (msg_from_A[i] == 'D') {

                //write the message from B to D pipes
                sprintf(msg_to_D, "%c %c", msg_from_A[i], msg_from_A[i+2]);
                //printf("--%s\n", msg_to_D);
                close(fd_B_D[READ]);
                write(fd_B_D[WRITE], msg_to_D, strlen(msg_to_D)+1);

                //write D message to log pipe
                close(fd_log[READ]);
                sprintf(msg_to_log, "B sent to D: %s", msg_to_D);
                write(fd_log[WRITE], msg_to_log, strlen(msg_to_log)+1);
                sleep(5);

            }else
                continue;
        }//for
        close(fd_B_C[WRITE]); close(fd_B_D[WRITE]); close(fd_log[WRITE]);
        _exit(1); //process B
    }//if

    if (fork()==0) { //child C
        //read from B-C pipe
        close(fd_B_C[WRITE]);
        int n_bytes = read(fd_B_C[READ], msg_B_C, sizeof(msg_B_C));
        for (i=0; i < pipe_size; i++) {

            //write to log pipe
            if (msg_B_C[i] == 'C') {
                sprintf(msg_C_log, "%c %c", msg_B_C[i], msg_B_C[i+2]);
                close(fd_log[READ]);
                sprintf(msg_to_log, "C sent to log: %s", msg_C_log);
                write(fd_log[WRITE], msg_to_log, strlen(msg_to_log)+1);
            }else
                continue;
        }
        _exit(1); //process C
    }

    if (fork()==0) { //child D
        //read from fd_B_D
        close(fd_B_D[WRITE]);
        int n_bytes = read(fd_B_D[READ], msg_B_D, sizeof(msg_B_D));
        for (i=0; i < pipe_size; i++) {

            //write to log pipe
            if (msg_B_D[i] == 'D') {
                sprintf(msg_D_log, "%c %c", msg_B_D[i], msg_B_C[i+2]);
                close(fd_log[READ]);
                sprintf(msg_to_log, "D sent to log: %s", msg_D_log);
                write(fd_log[WRITE], msg_to_log, strlen(msg_to_log)+1);
            }
        }
        _exit(1);
    }//if

    //parent
    close(fd_log[WRITE]);
    int n_bytes;
    while( (n_bytes = read(fd_log[READ], msg_from_log, sizeof(msg_from_log)-1)) > 0){
        printf("Log pipe reads (%d) bytes: %s\n", n_bytes, msg_from_log);
        sleep(1);
    }
    close(fd_log[READ]);
    return (0);
}

当我查看输出时,我知道所有消息都已读取(在输出中查看返回的字节数 read)。所以看起来问题出在显示消息上。我用更简单的代码遇到了类似的问题,我通过扩大 pipe_size 来解决它。但是这里不行。
这是 (#) 之间的输出数字是每次 while 循环读取的字节数:

Written by Nawar Youssef

Log pipe reads (187) bytes: A sent to B: C 0

Log pipe reads (36) bytes: C sent to log: C 0

Log pipe reads (17) bytes: B sent to C: C 2

Log pipe reads (35) bytes: B sent to D: D 3

Log pipe reads (17) bytes: B sent to D: D 4

Log pipe reads (17) bytes: B sent to D: D 5

Log pipe reads (17) bytes: B sent to D: D 6

Log pipe reads (17) bytes: B sent to D: D 7

Log pipe reads (17) bytes: B sent to C: C 8

Log pipe reads (17) bytes: B sent to C: C 9

代码有点混乱,因为您刚刚决定将所有内容都放在 main() 中。一些辅助功能会很有用,但我不是来评判的。

几个更重要的错误:

  • 没有错误处理。不是一个。 read(2)write(2)close(2)pipe(2)fork(2) 都可以 return 出错。你永远不会检查那个,你应该检查。如果你这样做了,你会发现在大多数情况下 close(2) 是 returning EBADF,因为你在循环中用相同的文件描述符一遍又一遍地调用它。例如,查看 child A 的代码。它关闭了循环内的两个管道。一旦循环到达第二次迭代,close(2) 开始 return 报错,因为您已经关闭了该描述符。
  • 您也永远不会关闭 parent 中的管道写入通道。这将使其他 child 进程保持活动状态并在 read(2) 中被阻塞,因为管道中仍有一个活动的写入器(parent)可以(但不会)写入。您必须关闭 parent 中的管道写入端。在分叉 children 之前立即在 parent 中创建管道总是一个好主意,并在分叉后立即关闭 parent 中未使用的通道。 child 进程也是如此。
  • 每个 child 都应该在循环中读取管道,类似于 parent 对日志所做的操作。否则,您将只阅读一次并丢弃同时写入的其他消息。您必须继续读取,直到管道的写入通道被另一端关闭(当 read(2) returns 0 时发生)。

  • 你在 children 中解析从管道读取的消息的循环条件应该是 i+2 < nbytes,而不是 i < pipe_size - 后者将退出当消息小于 pipe_size 字节时(这很有可能)。更新条件也应该是 i+= 3 因为每次迭代消耗 3 个字节。

  • else continue;在循环结束时完全没用。

  • 请在启用警告的情况下编译您的代码。您会看到您有一个从未使用过的变量 temp_char

  • Child D 应该从 msg_B_D 而不是 msg_B_C 读取数字。我认为这是复制/粘贴错误。

  • 您不能通过管道发送空终止符。您正在这样做,因此,当您将其打印到输出时,您不会看到发送到日志的每条消息 - 请记住 printf() 只打印一个字符串,直到它看到一个终止空字符。相反,我建议您将消息发送到带有换行符的日志中,以便 parent 打印它而无需担心。

这是解决了所有这些错误的代码。我注释掉了睡眠以快速获得输出:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>

#define READ 0
#define WRITE 1

int main(int argc, char *argv[]) {
    printf("\nWritten by Nawar Youssef\n");
    int i, x=0, fd_log[2], fd_A_B[2], fd_B_C[2], fd_B_D[2], pipe_size=500;
    char ch, msg_from_A[pipe_size], msg_to_B[pipe_size], msg_to_log[pipe_size],
        msg_to_C[pipe_size], msg_to_D[pipe_size], msg_B_C[pipe_size], msg_B_D[pipe_size],
        msg_from_log[pipe_size], msg_C_log[pipe_size], msg_D_log[pipe_size];

    if (pipe(fd_log) < 0) {
        fprintf(stderr, "%d: pipe(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (pipe(fd_A_B) < 0) {
        fprintf(stderr, "%d: pipe(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    pid_t pid;

    if ((pid = fork()) < 0) {
        fprintf(stderr, "%d: fork(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (pid == 0) {
        //child A
        for (i = 0; i < 10; i++) {
            x = (rand() % 2);
            if (x == 1)
                ch = 'C';
            else
                ch = 'D';

            sprintf(msg_to_B, "%c %d", ch, i);
            size_t to_write = strlen(msg_to_B);
            if (write(fd_A_B[WRITE], msg_to_B, to_write) < 0) {
                fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                exit(EXIT_FAILURE);
            }

            sprintf(msg_to_log, "A sent to B: %c %d\n", ch, i);
            to_write = strlen(msg_to_log);
            if (write(fd_log[WRITE], msg_to_log, to_write) < 0) {
                fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                exit(EXIT_FAILURE);
            }
        }

        exit(EXIT_SUCCESS);
    }

    if (close(fd_A_B[WRITE]) < 0) {
        fprintf(stderr, "%d: close(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (pipe(fd_B_C) < 0) {
        fprintf(stderr, "%d: pipe(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (pipe(fd_B_D) < 0) {
        fprintf(stderr, "%d: pipe(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if ((pid = fork()) < 0) {
        fprintf(stderr, "%d: fork(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (pid == 0) {
        // Child B

        int n_bytes;
        while ((n_bytes = read(fd_A_B[READ], msg_from_A, sizeof(msg_from_A)-1)) > 0) {
            size_t to_write;

            for (i = 0; i+2 < n_bytes; i += 3) {
                if (msg_from_A[i] == 'C') {
                    sprintf(msg_to_C, "%c %c", msg_from_A[i], msg_from_A[i+2]);
                    to_write = strlen(msg_to_C);
                    if (write(fd_B_C[WRITE], msg_to_C, to_write) != to_write) {
                        fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                        exit(EXIT_FAILURE);
                    }

                    sprintf(msg_to_log, "B sent to C: %s\n", msg_to_C);
                    to_write = strlen(msg_to_log);
                    if (write(fd_log[WRITE], msg_to_log, to_write) < 0) {
                        fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                        exit(EXIT_FAILURE);
                    }

                    //sleep(1);

                } else if (msg_from_A[i] == 'D') {
                    sprintf(msg_to_D, "%c %c", msg_from_A[i], msg_from_A[i+2]);
                    to_write = strlen(msg_to_D);
                    if (write(fd_B_D[WRITE], msg_to_D, to_write) != to_write) {
                        fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                        exit(EXIT_FAILURE);
                    }

                    sprintf(msg_to_log, "B sent to D: %s\n", msg_to_D);
                    to_write = strlen(msg_to_log);
                    if (write(fd_log[WRITE], msg_to_log, to_write) < 0) {
                        fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                        exit(EXIT_FAILURE);
                    }
                    //sleep(5);
                }
            }
        }

        if (n_bytes < 0) {
            fprintf(stderr, "%d: read(): %s\n", __LINE__, strerror(errno));
            exit(EXIT_FAILURE);
        }

        exit(EXIT_SUCCESS);
    }

    if ((pid = fork()) < 0) {
        fprintf(stderr, "%d: fork(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (close(fd_B_C[WRITE]) < 0) {
        fprintf(stderr, "%d: close(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (close(fd_B_D[WRITE]) < 0) {
        fprintf(stderr, "%d: close(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (pid == 0) {
        // Child C

        int n_bytes;
        while ((n_bytes = read(fd_B_C[READ], msg_B_C, sizeof(msg_B_C))) > 0) {
            for (i = 0; i+2 < n_bytes; i += 3) {
                if (msg_B_C[i] == 'C') {
                    sprintf(msg_C_log, "%c %c", msg_B_C[i], msg_B_C[i+2]);
                    sprintf(msg_to_log, "C sent to log: %s\n", msg_C_log);
                    size_t to_write = strlen(msg_to_log);
                    if (write(fd_log[WRITE], msg_to_log, to_write) != to_write) {
                        fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                        exit(EXIT_FAILURE);
                    }
                }
            }
        }

        if (n_bytes < 0) {
            fprintf(stderr, "%d: read(): %s\n", __LINE__, strerror(errno));
            exit(EXIT_FAILURE);
        }

        exit(EXIT_SUCCESS);
    }

    if ((pid = fork()) < 0) {
        fprintf(stderr, "%d: fork(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (pid == 0) {
        // Child D

        int n_bytes;
        while ((n_bytes = read(fd_B_D[READ], msg_B_D, sizeof(msg_B_D))) > 0) {
            for (i = 0; i+2 < n_bytes; i += 3) {
                if (msg_B_D[i] == 'D') {
                    sprintf(msg_D_log, "%c %c", msg_B_D[i], msg_B_D[i+2]);
                    sprintf(msg_to_log, "D sent to log: %s\n", msg_D_log);
                    size_t to_write = strlen(msg_to_log);
                    if (write(fd_log[WRITE], msg_to_log, to_write) != to_write) {
                        fprintf(stderr, "%d: write(): %s\n", __LINE__, strerror(errno));
                        exit(EXIT_FAILURE);
                    }
                }
            }
        }

        if (n_bytes < 0) {
            fprintf(stderr, "%d: read(): %s\n", __LINE__, strerror(errno));
            exit(EXIT_FAILURE);
        }

        exit(EXIT_SUCCESS);
    }

    // Parent

    if (close(fd_log[WRITE]) < 0) {
        fprintf(stderr, "%d: close(): %s\n", __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    int n_bytes;
    while ((n_bytes = read(fd_log[READ], msg_from_log, sizeof(msg_from_log)-1)) > 0) {
        msg_from_log[n_bytes] = '[=10=]';
        printf("Log pipe reads (%d) bytes:\n%s", n_bytes, msg_from_log);
        //sleep(1);
    }

    return 0;
}

还有很大的改进空间。您不需要那么多字符数组变量 - 请记住 parent 和 child 进程之间不共享内存。您可以只使用 1 或 2 个缓冲区来打印中间消息,然后再写入它们。但它似乎有效:

filipe@filipe-Kubuntu:~/dev$ ./a.out 

Written by Nawar Youssef
Log pipe reads (170) bytes:
A sent to B: C 0
A sent to B: D 1
A sent to B: C 2
A sent to B: C 3
A sent to B: C 4
A sent to B: C 5
A sent to B: D 6
A sent to B: D 7
A sent to B: C 8
A sent to B: C 9
Log pipe reads (170) bytes:
B sent to C: C 0
B sent to D: D 1
B sent to C: C 2
B sent to C: C 3
B sent to C: C 4
B sent to C: C 5
B sent to D: D 6
B sent to D: D 7
B sent to C: C 8
B sent to C: C 9
Log pipe reads (57) bytes:
D sent to log: D 1
D sent to log: D 6
D sent to log: D 7
Log pipe reads (133) bytes:
C sent to log: C 0
C sent to log: C 2
C sent to log: C 3
C sent to log: C 4
C sent to log: C 5
C sent to log: C 8
C sent to log: C 9
filipe@filipe-Kubuntu:~/dev$