使用 fork() 遍历目录

traversing directories using fork()

我尝试使用 fork() 进入文件夹并读取文件。我使用 file tree walk 函数递归地进入文件夹。基本思路是一个目录下会有children个文件和目录。 children 分别并发地读取每个文件。但是,如果有目录,children 将是 children 的 parents 来读取文件。

static int soner_each_time(const char *filepath, const struct stat *info,
                    int typeflag, struct FTW *ftwinfo)
{

    pid_t   pid = 0;
    char    buf[BUFSIZE];
    int     status;
    int     i = 0;

    /* The variables are related to functions of reading file  */

    int     totalLines;
    char    arr[TOTALNUMBEROFLINES][BUFSIZE];
    int     retval;


    const char *const filename = filepath + ftwinfo->base;

    if (( pid = fork()) < 0) {
        const int cause = errno;
        fprintf(stderr, "Fork error: %s\n", strerror(cause));
        errno = cause;
        return -1;
    }
    else if( pid > 0 ) // parent
    {
        if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
            pid = wait(&status);
            if (pid == -1)
                perror("Failed to wait for child");
            else if (WIFEXITED(status) && !WEXITSTATUS(status))
                printf("parent [%d] reaped child [%d]\n", getpid(), pid);
            else if (WIFEXITED(status))
                printf("Child %ld terminated with return status %d\n",
                       (long)pid, WEXITSTATUS(status));
            else if (WIFSIGNALED(status))
                printf("Child %ld terminated due to uncaught signal %d\n",
                       (long)pid, WTERMSIG(status));
            else if (WIFSTOPPED(status))
                printf("Child %ld stopped due to signal %d\n",
                       (long)pid, WSTOPSIG(status));
        }
    }


    if (pid == 0) // child
    {
        if (typeflag == FTW_F)
        {
            sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
            write(1, buf, strlen(buf));

            /* Both of them are about reading function */
            totalLines = storeLinesInArray(filename, arr);

            retval = for_each_file(filename, totalLines, key, arr);

            sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
            write(1, buf, strlen(buf));
        }

        else if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
        }

    }
        return 0;
}

FTW_DPFTW_D表示文件夹FTW_F表示文件。基本上,我每次都在代码 fork() 中尝试。如果是parent,则等待其children读取文件。因为该函数是递归的,所以它会分叉每次调用。但是,有一些关于它的东西我不能让它为一个文件分叉多个。例如 1a.txt 应该有一个 child 但对于这个方案它是 8。分叉主题真的很难。我做日常练习并尝试理解它。您的解释和帮助将提高我在该分支的技能。

@编辑:mcve 代码

#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <ftw.h>
#include <stdio.h>

#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000


void err_sys(const char *const str)
{
    perror(str);
    fflush(stdout);
    exit(1);
}

int storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
    return 0;
}

static int for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{

    fprintf(stdout, "File name is = %s\n", filepath);
    fflush(stdout);


    return 0;
}

static int soner_each_time(const char *filepath, const struct stat *info,
                           int typeflag, struct FTW *ftwinfo)
{

    pid_t   pid = 0;
    char    buf[BUFSIZE];
    int     status;

    /* The variables are related to functions of reading file  */

    int     totalLines;
    char    arr[TOTALNUMBEROFLINES][BUFSIZE];
    int     retval;


    const char *const filename = filepath + ftwinfo->base;

    if (( pid = fork()) < 0) {
        perror("failed fork");
        exit(-1);
    }
    else if( pid > 0 ) // parent
    {
        if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
            pid = wait(&status);
            if (pid == -1)
                perror("Failed to wait for child");
            else if (WIFEXITED(status) && !WEXITSTATUS(status))
                printf("parent [%d] reaped child [%d]\n", getpid(), pid);
            else if (WIFEXITED(status))
                printf("Child %ld terminated with return status %d\n",
                       (long)pid, WEXITSTATUS(status));
            else if (WIFSIGNALED(status))
                printf("Child %ld terminated due to uncaught signal %d\n",
                       (long)pid, WTERMSIG(status));
            else if (WIFSTOPPED(status))
                printf("Child %ld stopped due to signal %d\n",
                       (long)pid, WSTOPSIG(status));
        }
    }


    if (pid == 0) // child
    {
        if (typeflag == FTW_F)
        {
            sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
            write(1, buf, strlen(buf));

            /* Both of them are about reading function */
            totalLines = storeLinesInArray(filename, arr);

            retval = for_each_file(filename, totalLines, "not needed now", arr);

            sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
            write(1, buf, strlen(buf));
        }

        else if (typeflag == FTW_DP || typeflag == FTW_D)
        {
            sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
            write(1, buf, strlen(buf));
        }

    }
    return 0;
}



