如何用 fork 和 mkfifo 制作一个进程环?

How to make a process ring with fork and mkfifo?

目前我正在学习 C，我想用 fork 和管道（pipe）创建一个由 n 个子进程组成的进程环，其中 n 是通过命令行参数输入的数字。每个子进程都可以与下一个子进程双向通信，例如：

p1 --->--- p2
   ---<---
||         ||
^v         ^v
||         ||
   --->---
p4 ---<--- p3

我尝试这样做：让每个子进程把它的 pid 或一条消息发送给下一个子进程，但没有得到我想要的结果。当我运行程序时，遇到了两个问题：

怎么了?

我不太熟悉在循环中使用多个 FIFO。

#define _XOPEN_SOURCE 500
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>


#define MIN_PROC 2      // Nombre minimum de processus
#define MAX_PROC 20     // Nombre maximum de processus

/*
 * Attempt (from the question) at a ring of n child processes joined by FIFOs.
 * Parses n from argv[1], builds FIFO names, calls mkfifo(), forks n children
 * that each try to send their PID and read a neighbour's, then waits in the
 * parent.  NOTE(review): several defects are flagged inline below.
 */
int main (int argc,char ** argv)
 {
int n;  // Number of processes in the ring

/* Read the parameter */
if (argc<2 || (n = (int)strtol(argv[1],NULL,0))<MIN_PROC || n>MAX_PROC)
 {
    fprintf (stderr,"Usage : %s <nombre>\n"
                    "Avec <nombre> compris entre %d et %d.\n",
                    argv[0],MIN_PROC,MAX_PROC);
    exit(1);
 }
else
 {
    /* NOTE(review): only 2*n chars in total, yet the loop below appends */
    /* ",tube%d" 2*n times, so sprintf() writes past the end of tubes.   */
    char tubes[2*n];
    pid_t   pid;
            // Receives the PID returned by fork()
    int     p[n][4];    // Room for n pairs of descriptors (VLA)
    int     i,pos =0,k;          // Loop index

    /* Report the number of processes to spawn       */
    /* (this loop's own `i` shadows the outer one).  */
    for (int i = 0; i < n*2; i++)
    {
        pos += sprintf(&tubes[pos],",tube%d",i);
    }
    printf("String is now:%s\n",tubes);
    printf ("Nombre de processus à engendrer : %d\n",n);

    /* Creation of the n anonymous pipes.         */
    /* The call is assumed to always succeed.     */
    /* NOTE(review): tubes[i] is a single char, not a path string, and the */
    /* value stored in p is mkfifo()'s return value, which per POSIX is a  */
    /* 0/-1 status rather than a file descriptor.                          */
    for (k=0;k<n;k++){
        for (int i = 0; i < 4; i++)
            {
                p[k][i] = mkfifo(tubes[i],0666);

            } 
        }

    /* Spawn the n child processes, which will communicate    */
    /* with each other. The parent stays on as supervisor.    */
    for (i=0;i<n;i++)
    {
        pid = fork();

        if (pid>0)
        {
            printf ("Création du processus fils #%d : PID %d\n",i,pid);
        }
        else if (!pid)
        {


            int     in[2];     /* Input descriptors         */
            int     out[2];    /* Output descriptors        */
            int     data;   /* Data to send             */

            int     j;      /* Another loop index (since i in effect remains */
                            /* our child process number).                    */

            /* Delay so that the parent's loop has time to finish */
            /* before the processing starts                       */
            sleep(1);

            /* NOTE(review): both nested loops declare their own `i`, hiding  */
            /* the child number; open() is handed tubes[i], a single char;    */
            /* and the same name is opened for reading and then for writing.  */
            for (int i = 0; i < n; i++)
            {   
                if (i%2 == 0)
                {
                    for (int i = 0; i < 2; ++i)
                    {
                       in[i]= open(tubes[i],O_RDONLY);
                       out[i]= open(tubes[i],O_WRONLY); 
                    }

                }
                else {
                    for (int i = 0; i < 2; ++i)
                    {
                       in[i]= open(tubes[i],O_WRONLY);
                       out[i]= open(tubes[i],O_RDONLY); 
                    }
                }

            }

            /* Fetch the appropriate descriptors */
            /* NOTE(review): this overwrites what open() stored in in[]/out[] */
            /* above with the values kept in p.                               */
            in[0]  = p[ i       ][0];
            out[0] = p[(i+1) % n][1];
            in[1]  = p[ i      ][2];
            out[1] = p[(i+1) % n][3];

            /* Close the unneeded descriptors */
            /* NOTE(review): out[2] in the last test is outside out[0..1]. */
            for (j=0;j<n;++j)
            {
                if (p[j][0] != in[0])  close(p[j][0]);
                if (p[j][1] != out[0]) close(p[j][1]);
                if (p[j][2] != in[1])  close(p[j][2]);
                if (p[j][3] != out[2]) close(p[j][3]);
            }

            /* Fetch and send our PID */
            data = (int)getpid();
            printf ("Processus #%d : émission de %d\n",i,data);
            write (out[0],&data,sizeof data);
            close (out[0]);

            /* Receive the data from the other process */
            data = (int)getpid();
            read (in[0],&data,sizeof data);
            printf ("Processus #%d : réception de %d\n",i,data);
            close (in[0]);

            data = (int)getpid();
            printf ("Processus #%d : émission de %d\n",i,data);
            write (out[1],&data,sizeof data);
            close (out[1]);

            /* Receive the data from the other process */
            data = (int)getpid();
            read (in[1],&data,sizeof data);
            printf ("Processus #%d : réception de %d\n",i,data);
            close (in[1]);

            /* At the end of processing, a break to leave the loop */
            /* started by the parent process.                      */
            break;
        }
        else perror ("Erreur à l'appel de fork()");
    }

    /* If PID is non-zero after the loop, we are still in the       */
    /* parent process. We take the opportunity to do n successive   */
    /* wait() calls to wait for all our children.                   */
    /* NOTE(review): the loop below runs 2*n times, not n, and      */
    /* unlink() is handed tubes[i], a single char.                  */
    if (pid>0)
    for (i=0;i<2*n;i++) {
        wait(NULL);
        unlink(tubes[i]);
    }
 }

  return 0;
 }

