使用 fork() 遍历目录
traversing directories using fork()
我尝试使用 fork()
进入文件夹并读取文件。我使用 file tree walk 函数递归地进入文件夹。基本思路是一个目录下会有children个文件和目录。 children 分别并发地读取每个文件。但是,如果有目录,children 将是 children 的 parents 来读取文件。
static int soner_each_time(const char *filepath, const struct stat *info,
int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char buf[BUFSIZE];
int status;
int i = 0;
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
if (( pid = fork()) < 0) {
const int cause = errno;
fprintf(stderr, "Fork error: %s\n", strerror(cause));
errno = cause;
return -1;
}
else if( pid > 0 ) // parent
{
if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
pid = wait(&status);
if (pid == -1)
perror("Failed to wait for child");
else if (WIFEXITED(status) && !WEXITSTATUS(status))
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
else if (WIFEXITED(status))
printf("Child %ld terminated with return status %d\n",
(long)pid, WEXITSTATUS(status));
else if (WIFSIGNALED(status))
printf("Child %ld terminated due to uncaught signal %d\n",
(long)pid, WTERMSIG(status));
else if (WIFSTOPPED(status))
printf("Child %ld stopped due to signal %d\n",
(long)pid, WSTOPSIG(status));
}
}
if (pid == 0) // child
{
if (typeflag == FTW_F)
{
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, key, arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
write(1, buf, strlen(buf));
}
else if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
}
}
return 0;
}
FTW_DP
和FTW_D
表示文件夹FTW_F
表示文件。基本上,我每次都在代码 fork() 中尝试。如果是parent,则等待其children读取文件。因为该函数是递归的,所以它会分叉每次调用。但是,有一些关于它的东西我不能让它为一个文件分叉多个。例如 1a.txt
应该有一个 child 但对于这个方案它是 8。分叉主题真的很难。我做日常练习并尝试理解它。您的解释和帮助将提高我在该分支的技能。
@编辑:mcve 代码
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <ftw.h>
#include <stdio.h>
#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000
void err_sys(const char *const str)
{
perror(str);
fflush(stdout);
exit(1);
}
int storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
return 0;
}
static int for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{
fprintf(stdout, "File name is = %s\n", filepath);
fflush(stdout);
return 0;
}
static int soner_each_time(const char *filepath, const struct stat *info,
int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char buf[BUFSIZE];
int status;
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
if (( pid = fork()) < 0) {
perror("failed fork");
exit(-1);
}
else if( pid > 0 ) // parent
{
if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
pid = wait(&status);
if (pid == -1)
perror("Failed to wait for child");
else if (WIFEXITED(status) && !WEXITSTATUS(status))
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
else if (WIFEXITED(status))
printf("Child %ld terminated with return status %d\n",
(long)pid, WEXITSTATUS(status));
else if (WIFSIGNALED(status))
printf("Child %ld terminated due to uncaught signal %d\n",
(long)pid, WTERMSIG(status));
else if (WIFSTOPPED(status))
printf("Child %ld stopped due to signal %d\n",
(long)pid, WSTOPSIG(status));
}
}
if (pid == 0) // child
{
if (typeflag == FTW_F)
{
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, "not needed now", arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
write(1, buf, strlen(buf));
}
else if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
}
}
return 0;
}
int main(int argc, char *argv[])
{
if (nftw("here is directory path", soner_each_time, 15, FTW_CHDIR)) {
fprintf(stderr, "Failed directory.\n");
exit(-1);
}
return 0;
}
你有一些错误。更正后的代码如下。
child没有调用exit
,所以它会继续它自己的nftw
,所以很多文件都在冗余处理。我添加了 exit(0)
.
fork
s 的执行速度如此之快,以至于系统将 运行 耗尽 pid
s。
我添加了三项内容来解决此问题:
- 一个 "reap" 循环
waitpid(0,&status,WNOHANG)
以捕获完成的例程 children
- 在
fork
周围添加了一个循环以捕获 "out of slots" 问题
- 添加了限制机制以将活动 children 的数量限制为 sane/useful 值
我已经对来源进行了注释,以指出错误所在的位置。
虽然不是硬错误,但对 每个 文件执行 fork
会增加大量开销。磁盘带宽将因大约四个活动 child 线程而饱和,因此使用更多只会减慢速度。为目录分叉 child 并没有多大作用,因为 "meaty" 处理将针对文件。
无论如何,这是更正后的代码[请原谅不必要的样式清理]:
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <errno.h>
#include <ftw.h>
#include <stdio.h>
#include <sys/wait.h>
#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000
// output verbose/debug messages
int opt_v;
// limit of number of children that can be used at one time (if non-zero)
int opt_T;
int pendcnt; // number of active children
void
err_sys(const char *const str)
{
perror(str);
fflush(stdout);
exit(1);
}
int
storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
return 0;
}
static int
for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{
fprintf(stdout, "File name is = %s\n", filepath);
fflush(stdout);
return 0;
}
// reap_some -- reap a few processes
int
reap_some(int final)
{
pid_t pid;
int status;
int reapcnt;
reapcnt = 0;
// reap all completed children
while (1) {
pid = waitpid(0,&status,WNOHANG);
if (pid == 0)
break;
if (pid == -1) {
if (errno != ECHILD)
perror("Failed to wait for child");
break;
}
if (WIFSIGNALED(status)) {
printf("Child %ld terminated due to uncaught signal %d\n",
(long) pid, WTERMSIG(status));
++reapcnt;
continue;
}
if (WIFSTOPPED(status)) {
printf("Child %ld stopped due to signal %d\n",
(long) pid, WSTOPSIG(status));
continue;
}
if (WIFEXITED(status)) {
++reapcnt;
if (WEXITSTATUS(status) == 0) {
if (opt_v)
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
}
else
printf("Child %ld terminated with return status %d\n",
(long) pid, WEXITSTATUS(status));
continue;
}
}
// bump down the number of children that are "in-flight"
pendcnt -= reapcnt;
return reapcnt;
}
static int
soner_each_time(const char *filepath, const struct stat *info, int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char *bp;
int lvl;
char buf[BUFSIZE];
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
switch (typeflag) {
case FTW_DP:
case FTW_D:
bp = buf;
for (lvl = 0; lvl < ftwinfo->level; ++lvl)
bp += sprintf(bp," ");
bp += sprintf(bp, "%s\n\n",filepath);
write(1, buf, strlen(buf));
//reap_some(0);
break;
case FTW_F:
// BUGFIX:
// limit the number of in-flight children
// too many children serves no purpose -- they saturate the system
// resources and performance actually goes _down_ because the system
// spends more time doing context switches between them than the actual
// work. more than a few children to process files produces little
// benefit after the disk I/O is running at maximum
if (opt_T) {
while (pendcnt > opt_T)
reap_some(0);
}
// BUGFIX:
// without a throttle, we spawn children so fast we're going to get
// [many] failures here (i.e. we use up _all_ available pids)
while (1) {
pid = fork();
if (pid >= 0)
break;
reap_some(0);
}
// parent
// keep track of the child count
if (pid > 0) {
++pendcnt;
break;
}
// child
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n",
getpid(), getppid(), filename);
if (opt_v)
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, "not needed now", arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit (RETVAL: %d) |||\n", getpid(), getppid(), retval);
if (opt_v)
write(1, buf, strlen(buf));
// BUGFIX:
// child won't exit without this -- causing multiple children to redo
// the same files (i.e. they would continue the nftw -- only parent
// should do that)
exit(0);
break;
}
return 0;
}
int
main(int argc, char **argv)
{
char *cp;
--argc;
++argv;
opt_T = 10;
for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;
switch (cp[1]) {
case 'T': // throttle
cp += 2;
opt_T = (*cp != 0) ? atoi(cp) : 0;
break;
case 'v': // verbose messages
opt_v = 1;
break;
}
}
cp = *argv;
printf("opt_T=%d opt_v=%d -- %s\n",opt_T,opt_v,cp);
sleep(3);
printf("away we go ...\n");
if (nftw(cp, soner_each_time, 15, FTW_CHDIR)) {
fprintf(stderr, "Failed directory.\n");
exit(1);
}
// wait for all children to complete
while (pendcnt > 0)
reap_some(1);
return 0;
}
更新:
更改代码以仅在 parent 中执行目录处理(即 child 仅针对文件进行分叉)。修复了一个错误。所以,现在,-T throttle 参数使用一个低得多的值,可以相当于 "number of workers"。更改程序以使用默认节流值。
更新#2:
I said parent because there is only one parent. I wonder whether I may trace wrong.
不,你是对的。只有一个parent。那是设计使然。
I would like to make parent for each directory like explained in the first scheme.
实际上,您不会't/won正确理解真正涉及的内容。 欧比万克诺比:"These are not the droids you're looking for"
在每个 目录 上执行 递归 分支存在许多技术、性能和系统饱和问题。我编写的示例以设计和性能的最佳折衷避免了所有这些问题。它还允许 master children 的 "run ahead" 并保持 children 尽可能忙,而不管给定目录中 files/subdirs 的数量。
旁注:我有 40 多年的经验,并且编写过许多 nftw
等效程序。因此,以下内容来自所有这些。
想要的最终结果是什么?
你只有框架代码,但你实际做的[打算做的]会影响架构。您的最终计划可能是:
- CPU 绑定[不断等待 CPU 乘法等操作]
- 内存受限[不断等待读取或写入 DRAM 完成]
- I/O 绑定[不断等待 I/O 操作完成]
还有,你要pre-order还是post-order[喜欢FTW_DEPTH]遍历?我假设 pre-order
您不能再使用nftw
。
您将需要使用 opendir/readdir/closedir
[这就是 nftw
所做的]。
您需要的是一个在层次结构中执行 单个 级别的进程。让 nftw
中止并开始一个新的来实现它是一种折磨。
下面是一些伪代码。
但是... 实现变得更加复杂,不会提供更好的性能,实际上可能会降低性能。也可能导致无关程序崩溃,如Firefox、vlc、windowmanagers等
您现在需要进程间通信和共享内存
我上面的例子,只有一个控制进程。要保持节流,只需要 pendcnt
的简单 increment/decrement。
当您为目录添加 递归 分叉时,现在为目录分叉的 任何 子进程必须 increment/decrement [= 共享内存中pendcnt
的110=]全局副本。它必须使用进程间信号量来控制对该变量的访问。或者,也许,一些原子 increment/decrement 原语 [ala C11 atomics].
现在,对该信号量的争用成为一个[延迟]因素。
性能:
拥有多个活动进程实际上 会降低 性能。换句话说,分叉目录实际上 运行 比单个进程慢 。
除了几个 "worker" 处理文件的进程外,磁盘 I/O 带宽将用完。通过添加更多进程,您将没有进一步受益。
对于许多进程,它们实际上可能会相互干扰。考虑进程 A 请求磁盘读取。但是,进程 B 也是如此。A 的读取在内核内部完成,但在它可以返回给 A 之前,用于 A 读取的内核缓冲区必须重新用于完成 B 的读取。 A的阅读将不得不重复。
这就是所谓的 [虚拟内存] 页 "thrashing"。
锁死系统
随着越来越多的磁盘I/O 完成,越来越多的内核缓冲区必须用于包含数据。内核可能不得不驱逐页面缓冲区以腾出空间。其中一些可能是针对上述不相关的程序。
换句话说,您的程序的许多进程可能会独占CPU、磁盘和内存的使用。某些程序(如 Firefox)会超时 [并崩溃],因为它们会看到长时间的延迟,而这些延迟在其他情况下是看不到的,并认为它们内部的某些东西导致了延迟。
我有 运行 这样的 nftw
程序并且看到 Firefox 说:"Killing locked up javascript script".
更糟糕的是,我让 vlc 在时间上落后并开始跳帧。这导致 window 经理感到困惑,因为它认为这是由于某些逻辑错误,而不仅仅是 非常 缓慢的响应系统。最终结果是 window 管理器中止,必须手动重新启动。
这也会降低更多关键程序和内核守护进程的速度。
在某些情况下,这只能通过系统重启来清除。
此外,运行在您与他人共享的系统上设置多个进程会使您变成 "bad citizen",因此请注意不要消耗太多资源。
无论如何,这是伪代码:
// pseudo -- loose pseudo-code for non-nftw method
//
// NOTES:
// (1) pendcnt must now be a _shared_ memory variable (e.g. shmget, etc)
// (2) access must be locked by a shared memory semaphore
// (3) we must now have a list of our outstanding children
// (4) we can no longer do a blind waitpid(0,&status,WNOHANG) as we need to
// keep track of when our direct children complete
struct entfile {
struct dirent ent;
struct stat st;
};
// dodir -- enter/exit directory and perform all actions
void
dodir(const char *subdir)
{
// NOTE: you can create a wrapper struct for this that also has stat
struct entfile dirlist[1000];
// add subdir to directory stack ...
dirstack_push(subdir);
// enter directory
chdir(subdir);
// do whatever you'd like ...
process_directory(subdir);
// open directory
dirctl = opendir(".");
// pre-save all entries [skipping "." and ".."]
// this prevents too many open directory descriptors
// NOTE: we should stat(2) the file if d_type not supported
while (1) {
dirent = readdir(dirctl);
stat(dirent->d_name,&st);
add_to_dirent_list(dirlist,dirent,&st);
}
// close directory _before_ we process any entries
closedir(dirctl);
// process all file entries -- pre-order
for (ALL_IN_DIRLIST(ent,dirlist)) {
if (ent->ent.d_type == ISFILE)
doentry(ent);
}
wait_for_all_on_pendlist();
// process all directory entries -- pre-order
for (ALL_IN_DIRLIST(dirent,dirlist)) {
if (ent->ent.d_type == ISDIR)
doentry(ent);
}
wait_for_all_on_pendlist();
// remove directory from stack
dirstack_pop();
// exit directory
chdir("..")
}
// doentry -- process a directory entry
void
doentry(struct entfile *ent)
{
char *tail;
tail = ent->ent.d_name;
do {
// does throttle, etc.
pid = forkme();
// parent
// see notes above
if (pid) {
// NOTE: these semaphore waits can be costly
sem_wait();
++pendcnt;
sem_post();
add_pid_to_pendlist(pid,tail,...);
break;
}
// child
switch (ent->st.st.st_mode & ...) {
case ISFILE:
process_file(tail);
break;
case ISDIR:
dodir(tail);
break;
}
exit(0);
} while (0);
}
// wait for immediate children
void
wait_for_all_on_pendlist(void)
{
while (MORE_IN_PENDLIST) {
for (FORALL_IN_PENDLIST(tsk)) {
pid = waitpid(tsk->pid,&tsk->status,WNOHANG);
// check status like reap_some
if (pid > 0)
remove_pid_from_pendlist(tsk);
}
}
}
我尝试使用 fork()
进入文件夹并读取文件。我使用 file tree walk 函数递归地进入文件夹。基本思路是一个目录下会有children个文件和目录。 children 分别并发地读取每个文件。但是,如果有目录,children 将是 children 的 parents 来读取文件。
static int soner_each_time(const char *filepath, const struct stat *info,
int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char buf[BUFSIZE];
int status;
int i = 0;
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
if (( pid = fork()) < 0) {
const int cause = errno;
fprintf(stderr, "Fork error: %s\n", strerror(cause));
errno = cause;
return -1;
}
else if( pid > 0 ) // parent
{
if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
pid = wait(&status);
if (pid == -1)
perror("Failed to wait for child");
else if (WIFEXITED(status) && !WEXITSTATUS(status))
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
else if (WIFEXITED(status))
printf("Child %ld terminated with return status %d\n",
(long)pid, WEXITSTATUS(status));
else if (WIFSIGNALED(status))
printf("Child %ld terminated due to uncaught signal %d\n",
(long)pid, WTERMSIG(status));
else if (WIFSTOPPED(status))
printf("Child %ld stopped due to signal %d\n",
(long)pid, WSTOPSIG(status));
}
}
if (pid == 0) // child
{
if (typeflag == FTW_F)
{
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, key, arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
write(1, buf, strlen(buf));
}
else if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
}
}
return 0;
}
FTW_DP
和FTW_D
表示文件夹FTW_F
表示文件。基本上,我每次都在代码 fork() 中尝试。如果是parent,则等待其children读取文件。因为该函数是递归的,所以它会分叉每次调用。但是,有一些关于它的东西我不能让它为一个文件分叉多个。例如 1a.txt
应该有一个 child 但对于这个方案它是 8。分叉主题真的很难。我做日常练习并尝试理解它。您的解释和帮助将提高我在该分支的技能。
@编辑:mcve 代码
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <ftw.h>
#include <stdio.h>
#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000
void err_sys(const char *const str)
{
perror(str);
fflush(stdout);
exit(1);
}
int storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
return 0;
}
static int for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{
fprintf(stdout, "File name is = %s\n", filepath);
fflush(stdout);
return 0;
}
static int soner_each_time(const char *filepath, const struct stat *info,
int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char buf[BUFSIZE];
int status;
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
if (( pid = fork()) < 0) {
perror("failed fork");
exit(-1);
}
else if( pid > 0 ) // parent
{
if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
pid = wait(&status);
if (pid == -1)
perror("Failed to wait for child");
else if (WIFEXITED(status) && !WEXITSTATUS(status))
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
else if (WIFEXITED(status))
printf("Child %ld terminated with return status %d\n",
(long)pid, WEXITSTATUS(status));
else if (WIFSIGNALED(status))
printf("Child %ld terminated due to uncaught signal %d\n",
(long)pid, WTERMSIG(status));
else if (WIFSTOPPED(status))
printf("Child %ld stopped due to signal %d\n",
(long)pid, WSTOPSIG(status));
}
}
if (pid == 0) // child
{
if (typeflag == FTW_F)
{
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n", getpid(), getppid(), filename);
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, "not needed now", arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit |||\n", getpid(), getppid());
write(1, buf, strlen(buf));
}
else if (typeflag == FTW_DP || typeflag == FTW_D)
{
sprintf(buf, "%*s%s\n\n", ftwinfo->level * 4, "", filepath);
write(1, buf, strlen(buf));
}
}
return 0;
}
int main(int argc, char *argv[])
{
if (nftw("here is directory path", soner_each_time, 15, FTW_CHDIR)) {
fprintf(stderr, "Failed directory.\n");
exit(-1);
}
return 0;
}
你有一些错误。更正后的代码如下。
child没有调用exit
,所以它会继续它自己的nftw
,所以很多文件都在冗余处理。我添加了 exit(0)
.
fork
s 的执行速度如此之快,以至于系统将 运行 耗尽 pid
s。
我添加了三项内容来解决此问题:
- 一个 "reap" 循环
waitpid(0,&status,WNOHANG)
以捕获完成的例程 children - 在
fork
周围添加了一个循环以捕获 "out of slots" 问题 - 添加了限制机制以将活动 children 的数量限制为 sane/useful 值
我已经对来源进行了注释,以指出错误所在的位置。
虽然不是硬错误,但对 每个 文件执行 fork
会增加大量开销。磁盘带宽将因大约四个活动 child 线程而饱和,因此使用更多只会减慢速度。为目录分叉 child 并没有多大作用,因为 "meaty" 处理将针对文件。
无论如何,这是更正后的代码[请原谅不必要的样式清理]:
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 700
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#include <locale.h>
#include <string.h>
#include <errno.h>
#include <ftw.h>
#include <stdio.h>
#include <sys/wait.h>
#define TOTALNUMBEROFLINES 1000
#define BUFSIZE 1000
// output verbose/debug messages
int opt_v;
// limit of number of children that can be used at one time (if non-zero)
int opt_T;
int pendcnt; // number of active children
void
err_sys(const char *const str)
{
perror(str);
fflush(stdout);
exit(1);
}
int
storeLinesInArray(const char *file, char lines[][BUFSIZE])
{
return 0;
}
static int
for_each_file(const char *filepath, int totalLines, const char *key, const char arr[][BUFSIZE])
{
fprintf(stdout, "File name is = %s\n", filepath);
fflush(stdout);
return 0;
}
// reap_some -- reap a few processes
int
reap_some(int final)
{
pid_t pid;
int status;
int reapcnt;
reapcnt = 0;
// reap all completed children
while (1) {
pid = waitpid(0,&status,WNOHANG);
if (pid == 0)
break;
if (pid == -1) {
if (errno != ECHILD)
perror("Failed to wait for child");
break;
}
if (WIFSIGNALED(status)) {
printf("Child %ld terminated due to uncaught signal %d\n",
(long) pid, WTERMSIG(status));
++reapcnt;
continue;
}
if (WIFSTOPPED(status)) {
printf("Child %ld stopped due to signal %d\n",
(long) pid, WSTOPSIG(status));
continue;
}
if (WIFEXITED(status)) {
++reapcnt;
if (WEXITSTATUS(status) == 0) {
if (opt_v)
printf("parent [%d] reaped child [%d]\n", getpid(), pid);
}
else
printf("Child %ld terminated with return status %d\n",
(long) pid, WEXITSTATUS(status));
continue;
}
}
// bump down the number of children that are "in-flight"
pendcnt -= reapcnt;
return reapcnt;
}
static int
soner_each_time(const char *filepath, const struct stat *info, int typeflag, struct FTW *ftwinfo)
{
pid_t pid = 0;
char *bp;
int lvl;
char buf[BUFSIZE];
/* The variables are related to functions of reading file */
int totalLines;
char arr[TOTALNUMBEROFLINES][BUFSIZE];
int retval;
const char *const filename = filepath + ftwinfo->base;
switch (typeflag) {
case FTW_DP:
case FTW_D:
bp = buf;
for (lvl = 0; lvl < ftwinfo->level; ++lvl)
bp += sprintf(bp," ");
bp += sprintf(bp, "%s\n\n",filepath);
write(1, buf, strlen(buf));
//reap_some(0);
break;
case FTW_F:
// BUGFIX:
// limit the number of in-flight children
// too many children serves no purpose -- they saturate the system
// resources and performance actually goes _down_ because the system
// spends more time doing context switches between them than the actual
// work. more than a few children to process files produces little
// benefit after the disk I/O is running at maximum
if (opt_T) {
while (pendcnt > opt_T)
reap_some(0);
}
// BUGFIX:
// without a throttle, we spawn children so fast we're going to get
// [many] failures here (i.e. we use up _all_ available pids)
while (1) {
pid = fork();
if (pid >= 0)
break;
reap_some(0);
}
// parent
// keep track of the child count
if (pid > 0) {
++pendcnt;
break;
}
// child
sprintf(buf, "||| Child [%d] of parent [%d]: %s |||\n",
getpid(), getppid(), filename);
if (opt_v)
write(1, buf, strlen(buf));
/* Both of them are about reading function */
totalLines = storeLinesInArray(filename, arr);
retval = for_each_file(filename, totalLines, "not needed now", arr);
sprintf(buf, "||| Child [%d] of parent [%d] is about to exit (RETVAL: %d) |||\n", getpid(), getppid(), retval);
if (opt_v)
write(1, buf, strlen(buf));
// BUGFIX:
// child won't exit without this -- causing multiple children to redo
// the same files (i.e. they would continue the nftw -- only parent
// should do that)
exit(0);
break;
}
return 0;
}
int
main(int argc, char **argv)
{
char *cp;
--argc;
++argv;
opt_T = 10;
for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;
switch (cp[1]) {
case 'T': // throttle
cp += 2;
opt_T = (*cp != 0) ? atoi(cp) : 0;
break;
case 'v': // verbose messages
opt_v = 1;
break;
}
}
cp = *argv;
printf("opt_T=%d opt_v=%d -- %s\n",opt_T,opt_v,cp);
sleep(3);
printf("away we go ...\n");
if (nftw(cp, soner_each_time, 15, FTW_CHDIR)) {
fprintf(stderr, "Failed directory.\n");
exit(1);
}
// wait for all children to complete
while (pendcnt > 0)
reap_some(1);
return 0;
}
更新:
更改代码以仅在 parent 中执行目录处理(即 child 仅针对文件进行分叉)。修复了一个错误。所以,现在,-T throttle 参数使用一个低得多的值,可以相当于 "number of workers"。更改程序以使用默认节流值。
更新#2:
I said parent because there is only one parent. I wonder whether I may trace wrong.
不,你是对的。只有一个parent。那是设计使然。
I would like to make parent for each directory like explained in the first scheme.
实际上,您不会't/won正确理解真正涉及的内容。 欧比万克诺比:"These are not the droids you're looking for"
在每个 目录 上执行 递归 分支存在许多技术、性能和系统饱和问题。我编写的示例以设计和性能的最佳折衷避免了所有这些问题。它还允许 master children 的 "run ahead" 并保持 children 尽可能忙,而不管给定目录中 files/subdirs 的数量。
旁注:我有 40 多年的经验,并且编写过许多 nftw
等效程序。因此,以下内容来自所有这些。
想要的最终结果是什么?
你只有框架代码,但你实际做的[打算做的]会影响架构。您的最终计划可能是:
- CPU 绑定[不断等待 CPU 乘法等操作]
- 内存受限[不断等待读取或写入 DRAM 完成]
- I/O 绑定[不断等待 I/O 操作完成]
还有,你要pre-order还是post-order[喜欢FTW_DEPTH]遍历?我假设 pre-order
您不能再使用nftw
。
您将需要使用 opendir/readdir/closedir
[这就是 nftw
所做的]。
您需要的是一个在层次结构中执行 单个 级别的进程。让 nftw
中止并开始一个新的来实现它是一种折磨。
下面是一些伪代码。
但是... 实现变得更加复杂,不会提供更好的性能,实际上可能会降低性能。也可能导致无关程序崩溃,如Firefox、vlc、windowmanagers等
您现在需要进程间通信和共享内存
我上面的例子,只有一个控制进程。要保持节流,只需要 pendcnt
的简单 increment/decrement。
当您为目录添加 递归 分叉时,现在为目录分叉的 任何 子进程必须 increment/decrement [= 共享内存中pendcnt
的110=]全局副本。它必须使用进程间信号量来控制对该变量的访问。或者,也许,一些原子 increment/decrement 原语 [ala C11 atomics].
现在,对该信号量的争用成为一个[延迟]因素。
性能:
拥有多个活动进程实际上 会降低 性能。换句话说,分叉目录实际上 运行 比单个进程慢 。
除了几个 "worker" 处理文件的进程外,磁盘 I/O 带宽将用完。通过添加更多进程,您将没有进一步受益。
对于许多进程,它们实际上可能会相互干扰。考虑进程 A 请求磁盘读取。但是,进程 B 也是如此。A 的读取在内核内部完成,但在它可以返回给 A 之前,用于 A 读取的内核缓冲区必须重新用于完成 B 的读取。 A的阅读将不得不重复。
这就是所谓的 [虚拟内存] 页 "thrashing"。
锁死系统
随着越来越多的磁盘I/O 完成,越来越多的内核缓冲区必须用于包含数据。内核可能不得不驱逐页面缓冲区以腾出空间。其中一些可能是针对上述不相关的程序。
换句话说,您的程序的许多进程可能会独占CPU、磁盘和内存的使用。某些程序(如 Firefox)会超时 [并崩溃],因为它们会看到长时间的延迟,而这些延迟在其他情况下是看不到的,并认为它们内部的某些东西导致了延迟。
我有 运行 这样的 nftw
程序并且看到 Firefox 说:"Killing locked up javascript script".
更糟糕的是,我让 vlc 在时间上落后并开始跳帧。这导致 window 经理感到困惑,因为它认为这是由于某些逻辑错误,而不仅仅是 非常 缓慢的响应系统。最终结果是 window 管理器中止,必须手动重新启动。
这也会降低更多关键程序和内核守护进程的速度。
在某些情况下,这只能通过系统重启来清除。
此外,运行在您与他人共享的系统上设置多个进程会使您变成 "bad citizen",因此请注意不要消耗太多资源。
无论如何,这是伪代码:
// pseudo -- loose pseudo-code for non-nftw method
//
// NOTES:
// (1) pendcnt must now be a _shared_ memory variable (e.g. shmget, etc)
// (2) access must be locked by a shared memory semaphore
// (3) we must now have a list of our outstanding children
// (4) we can no longer do a blind waitpid(0,&status,WNOHANG) as we need to
// keep track of when our direct children complete
struct entfile {
struct dirent ent;
struct stat st;
};
// dodir -- enter/exit directory and perform all actions
void
dodir(const char *subdir)
{
// NOTE: you can create a wrapper struct for this that also has stat
struct entfile dirlist[1000];
// add subdir to directory stack ...
dirstack_push(subdir);
// enter directory
chdir(subdir);
// do whatever you'd like ...
process_directory(subdir);
// open directory
dirctl = opendir(".");
// pre-save all entries [skipping "." and ".."]
// this prevents too many open directory descriptors
// NOTE: we should stat(2) the file if d_type not supported
while (1) {
dirent = readdir(dirctl);
stat(dirent->d_name,&st);
add_to_dirent_list(dirlist,dirent,&st);
}
// close directory _before_ we process any entries
closedir(dirctl);
// process all file entries -- pre-order
for (ALL_IN_DIRLIST(ent,dirlist)) {
if (ent->ent.d_type == ISFILE)
doentry(ent);
}
wait_for_all_on_pendlist();
// process all directory entries -- pre-order
for (ALL_IN_DIRLIST(dirent,dirlist)) {
if (ent->ent.d_type == ISDIR)
doentry(ent);
}
wait_for_all_on_pendlist();
// remove directory from stack
dirstack_pop();
// exit directory
chdir("..")
}
// doentry -- process a directory entry
void
doentry(struct entfile *ent)
{
char *tail;
tail = ent->ent.d_name;
do {
// does throttle, etc.
pid = forkme();
// parent
// see notes above
if (pid) {
// NOTE: these semaphore waits can be costly
sem_wait();
++pendcnt;
sem_post();
add_pid_to_pendlist(pid,tail,...);
break;
}
// child
switch (ent->st.st.st_mode & ...) {
case ISFILE:
process_file(tail);
break;
case ISDIR:
dodir(tail);
break;
}
exit(0);
} while (0);
}
// wait for immediate children
void
wait_for_all_on_pendlist(void)
{
while (MORE_IN_PENDLIST) {
for (FORALL_IN_PENDLIST(tsk)) {
pid = waitpid(tsk->pid,&tsk->status,WNOHANG);
// check status like reap_some
if (pid > 0)
remove_pid_from_pendlist(tsk);
}
}
}