int main(int argc, char *argv[])
{

    if (nftw("here is directory path", soner_each_time, 15, FTW_CHDIR)) {
        fprintf(stderr, "Failed directory.\n");
        exit(-1);
    }

    return 0;
}

你有一些错误。更正后的代码如下。

child没有调用exit,所以它会继续它自己的nftw,所以很多文件都在冗余处理。我添加了 exit(0).

forks 的执行速度如此之快,以至于系统将 运行 耗尽 pids。

我添加了三项内容来解决此问题:

  1. 一个 "reap" 循环 waitpid(0,&status,WNOHANG) 以捕获完成的例程 children
  2. fork 周围添加了一个循环以捕获 "out of slots" 问题
  3. 添加了限制机制以将活动 children 的数量限制为 sane/useful 值

我已经对来源进行了注释,以指出错误所在的位置。

虽然不是硬错误,但对 每个 文件执行 fork 会增加大量开销。磁盘带宽将因大约四个活动 child 线程而饱和,因此使用更多只会减慢速度。为目录分叉 child 并没有多大作用,因为 "meaty" 处理将针对文件。

无论如何,这是更正后的代码[请原谅不必要的样式清理]:

#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <errno.h>
#include <ftw.h>
#include <stdio.h>
#include <sys/wait.h>

#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000

// output verbose/debug messages
int opt_v;

// limit of number of children that can be used at one time (if non-zero)
int opt_T;

int pendcnt;                            // number of active children

void
err_sys(const char *const str)
{
    perror(str);
    fflush(stdout);
    exit(1);
}

int
storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
    return 0;
}

static int
for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{

    fprintf(stdout, "File name is = %s\n", filepath);
    fflush(stdout);

    return 0;
}

// reap_some -- reap a few processes
int
reap_some(int final)
{
    pid_t pid;
    int status;
    int reapcnt;

    reapcnt = 0;

    // reap all completed children
    while (1) {
        pid = waitpid(0,&status,WNOHANG);
        if (pid == 0)
            break;

        if (pid == -1) {
            if (errno != ECHILD)
                perror("Failed to wait for child");
            break;
        }

        if (WIFSIGNALED(status)) {
            printf("Child %ld terminated due to uncaught signal %d\n",
                (long) pid, WTERMSIG(status));
            ++reapcnt;
            continue;
        }

        if (WIFSTOPPED(status)) {
            printf("Child %ld stopped due to signal %d\n",
                (long) pid, WSTOPSIG(status));
            continue;
        }

        if (WIFEXITED(status)) {
            ++reapcnt;
            if (WEXITSTATUS(status) == 0) {
                if (opt_v)
                    printf("parent [%d] reaped child [%d]\n", getpid(), pid);
            }
            else
                printf("Child %ld terminated with return status %d\n",
                    (long) pid, WEXITSTATUS(status));
            continue;
        }
    }

    // bump down the number of children that are "in-flight"
    pendcnt -= reapcnt;

    return reapcnt;
}

static int
soner_each_time(const char *filepath, const struct stat *info, int typeflag, struct FTW *ftwinfo)
{
    pid_t pid = 0;
    char *bp;
    int lvl;
    char buf[BUFSIZE];

    /* The variables are related to functions of reading file */

    int totalLines;
    char arr[TOTALNUMBEROFLINES][BUFSIZE];
    int retval;

    const char *const filename = filepath + ftwinfo->base;

    switch (typeflag) {
    case FTW_DP:
    case FTW_D:
        bp = buf;
        for (lvl = 0;  lvl < ftwinfo->level;  ++lvl)
            bp += sprintf(bp,"    ");
        bp += sprintf(bp, "%s\n\n",filepath);
        write(1, buf, strlen(buf));
        //reap_some(0);
        break;

    case FTW_F:
        // BUGFIX:
        // limit the number of in-flight children
        // too many children serves no purpose -- they saturate the system
        // resources and performance actually goes _down_ because the system
        // spends more time doing context switches between them than the actual
        // work. more than a few children to process files produces little
        // benefit after the disk I/O is running at maximum
        if (opt_T) {
            while (pendcnt > opt_T)
                reap_some(0);
        }

        // BUGFIX:
        // without a throttle, we spawn children so fast we're going to get
        // [many] failures here (i.e. we use up _all_ available pids)
        while (1) {
            pid = fork();
            if (pid >= 0)
                break;
            reap_some(0);
        }

        // parent
        // keep track of the child count
        if (pid > 0) {
            ++pendcnt;
            break;
        }

        // child
        sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n",
            getpid(), getppid(), filename);
        if (opt_v)
            write(1, buf, strlen(buf));

        /* Both of them are about reading function */
        totalLines = storeLinesInArray(filename, arr);

        retval = for_each_file(filename, totalLines, "not needed now", arr);

        sprintf(buf, "||| Child [%d] of parent [%d] is about to exit (RETVAL: %d) |||\n", getpid(), getppid(), retval);
        if (opt_v)
            write(1, buf, strlen(buf));

        // BUGFIX:
        // child won't exit without this -- causing multiple children to redo
        // the same files (i.e. they would continue the nftw -- only parent
        // should do that)
        exit(0);
        break;
    }

    return 0;
}