下面是一个可以正常工作的实现。正如评论中所述（此答案中不再重复），建立连接时需要格外小心。读写的顺序也必须与打开 FIFO 的顺序相匹配：

  • P0 将向前写入,向前读取,向后读取,向后写入。
  • PN 将向后读取,向后写入,向前写入,向前读取。

请注意，子进程的代码基本上是同步执行的——不会发生太多的重新排序。usleep() 调用只是为了确保启动消息按顺序出现。只有报告输出的顺序可能会比较乱。

我使用了我的标准错误报告包（"stderr.h" 和 stderr.c）来处理代码中的大部分消息。（目前，如果您使用 stderr.c，还需要同一目录下的 kludge.h 和 kludge.c。）该软件包提供了一些便利的功能：使用 err_setlogopts() 可以在输出中包含 PID 和微秒级时间、但省略程序名称；使用 err_settimeformat() 可以只打印时间而不打印日期部分。

我还使用了很多函数——我不够聪明，无法把所有代码写在单个函数里。通常，对于简单的父子关系，我会用一对函数 be_childish() 和 be_parental() 来分别封装子进程和父进程所做的工作。在这段代码中，我其实并不需要单独的父进程函数。

下面的代码大约有 200 行 - 包括空行和注释。

#include "stderr.h"
#include <assert.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

#define MIN_PROC 2
#define MAX_PROC 20

enum { MAX_TUBENAMELEN = 20 };
enum { MAX_MSGLEN = 256 };
enum Mode { M_WF, M_RF, M_WB, M_RB };

/*
**  You have to split the connections into 'forward' and 'backward'
**  around the loop.  P0 (zeroth child) needs to connect forward first,
**  then connect backward; all other children need to connect backward
**  first, then connect forward.  Although the order is arbitrary, when
**  a child connects backward, it should open the read FIFO before the
**  write FIFO; when it connects forward, it should open the write FIFO
**  before the read FIFO.
**
**  Child process Pn (for n = 0; n < N; n++) connects forward to tube
**  T[(2n)%(2N)] for writing, T[(2n+1)%(2N)] for reading; and connects
**  backward to T[(2n-2+2N)%(2N)] for reading, and to T[(2n-1+2N)%(2N)]
**  for writing.  The +2N terms ensure that the LHS of % is not
**  negative.  This sequencing should ensure no blockage.
**
**  When it comes to reading and writing, the rules will be similar.
**  P0 will write forward, read forward, read backward, write backward.
**  Pn (n > 0) will read backward, write backward, write forward, read forward.
*/

/*
** Wrap a tube number into the table of 2*max tubes.  One full turn
** (2*max) is added before the modulo so that a num down to -2*max
** still yields a non-negative index.
*/
static inline int tube_index(int num, int max)
{
    const int ntubes = 2 * max;
    return (num + ntubes) % ntubes;
}

