使用 proc_open 流式传输 运行 时间生成的 gzip 文件

Stream run-time generated gzip file with proc_open

我正在尝试流式传输 tar.gz 而不在内存中缓冲任何内容或将数据保存到磁盘。我需要压缩一堆 PDF 文件(每个文件约 100kb)。

如果通过脚本发送 10-20 字节的小文本文件并且用户下载可读的 tar.gz 文件,一切似乎都正常,但是在发送真实数据时(运行-时间生成PDF 文件)脚本阻塞和停止

下面是代码片段。为什么在几次循环迭代后写入 stdin 时脚本会阻塞?它在此时停止等待某些东西

每一步都记录到一个文件中,以便在写入之前查看消息 stdin 是最后记录的消息

$proc = proc_open('gzip - -c', [
    0   => ['pipe', 'r'],
    1   => ['pipe', 'w'],
    2   => ['pipe', 'w']
], $pipes);

stream_set_read_buffer($pipes[1], 0);
stream_set_read_buffer($pipes[2], 0);

stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);

while(true){
    log_step('file stream');
    // fetching data from database and generating PDF file as tar stream (string)

    log_step('stdin: '.strlen($tar_string));
    fwrite($pipes[0], $tar_string); // <--- in the second iteration the script blocks/stops here!
    log_step('stdin done!');
    
    if($output = stream_get_contents($pipes[1])){
        log_step('output: '.strlen($output));
        echo $output;
    }
}

输出日志文件

2021-01-26 10:28:29 file stream
2021-01-26 10:28:29 stdin: 116224
2021-01-26 10:28:29 stdin done!
2021-01-26 10:28:29 output: 32768
2021-01-26 10:28:29 file stream
2021-01-26 10:28:29 stdin: 116736

完整代码

$proc = proc_open('gzip - -c', [
    0   => ['pipe', 'r'],
    1   => ['pipe', 'w'],
    2   => ['pipe', 'w']
], $pipes);
stream_set_read_buffer($pipes[1], 0);
stream_set_read_buffer($pipes[2], 0);
stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);

//  get data from database
while($row = $result->fetch()){
    //  generate PDF

    $filename = $pdf['name'];
    $filesize = strlen($pdf['data']);

    $header = pack(
        'a100a8a8a8a12A12a8a1a100a255',
        $filename,
        sprintf('%6s ',     ''),
        sprintf('%6s ',     ''),
        sprintf('%6s ',     ''),
        sprintf('%11s ',    $filesize),
        sprintf('%11s',     ''),
        sprintf('%8s ',     ' '),
        0,
        '',
        ''
    );
    
    $checksum = 0;
    for($i=0; $i<512; $i++){
        $checksum += ord($header{$i});
    }
    
    $checksum_data = pack(
        'a8',
        sprintf('%6s ',     decoct($checksum))
    );
    
    for($i=0, $j=148; $i<8; $i++, $j++){
        $header{$j} = $checksum_data{$i};
    }
    
    fwrite($pipes[0], $header.$pdf['data'].pack(
        'a'.(512 * ceil($filesize / 512) - $filesize),
        ''
    ));
    
    if($output = stream_get_contents($pipes[1])){
        echo $output;
    }
}

fwrite($pipes[0], pack('a512', ''));
fclose($pipes[0]);

while(true){
    if($output = stream_get_contents($pipes[1])){
        echo $output;
    }
    
    if(!proc_get_status($proc)['running']){
        foreach($pipes as $pipe){
            if(is_resource($pipe)){
                fclose($pipe);
            }
        }
        proc_close($proc);
        
        break;
    }
}

您的脚本没有进行的原因是它试图向管道中写入比 gzip 进程一次能够处理的更多的数据。情况大致是这样的:

  1. 您的脚本将 116736 字节写入管道。
  2. gzip 进程从其标准输入中读取其中的一些内容,对其进行压缩,并在其标准输出中输出压缩数据。
  3. PHP 进程被阻塞,直到 gzip 进程读取它写入管道的其余输入。
  4. gzip 进程被阻塞,直到 PHP 进程读取它写入标准输出的压缩输出。

因此您的脚本陷入了僵局。

问题的根源在于,与 C 中的同名不同,阻塞模式下的 PHP fwrite 函数将始终尝试将整个缓冲区写入流,直到所有内容都被写入.这也可以通过在标准输入管道上启用非阻塞模式并监视实际写入了多少输入来解决。例如像这样:

$proc = proc_open('gzip -c -', [
    0 => ['pipe', 'r'],
    1 => ['pipe', 'w'],
], $pipes);

stream_set_read_buffer($pipes[1], 0);

stream_set_blocking($pipes[0], false);
stream_set_blocking($pipes[1], false);

$tar_string = '';
for (;;) {
    if ($tar_string === '') {
        if (/* more input available */)
            $tar_string = /* read more input */;
        else {
            $tar_string = null;
            \fclose($pipes[0]);
        }
    }

    if ($tar_string !== null) {
        $written = \fwrite($pipes[0], $tar_string);
        if ($written === false)
            throw new \Exception('write error');
        $tar_string = \substr($tar_string, $written);
    }

    /* THIS IS JUST SOME DUMB DEMONSTRATIVE CODE, DO NOT COPY-PASTE */

    for (;;) {
        $outbuf = \fread($pipes[1], 69420);
        if ($outbuf === false)
            throw new \Exception('read error');
        if ($outbuf === '')
            break;
        $outlen = \strlen($outbuf);
        echo $outbuf;
    }
    
    if (\feof($pipes[1]))
        break;
}

以上内容表面上 有效。一个很大的缺点是它的性能会非常差:当 gzip 进程准备好读取或写入任何数据时,脚本将继续无用地忙循环并占用 CPU 时间来自实际需要它的 gzip 进程。

在更明智的编程语言中,您可以访问:

  • 诸如poll or select之类的调用,它们能够在流准备好读取或写入时发出信号,否则会放弃 CPU 时间给可能需要它的其他进程;
  • I/O 原语可以 return 成功 部分 读取或写入,而不是尝试处理缓冲区的整个大小。

但这是PHP,所以我们不能有好东西。至少不是内置的。

然而,对于这个问题有一个更好的解决方案,它完全避免了 proc_open,而是使用 zlib 扩展实现 gzip 压缩,如下所示:

$zctx = \deflate_init(ZLIB_ENCODING_GZIP);
if ($zctx === false)
    throw new \Exception('deflate_init failed');

while (/* more data available */) {
    $input = /* get more data */;
    $data = \deflate_add($zctx, $input, ZLIB_NO_FLUSH);
    if ($data === false)
        throw new \Exception('deflate_add failed');
    echo $data;
}

$data = \deflate_add($zctx, '', ZLIB_FINISH);
if ($data === false)
    throw new \Exception('deflate_add failed');
echo $data;

unset($zctx); // free compressor resources

deflate_init and deflate_add 自 PHP 7 起可用,假设在构建 PHP 时启用了 zlib 扩展。调用库比调用子进程更可取(实际上在 any 语言中),因为它更轻量级:将所有内容放在同一个进程中可以避免内存和上下文切换开销。