int
main(int argc, char **argv)
{
    char *cp;

    --argc;
    ++argv;

    opt_T = 10;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'T':  // throttle
            cp += 2;
            opt_T = (*cp != 0) ? atoi(cp) : 0;
            break;
        case 'v':  // verbose messages
            opt_v = 1;
            break;
        }
    }

    cp = *argv;
    printf("opt_T=%d opt_v=%d -- %s\n",opt_T,opt_v,cp);
    sleep(3);
    printf("away we go ...\n");

    if (nftw(cp, soner_each_time, 15, FTW_CHDIR)) {
        fprintf(stderr, "Failed directory.\n");
        exit(1);
    }

    // wait for all children to complete
    while (pendcnt > 0)
        reap_some(1);

    return 0;
}

更新:

更改代码以仅在 parent 中执行目录处理(即 child 仅针对文件进行分叉)。修复了一个错误。所以,现在,-T throttle 参数使用一个低得多的值,可以相当于 "number of workers"。更改程序以使用默认节流值。


更新#2:

I said parent because there is only one parent. I wonder whether I may trace wrong.

不,你是对的。只有一个parent。那是设计使然。

I would like to make parent for each directory like explained in the first scheme.

实际上,您不会't/won正确理解真正涉及的内容。 欧比万克诺比:"These are not the droids you're looking for"

在每个 目录 上执行 递归 分支存在许多技术、性能和系统饱和问题。我编写的示例以设计和性能的最佳折衷避免了所有这些问题。它还允许 master children 的 "run ahead" 并保持 children 尽可能忙,而不管给定目录中 files/subdirs 的数量。

旁注:我有 40 多年的经验,并且编写过许多 nftw 等效程序。因此,以下内容来自所有这些。

想要的最终结果是什么?

你只有框架代码,但你实际做的[打算做的]会影响架构。您的最终计划可能是:

  1. CPU 绑定[不断等待 CPU 乘法等操作]
  2. 内存受限[不断等待读取或写入 DRAM 完成]
  3. I/O 绑定[不断等待 I/O 操作完成]

还有,你要pre-order还是post-order[喜欢FTW_DEPTH]遍历?我假设 pre-order

您不能再使用nftw

您将需要使用 opendir/readdir/closedir [这就是 nftw 所做的]。

您需要的是一个在层次结构中执行 单个 级别的进程。让 nftw 中止并开始一个新的来实现它是一种折磨。

下面是一些伪代码。

但是... 实现变得更加复杂,不会提供更好的性能,实际上可能会降低性能。也可能导致无关程序崩溃,如Firefox、vlc、windowmanagers等

您现在需要进程间通信和共享内存

我上面的例子,只有一个控制进程。要保持节流,只需要 pendcnt 的简单 increment/decrement。

当您为目录添加 递归 分叉时,现在为目录分叉的 任何 子进程必须 increment/decrement [= 共享内存中pendcnt的110=]全局副本。它必须使用进程间信号量来控制对该变量的访问。或者,也许,一些原子 increment/decrement 原语 [ala C11 atomics].

现在,对该信号量的争用成为一个[延迟]因素。

性能:

拥有多个活动进程实际上 会降低 性能。换句话说,分叉目录实际上 运行 比单个进程慢