/*
** Open one of the four FIFOs used by child `num` in a ring of `max`
** children and return its descriptor.
**
**   mode   which link to open: M_WF/M_RF (write/read forward) or
**          M_WB/M_RB (write/read backward)
**   max    number of children in the ring
**   num    this child's number
**   tubes  table of FIFO names
**
** The write modes open with O_WRONLY, the read modes with O_RDONLY.
** The attempt and its outcome are logged with err_remark(); a failed
** open() is reported through err_syserr().
*/
static int open_fifo(enum Mode mode, int max, int num, char tubes[][MAX_TUBENAMELEN])
{
    int idx = 0;
    int o_mode = O_RDONLY;

    switch (mode)
    {
    case M_WF:
        idx = tube_index(2 * num + 0, max);
        o_mode = O_WRONLY;
        break;
    case M_RF:
        idx = tube_index(2 * num + 1, max);
        break;
    case M_RB:
        idx = tube_index(2 * num - 2, max);
        break;
    case M_WB:
        idx = tube_index(2 * num - 1, max);
        o_mode = O_WRONLY;
        break;
    default:
        assert(0);
    }

    const char *fifoname = tubes[idx];
    err_remark("Opening FIFO %s with mode %d\n", fifoname, o_mode);

    int fd = open(fifoname, o_mode);
    if (fd < 0)
        err_syserr("Failed to open %s with mode %d: ", fifoname, o_mode);

    err_remark("Opened  FIFO %s with mode %d - fd %d\n", fifoname, o_mode, fd);
    return fd;
}

/*
** Read one message from descriptor `fd` on behalf of child `num` and
** log what arrived.  A read() result of zero or less is reported
** through err_syserr().
*/
static inline void recv_info(int num, int fd)
{
    char msg[MAX_MSGLEN];
    int got = read(fd, msg, sizeof(msg));

    if (got <= 0)
        err_syserr("P%d failed to read anything on fd %d: ", num, fd);

    /* The data is not NUL-terminated, hence the %.*s with an explicit length. */
    err_remark("P%d received %d bytes: [%.*s]\n", num, got, got, msg);
}

/*
** Format an identifying message for child `num` and write it to `fd`.
**
**   num  this child's number (appears in the message)
**   fd   descriptor to write to
**   dir  direction label appended to the message text
**
** snprintf() returns the length the full message would have had, which
** can exceed the buffer when `dir` is long; the length passed to write()
** is therefore clamped to what was actually stored.  A formatting error
** or a short/failed write is reported through err_syserr().
*/
static inline void send_info(int num, int fd, const char *dir)
{
    char buffer[MAX_MSGLEN];
    int buflen = snprintf(buffer, sizeof(buffer), "P%d (PID %d) sent this message %s",
                    num, (int)getpid(), dir);
    if (buflen < 0)
        err_syserr("P%d failed to format message for fd %d: ", num, fd);
    /* On truncation only sizeof(buffer) - 1 characters are in the buffer. */
    if ((size_t)buflen >= sizeof(buffer))
        buflen = (int)sizeof(buffer) - 1;
    int nbytes;
    if ((nbytes = write(fd, buffer, buflen)) != buflen)
        err_syserr("Failed to write properly on fd %d (%d vs %d wanted): ", fd, nbytes, buflen);
    err_remark("P%d sent %d bytes: [%.*s]\n", num, nbytes, nbytes, buffer);
}

/*
** Work done by child `num` in a ring of `max` children: open its four
** FIFOs, exchange one message in each direction with each neighbour,
** then close the descriptors.
**
** The order of the open_fifo() calls and of the send/receive calls is
** deliberate and differs between child 0 and the others; do not
** reorder them.
*/
static void be_childish(int max, int num, char tubes[][MAX_TUBENAMELEN])
{
    int fwd_out;    /* write end towards the next child */
    int fwd_in;     /* read end from the next child */
    int back_in;    /* read end from the previous child */
    int back_out;   /* write end towards the previous child */

    if (num != 0)
    {
        /* Every child but the first: backward link first, then forward */
        back_in  = open_fifo(M_RB, max, num, tubes);
        back_out = open_fifo(M_WB, max, num, tubes);
        fwd_out  = open_fifo(M_WF, max, num, tubes);
        fwd_in   = open_fifo(M_RF, max, num, tubes);
        recv_info(num, back_in);
        send_info(num, back_out, "backwards");
        send_info(num, fwd_out, "forwards");
        recv_info(num, fwd_in);
    }
    else
    {
        /* First child: forward link first, then backward */
        fwd_out  = open_fifo(M_WF, max, num, tubes);
        fwd_in   = open_fifo(M_RF, max, num, tubes);
        back_in  = open_fifo(M_RB, max, num, tubes);
        back_out = open_fifo(M_WB, max, num, tubes);
        send_info(num, fwd_out, "forwards");
        recv_info(num, fwd_in);
        recv_info(num, back_in);
        send_info(num, back_out, "backwards");
    }

    close(fwd_out);
    close(back_out);
    close(fwd_in);
    close(back_in);
}

