What am I missing here to get data out of this spawned Node.js child process?

I'm trying to use a spawned command-line lzip process to expand an lzipped data stream, because I haven't found any good native JavaScript tools for the job.

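I can get this to work using files and file descriptors, but it seems silly to have to write out and read back a bunch of temporary scratch files. I want to do all the work I can in memory.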

Here is the code I'm trying to use:

import { requestBinary } from 'by-request';
import { spawn } from 'child_process';
import { min } from '@tubular/math';

export async function tarLzToZip(url: string): Promise<void> {
  const lzData = await requestBinary(url, { headers: { 'User-Agent': 'curl/7.64.1' } });
  const lzipProc = spawn('lzip', ['-d'], { stdio: ['pipe', 'pipe', process.stderr] });
  let tarContent = Buffer.alloc(0);

  lzipProc.stdout.on('data', data => {
    tarContent = Buffer.concat([tarContent, data], tarContent.length + data.length);
  });

  for (let offset = 0; offset < lzData.length; offset += 4096) {
    await new Promise<void>((resolve, reject) => {
      lzipProc.stdin.write(lzData.slice(offset, min(offset + 4096, lzData.length)), err => {
        if (err)
          reject(err);
        else
          resolve();
      });
    });
  }

  await new Promise<void>((resolve, reject) => {
    lzipProc.stdin.end((err: any) => {
      if (err)
        reject(err);
      else
        resolve();
    });
  });

  console.log('data length:', tarContent.length);
}

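When I step through with the debugger, sending the data to lzipProc.stdin seems to go smoothly. (I've tried sending it both in chunks like this and all at once.) The lzipProc.stdout.on('data', data => callback, however, never gets called. When I get to the end, tarContent is empty.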

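What's missing here? Do I need a different stdio configuration? Should I be using different stream objects? Do I need to sacrifice more goats under the light of a full moon?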

UPDATE

My solution, based on Matt's excellent answer posted below, with all of the specifics of my use case:

import archiver from 'archiver';
import fs, { ReadStream } from 'fs';
import fsp from 'fs/promises';
import needle from 'needle';
import path from 'path';
import { spawn } from 'child_process';
import tar from 'tar-stream';

const baseUrl = 'https://data.iana.org/time-zones/releases/';

export async function codeAndDataToZip(version: string): Promise<ReadStream> {
  return compressedTarToZip(`${baseUrl}tzdb-${version}.tar.lz`);
}

export async function codeToZip(version: string): Promise<ReadStream> {
  return compressedTarToZip(`${baseUrl}tzcode${version}.tar.gz`);
}

export async function dataToZip(version: string): Promise<ReadStream> {
  return compressedTarToZip(`${baseUrl}tzdata${version}.tar.gz`);
}

async function compressedTarToZip(url: string): Promise<ReadStream> {
  const fileName = /([-a-z0-9]+)\.tar\.[lg]z$/i.exec(url)[1] + '.zip';
  const filePath = path.join(process.env.TZE_ZIP_DIR || path.join(__dirname, 'tz-zip-cache'), fileName);

  if (await fsp.stat(filePath).catch(() => false))
    return fs.createReadStream(filePath);

  const [command, args] = url.endsWith('.lz') ? ['lzip', ['-d']] : ['gzip', ['-dc']];
  const originalArchive = needle.get(url, { headers: { 'User-Agent': 'curl/7.64.1' } });
  const tarExtract = tar.extract({ allowUnknownFormat: true });
  const zipPack = archiver('zip');
  const writeFile = fs.createWriteStream(filePath);
  const commandProc = spawn(command, args);

  commandProc.stderr.on('data', msg => { throw new Error(`${command} error: ${msg}`); });
  commandProc.stderr.on('error', err => { throw err; });

  originalArchive.pipe(commandProc.stdin);
  commandProc.stdout.pipe(tarExtract);

  tarExtract.on('entry', (header, stream, next) => {
    zipPack.append(stream, { name: header.name, date: header.mtime });
    stream.on('end', next);
  });

  tarExtract.on('finish', () => zipPack.finalize());
  zipPack.pipe(writeFile);

  return new Promise<ReadStream>((resolve, reject) => {
    const rejectWithError = (err: any): void =>
      reject(err instanceof Error ? err : new Error(err.message || err.toString()));

    writeFile.on('error', rejectWithError);
    writeFile.on('finish', () => resolve(fs.createReadStream(filePath)));
    tarExtract.on('error', err => {
      // tar-stream has a problem with the format of a few of the tar files
      // dealt with here, which nevertheless are valid archives.
      if (/unexpected end of data|invalid tar header/i.test(err.message))
        console.error('Archive %s: %s', url, err.message);
      else
        reject(err);
    });
    zipPack.on('error', rejectWithError);
    zipPack.on('warning', rejectWithError);
    commandProc.on('error', rejectWithError);
    commandProc.on('exit', err => err && reject(new Error(`${command} error: ${err}`)));
    originalArchive.on('error', rejectWithError);
  });
}

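Unless you have specific processing that needs to be done, I'd leave the streaming to Node or to packages. Just wrap the whole stream setup in a promise.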

If you also stream the request/response, it can be piped straight into the decompressor. The decompressor's stdout can then be piped to the archive stream handler.

import fs from 'fs'
import { spawn } from 'child_process'
import needle from 'needle'
import tar from 'tar-stream'
import archiver from 'archiver'

export function tarLzToZip(url) {
  return new Promise((resolve, reject) => {
    // Setup streams
    const res = needle.get(url)
    const lzipProc = spawn('lzip', ['-dc'], { stdio: ['pipe','pipe',process.stderr] })
    const tarExtract = tar.extract()
    const zipPack = archiver('zip')
    const writeFile = fs.createWriteStream('tardir.zip')

    // Pipelines and processing
    res.pipe(lzipProc.stdin)
    lzipProc.stdout.pipe(tarExtract)
    // tar -> zip (simple file name)
    tarExtract.on('entry', function(header, stream, next) {
      console.log('entry', header)
      zipPack.append(stream, { name: header.name })
      stream.on('end', () => next())
    })
    tarExtract.on('finish', function() {
      zipPack.finalize()
    })
    zipPack.pipe(writeFile)

    // Handle the things
    writeFile.on('error', reject)
    writeFile.on('close', () => console.log('write close'))
    writeFile.on('finish', () => resolve(true))
    tarExtract.on('error', reject)
    zipPack.on('error', reject)
    zipPack.on('warning', reject)
    lzipProc.on('error', reject)
    lzipProc.on('exit', code => {if (code !== 0) reject(new Error(`lzip ${code}`))})
    res.on('error', reject)
    res.on('done', ()=> console.log('request done', res.request.statusCode))
  })
}
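
If the goal is to keep everything in memory, as in the question, the same "wrap it in a promise" idea can be sketched without any files. This is only a minimal sketch: `pipeThroughCommand` is a hypothetical helper name, and it assumes the whole decompressed output fits comfortably in memory. It resolves on the child's 'close' event, which fires only after the process has ended and its stdio streams have closed, so output that arrives after stdin has finished is still collected.

```typescript
import { spawn } from 'node:child_process';

// Hypothetical helper: feeds `input` to a command's stdin and resolves with
// everything the command wrote to stdout, once the process has closed.
export function pipeThroughCommand(command: string, args: string[], input: Buffer): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    // stdin and stdout are pipes; stderr goes straight to the parent's stderr.
    const proc = spawn(command, args, { stdio: ['pipe', 'pipe', 'inherit'] });
    const chunks: Buffer[] = [];

    proc.stdout.on('data', (chunk: Buffer) => chunks.push(chunk));
    proc.on('error', reject);        // e.g. the command could not be started
    proc.stdin.on('error', reject);  // e.g. EPIPE if the command exits early

    // 'close' is emitted after the process has ended and its stdio streams
    // have been closed, so all stdout data has been delivered by then.
    proc.on('close', code => {
      if (code === 0)
        resolve(Buffer.concat(chunks));
      else
        reject(new Error(`${command} exited with code ${code}`));
    });

    // Write the whole buffer and close stdin; the stream handles backpressure.
    proc.stdin.end(input);
  });
}
```

With the names from the question, usage would look something like `const tarContent = await pipeThroughCommand('lzip', ['-dc'], lzData);`.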

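You'll probably want to log errors and stderr in more detail, since a single promise reject can easily hide what actually happened across the multiple streams.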
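
One way to act on that is sketched below. Both helpers are hypothetical names, not part of any library: `labelErrors` tags each rejection with the pipeline stage it came from, and `captureText` buffers a child's stderr so its message can be quoted when the process exits with a non-zero code.

```typescript
import { EventEmitter } from 'node:events';
import type { Readable } from 'node:stream';

// Hypothetical helper: attach an 'error' listener to each named stage so the
// log and the rejection both say which part of the pipeline failed.
export function labelErrors(
  stages: Record<string, EventEmitter>,
  reject: (err: Error) => void
): void {
  for (const [name, stage] of Object.entries(stages)) {
    stage.on('error', (err: Error) => {
      console.error(`[${name}]`, err);
      reject(new Error(`${name} failed: ${err.message}`));
    });
  }
}

// Hypothetical helper: buffer everything written to a stream (for example a
// child's stderr) and return a function that yields the text collected so far.
export function captureText(stream: Readable): () => string {
  const chunks: Buffer[] = [];

  stream.on('data', (chunk: Buffer | string) =>
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk));

  return () => Buffer.concat(chunks).toString('utf8').trim();
}
```

In the code above this would mean spawning lzip with stderr left as a pipe (the default) rather than `process.stderr`, calling `captureText(lzipProc.stderr)` right after the spawn, including its result in the error created in the 'exit' handler, and replacing the individual `on('error', reject)` lines with one call such as `labelErrors({ request: res, lzip: lzipProc, tar: tarExtract, zip: zipPack, file: writeFile }, reject)`.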