除了几个 "worker" 处理文件的进程外,磁盘 I/O 带宽将用完。通过添加更多进程,您将没有进一步受益。

对于许多进程,它们实际上可能会相互干扰。考虑进程 A 请求磁盘读取。但是,进程 B 也是如此。A 的读取在内核内部完成,但在它可以返回给 A 之前,用于 A 读取的内核缓冲区必须重新用于完成 B 的读取。 A的阅读将不得不重复。

这就是所谓的 [虚拟内存] 页 "thrashing"。

锁死系统

随着越来越多的磁盘I/O 完成,越来越多的内核缓冲区必须用于包含数据。内核可能不得不驱逐页面缓冲区以腾出空间。其中一些可能是针对上述不相关的程序。

换句话说,您的程序的许多进程可能会独占CPU、磁盘和内存的使用。某些程序(如 Firefox)会超时 [并崩溃],因为它们会看到长时间的延迟,而这些延迟在其他情况下是看不到的,并认为它们内部的某些东西导致了延迟。

我有 运行 这样的 nftw 程序并且看到 Firefox 说:"Killing locked up javascript script".

更糟糕的是,我让 vlc 在时间上落后并开始跳帧。这导致 window 经理感到困惑,因为它认为这是由于某些逻辑错误,而不仅仅是 非常 缓慢的响应系统。最终结果是 window 管理器中止,必须手动重新启动。

这也会降低更多关键程序和内核守护进程的速度。

在某些情况下,这只能通过系统重启来清除。

此外,运行在您与他人共享的系统上设置多个进程会使您变成 "bad citizen",因此请注意不要消耗太多资源。


无论如何,这是伪代码:

// pseudo -- loose pseudo-code for non-nftw method
//
// NOTES:
// (1) pendcnt must now be a _shared_ memory variable (e.g. shmget, etc)
// (2) access must be locked by a shared memory semaphore
// (3) we must now have a list of our outstanding children
// (4) we can no longer do a blind waitpid(0,&status,WNOHANG) as we need to
//     keep track of when our direct children complete

struct entfile {
    struct dirent ent;
    struct stat st;
};

// dodir -- enter/exit directory and perform all actions
void
dodir(const char *subdir)
{
    // NOTE: you can create a wrapper struct for this that also has stat
    struct entfile dirlist[1000];

    // add subdir to directory stack ...
    dirstack_push(subdir);

    // enter directory
    chdir(subdir);

    // do whatever you'd like ...
    process_directory(subdir);

    // open directory
    dirctl = opendir(".");

    // pre-save all entries [skipping "." and ".."]
    // this prevents too many open directory descriptors
    // NOTE: we should stat(2) the file if d_type not supported
    while (1) {
        dirent = readdir(dirctl);
        stat(dirent->d_name,&st);
        add_to_dirent_list(dirlist,dirent,&st);
    }

    // close directory _before_ we process any entries
    closedir(dirctl);

    // process all file entries -- pre-order
    for (ALL_IN_DIRLIST(ent,dirlist)) {
        if (ent->ent.d_type == ISFILE)
            doentry(ent);
    }
    wait_for_all_on_pendlist();

    // process all directory entries -- pre-order
    for (ALL_IN_DIRLIST(dirent,dirlist)) {
        if (ent->ent.d_type == ISDIR)
            doentry(ent);
    }
    wait_for_all_on_pendlist();

    // remove directory from stack
    dirstack_pop();

    // exit directory
    chdir("..")
}

// doentry -- process a directory entry
void
doentry(struct entfile *ent)
{
    char *tail;

    tail = ent->ent.d_name;

    do {
        // does throttle, etc.
        pid = forkme();

        // parent
        // see notes above
        if (pid) {
            // NOTE: these semaphore waits can be costly
            sem_wait();
            ++pendcnt;
            sem_post();

            add_pid_to_pendlist(pid,tail,...);
            break;
        }

        // child
        switch (ent->st.st.st_mode & ...) {
        case ISFILE:
            process_file(tail);
            break;
        case ISDIR:
            dodir(tail);
            break;
        }

        exit(0);
    } while (0);
}

// wait for immediate children
void
wait_for_all_on_pendlist(void)
{

    while (MORE_IN_PENDLIST) {
        for (FORALL_IN_PENDLIST(tsk)) {
            pid = waitpid(tsk->pid,&tsk->status,WNOHANG);

            // check status like reap_some
            if (pid > 0)
                remove_pid_from_pendlist(tsk);
        }
    }
}