/*
** Build a ring of n child processes linked by 2n FIFOs.
**
** argv[1] gives n, which must lie between MIN_PROC and MAX_PROC.
** The parent names and creates the FIFOs tube0 .. tube(2n-1), forks
** the n children (each runs be_childish() and exits with a status
** derived from its number), waits for all of them, and finally
** removes the FIFOs.  If a fork() fails, the children already started
** are sent SIGTERM and the FIFOs are removed before terminating.
*/
int main(int argc, char **argv)
{
    int n;
    err_setarg0(argv[0]);
    err_setlogopts(ERR_NOARG0|ERR_PID|ERR_MICRO);
    err_settimeformat("%H:%M:%S");

    if (argc < 2 || (n = (int)strtol(argv[1], NULL, 0)) < MIN_PROC || n > MAX_PROC)
    {
        fprintf(stderr, "Usage : %s <nombre>\n"
                "Avec <nombre> compris entre %d et %d.\n",
                argv[0], MIN_PROC, MAX_PROC);
        exit(1);
    }

    char tubes[2 * n][MAX_TUBENAMELEN];     /* One name per FIFO */
    pid_t pid;
    pid_t pids[n];                          /* Children started so far */

    for (int i = 0; i < n * 2; i++)
    {
        snprintf(tubes[i], sizeof(tubes[i]), "tube%d", i);
        printf("Fifo %d: [%s]\n", i, tubes[i]);
    }

    printf("Nombre de processus à engendrer : %d\n", n);

    for (int k = 0; k < 2*n; k++)
    {
        printf("Create fifo: %s\n", tubes[k]);
        if (mkfifo(tubes[k], 0666) != 0)
            err_syserr("Failed to create FIFO %s: ", tubes[k]);
    }
    /* Flush before forking so buffered output is not duplicated in the children */
    fflush(0);

    for (int i = 0; i < n; i++)
    {
        pid = fork();
        if (pid > 0)
        {
            pids[i] = pid;
            err_remark("Création du processus fils #%d : PID %d\n", i, (int)pid);
        }
        else if (pid == 0)
        {
            /* Stagger the children so their start-up messages appear in order */
            usleep((i + 1) * 100000);   // Tenths of a second
            err_remark("Child process #%d (PID %d) at work\n", i, (int)getpid());
            be_childish(n, i, tubes);
            int status = (i + 1) * 16;
            err_remark("Child process #%d (PID %d) exiting with status 0x%.2X\n", i, (int)getpid(), status);
            exit(status);
        }
        else
        {
            err_sysrem("Failed to fork child %d: ", i);
            for (int j = 0; j < i; j++)
            {
                err_remark("Killing %d\n", (int)pids[j]);
                /* kill() takes the process ID first, then the signal */
                kill(pids[j], SIGTERM);
            }
            for (int j = 0; j < 2 * n; j++)
                unlink(tubes[j]);
            err_error("Terminating!\n");
        }
    }

    /* Collect every child and report its raw wait status */
    int corpse;
    int status;
    while ((corpse = wait(&status)) > 0)
        err_remark("Child %d died with status 0x%.4X\n", corpse, status);

    for (int j = 0; j < 2 * n; j++)
        unlink(tubes[j]);

    return 0;
}

示例输出:

