为什么关闭管道需要很长时间才能终止 child 进程?

Why does closing a pipe take so long to terminate a child process?

我的程序在等待 child 进程 (gzip) 完成时遇到问题,并且花费了很长时间。

在它开始等待之前它关闭输入流到 gzip 所以这应该触发它很快终止。我检查了系统,gzip 没有消耗任何 CPU 或等待 IO(写入磁盘)。

非常奇怪的是它停止等待的时间...

我们在内部使用 pthreads 的程序。它并排处理 4 个 pthread。每个线程处理许多工作单元,并且对于每个工作单元,它启动一个新的 gzip 进程(使用 fork()execve())来写入结果。当 gzip 没有终止时线程挂起,但当其他线程关闭它们的实例时它突然终止。

为清楚起见,我正在设置一个管道:my program(pthread) --> gzip --> file.gz

我想这可以部分由 CPU 负载来解释。但是当进程间隔几分钟启动并且整个系统由于这个锁定问题而最终只使用 4 个核心中的 1 个时,这似乎不太可能。

开始 gzip 的代码如下。调用 execPipeProcess 使得 child 直接写入文件,但从我的程序中读取。即:

execPipeProcess(&process, "gzip", -1, gzFileFd)

有什么建议吗?

typedef struct {
    int processID;
    const char * command;
    int stdin;
    int stdout;
} ChildProcess;


void closeAndWait(ChildProcess * process) {
    if (process->stdin >= 0) {
                stdLog("Closing post process stdin");
                if (close(process->stdin)) {
                exitError(-1,errno, "Failed to close stdin for %s",  process->command);
                }
        }
    if (process->stdout >= 0) {
                stdLog("Closing post process stdin");
                if (close(process->stdout)) {
            exitError(-1,errno, "Failed to close stdout for %s", process->command);
                }
        }

    int status;
        stdLog("waiting on post process %d", process->processID);
    if (waitpid(process->processID, &status, 0) == -1) {
        exitError(-1, errno, "Could not wait for %s", process->command);
    }
        stdLog("post process finished");

    if (!WIFEXITED(status)) exitError(-1, 0, "Command did not exit properly %s", process->command);
    if (WEXITSTATUS(status)) exitError(-1, 0, "Command %s returned %d not 0", process->command, WEXITSTATUS(status));
    process->processID = 0;
}



void execPipeProcess(ChildProcess * process, const char* szCommand, int in, int out) {
    // Expand any args
    wordexp_t words;
    if (wordexp (szCommand, &words, 0)) exitError(-1, 0, "Could not expand command %s\n", szCommand);


    // Runs the command
    char nChar;
    int nResult;

    if (in < 0) {
        int aStdinPipe[2];
        if (pipe(aStdinPipe) < 0) {
            exitError(-1, errno, "allocating pipe for child input redirect failed");
        }
        process->stdin = aStdinPipe[PIPE_WRITE];
        in = aStdinPipe[PIPE_READ];
    }
    else {
        process->stdin = -1;
    }
    if (out < 0) {
        int aStdoutPipe[2];
        if (pipe(aStdoutPipe) < 0) {
            exitError(-1, errno, "allocating pipe for child input redirect failed");
        }
        process->stdout = aStdoutPipe[PIPE_READ];
        out = aStdoutPipe[PIPE_WRITE];
    }
    else {
        process->stdout = -1;
    }

    process->processID = fork();
    if (0 == process->processID) {
        // child continues here

        // these are for use by parent only
        if (process->stdin >= 0) close(process->stdin);
        if (process->stdout >= 0) close(process->stdout);

        // redirect stdin
        if (STDIN_FILENO != in) {
            if (dup2(in, STDIN_FILENO) == -1) {
              exitError(-1, errno, "redirecting stdin failed");
            }
            close(in);
        }

        // redirect stdout
        if (STDOUT_FILENO != out) {
            if (dup2(out, STDOUT_FILENO) == -1) {
              exitError(-1, errno, "redirecting stdout failed");
            }
            close(out);
        }

        // we're done with these; they've been duplicated to STDIN and STDOUT

        // run child process image
        // replace this with any exec* function find easier to use ("man exec")
        nResult = execvp(words.we_wordv[0], words.we_wordv);

        // if we get here at all, an error occurred, but we are in the child
        // process, so just exit
        exitError(-1, errno, "could not run %s", szCommand);
  } else if (process->processID > 0) {
        wordfree(&words);
        // parent continues here

        // close unused file descriptors, these are for child only
        close(in);
        close(out);
        process->command = szCommand;
    } else {
        exitError(-1,errno, "Failed to fork");
    }
}

子进程继承打开的文件描述符。

每个后续 gzip 子进程不仅继承用于与该特定实例通信的管道文件描述符,还继承用于连接到先前子进程实例的管道的文件描述符。

这意味着当主进程执行关闭时标准输入管道仍然打开,因为在一些子进程中还有一些其他文件描述符用于同一个管道。一旦那些终止管道最终关闭。

一个快速修复方法是通过设置 close-on-exec 标志来防止子进程继承专用于主进程的管道文件描述符。

由于涉及多个线程,因此应序列化子进程的生成,以防止子进程继承用于另一个子进程的管道 fds。

您没有给我们足够的信息来确定,因为答案取决于您如何使用所提供的功能。但是,您的 closeAndWait() 函数看起来有点可疑。假设 child 进程在到达其 stdin 末尾时将退出可能是合理的,但是它已经写入的数据应该发生什么,甚至可能仍然写入它的数据stdout?您的 child 进程挂起可能是因为它们的标准输出被阻塞,而且它们识别它的速度很慢。

我认为这反映了一个设计问题。如果您正在捕获 child 进程的输出(您似乎至少支持这样做),那么在关闭 child 输入流的 parent 末尾后,您将希望 parent 继续读取 child 的输出直到它结束,并执行它打算对其执行的任何处理。否则你可能会丢失其中的一些(对于 child 执行 gzip 将意味着损坏的数据)。如果您将关闭两个流作为终止 child.

过程的一部分,则无法执行此操作

相反,您应该先关闭 child 的 stdin 的 parent 的末端,继续处理它的输出直到到达它的末端,然后才尝试收集 child。如果您愿意,可以将关闭 child 输出流的 parent 末端作为收集 child 过程的一部分。或者,如果您真的想丢弃 child 的任何剩余输出,那么您应该 耗尽 在关闭输入和关闭输出之间的输出流。