Fifo 0: [tube0]
Fifo 1: [tube1]
Fifo 2: [tube2]
Fifo 3: [tube3]
Fifo 4: [tube4]
Fifo 5: [tube5]
Nombre de processus à engendrer : 3
Create fifo: tube0
Create fifo: tube1
Create fifo: tube2
Create fifo: tube3
Create fifo: tube4
Create fifo: tube5
16:19:57.312293 - pid=89807: Création du processus fils #0 : PID 89810
16:19:57.314294 - pid=89807: Création du processus fils #1 : PID 89811
16:19:57.314500 - pid=89807: Création du processus fils #2 : PID 89812
16:19:57.413772 - pid=89810: Child process #0 (PID 89810) at work
16:19:57.415148 - pid=89810: Opening FIFO tube0 with mode 1
16:19:57.515290 - pid=89811: Child process #1 (PID 89811) at work
16:19:57.515558 - pid=89811: Opening FIFO tube0 with mode 0
16:19:57.515771 - pid=89810: Opened  FIFO tube0 with mode 1 - fd 3
16:19:57.515788 - pid=89810: Opening FIFO tube1 with mode 0
16:19:57.515764 - pid=89811: Opened  FIFO tube0 with mode 0 - fd 3
16:19:57.515883 - pid=89811: Opening FIFO tube1 with mode 1
16:19:57.516011 - pid=89810: Opened  FIFO tube1 with mode 0 - fd 4
16:19:57.516020 - pid=89810: Opening FIFO tube4 with mode 0
16:19:57.516010 - pid=89811: Opened  FIFO tube1 with mode 1 - fd 4
16:19:57.516120 - pid=89811: Opening FIFO tube2 with mode 1
16:19:57.615230 - pid=89812: Child process #2 (PID 89812) at work
16:19:57.615451 - pid=89812: Opening FIFO tube2 with mode 0
16:19:57.615582 - pid=89812: Opened  FIFO tube2 with mode 0 - fd 3
16:19:57.615593 - pid=89811: Opened  FIFO tube2 with mode 1 - fd 5
16:19:57.615678 - pid=89812: Opening FIFO tube3 with mode 1
16:19:57.615747 - pid=89811: Opening FIFO tube3 with mode 0
16:19:57.615852 - pid=89811: Opened  FIFO tube3 with mode 0 - fd 6
16:19:57.615881 - pid=89812: Opened  FIFO tube3 with mode 1 - fd 4
16:19:57.615986 - pid=89812: Opening FIFO tube4 with mode 1
16:19:57.616078 - pid=89810: Opened  FIFO tube4 with mode 0 - fd 5
16:19:57.616090 - pid=89810: Opening FIFO tube5 with mode 1
16:19:57.616071 - pid=89812: Opened  FIFO tube4 with mode 1 - fd 5
16:19:57.616153 - pid=89812: Opening FIFO tube5 with mode 0
16:19:57.616240 - pid=89810: Opened  FIFO tube5 with mode 1 - fd 6
16:19:57.616277 - pid=89810: P0 sent 41 bytes: [P0 (PID 89810) sent this message forwards]
16:19:57.616236 - pid=89812: Opened  FIFO tube5 with mode 0 - fd 6
16:19:57.616312 - pid=89811: P1 received 41 bytes: [P0 (PID 89810) sent this message forwards]
16:19:57.616444 - pid=89810: P0 received 42 bytes: [P1 (PID 89811) sent this message backwards]
16:19:57.616437 - pid=89811: P1 sent 42 bytes: [P1 (PID 89811) sent this message backwards]
16:19:57.616530 - pid=89811: P1 sent 41 bytes: [P1 (PID 89811) sent this message forwards]
16:19:57.616535 - pid=89812: P2 received 41 bytes: [P1 (PID 89811) sent this message forwards]
16:19:57.616660 - pid=89812: P2 sent 42 bytes: [P2 (PID 89812) sent this message backwards]
16:19:57.616665 - pid=89811: P1 received 42 bytes: [P2 (PID 89812) sent this message backwards]
16:19:57.616772 - pid=89812: P2 sent 41 bytes: [P2 (PID 89812) sent this message forwards]
16:19:57.616881 - pid=89810: P0 received 41 bytes: [P2 (PID 89812) sent this message forwards]
16:19:57.616893 - pid=89810: P0 sent 42 bytes: [P0 (PID 89810) sent this message backwards]
16:19:57.616817 - pid=89811: Child process #1 (PID 89811) exiting with status 0x20
16:19:57.617243 - pid=89810: Child process #0 (PID 89810) exiting with status 0x10
16:19:57.617501 - pid=89812: P2 received 42 bytes: [P0 (PID 89810) sent this message backwards]
16:19:57.617726 - pid=89807: Child 89811 died with status 0x2000
16:19:57.618114 - pid=89812: Child process #2 (PID 89812) exiting with status 0x30
16:19:57.618313 - pid=89807: Child 89810 died with status 0x1000
16:19:57.618635 - pid=89807: Child 89812 died with status 0x3000

您可以在 GitHub 找到此代码。