What am I missing here to get data out of this spawned Node.js child process?
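I'm trying to use a spawned command-line lzip process to expand an lzipped data stream, as I haven't found any good native JavaScript tools to do the job.
I can get this to work using files and file descriptors, but it seems stupid to have to write out, and read back in, a bunch of temporary scratch files. I want to do all of the work I can in memory.
So this is the code I'm trying to use: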
import { requestBinary } from 'by-request';
import { spawn } from 'child_process';
import { min } from '@tubular/math';

export async function tarLzToZip(url: string): Promise<void> {
  const lzData = await requestBinary(url, { headers: { 'User-Agent': 'curl/7.64.1' } });
  const lzipProc = spawn('lzip', ['-d'], { stdio: ['pipe', 'pipe', process.stderr] });
  let tarContent = Buffer.alloc(0);

  lzipProc.stdout.on('data', data => {
    tarContent = Buffer.concat([tarContent, data], tarContent.length + data.length);
  });

  for (let offset = 0; offset < lzData.length; offset += 4096) {
    await new Promise<void>((resolve, reject) => {
      lzipProc.stdin.write(lzData.slice(offset, min(offset + 4096, lzData.length)), err => {
        if (err)
          reject(err);
        else
          resolve();
      });
    });
  }

  await new Promise<void>((resolve, reject) => {
    lzipProc.stdin.end((err: any) => {
      if (err)
        reject(err);
      else
        resolve();
    });
  });

  console.log('data length:', tarContent.length);
}
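When I step through with a debugger, everything seems to go well with the data being sent into lzipProc.stdin. (I've tried both sending chunks like this and sending all of the data in one shot.) The lzipProc.stdout.on('data', data => callback, however, never gets called. When I get to the end, tarContent is empty.
What's missing here? Do I need a different stdio config? Are there different stream objects I should be using? Do I need more goats to sacrifice under the light of a full moon?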
UPDATE
My solution, based on Matt's excellent answer posted below, with all of the details for my use case:
import archiver from 'archiver';
import fs, { ReadStream } from 'fs';
import fsp from 'fs/promises';
import needle from 'needle';
import path from 'path';
import { spawn } from 'child_process';
import tar from 'tar-stream';

const baseUrl = 'https://data.iana.org/time-zones/releases/';

export async function codeAndDataToZip(version: string): Promise<ReadStream> {
  return compressedTarToZip(`${baseUrl}tzdb-${version}.tar.lz`);
}

export async function codeToZip(version: string): Promise<ReadStream> {
  return compressedTarToZip(`${baseUrl}tzcode${version}.tar.gz`);
}

export async function dataToZip(version: string): Promise<ReadStream> {
  return compressedTarToZip(`${baseUrl}tzdata${version}.tar.gz`);
}

async function compressedTarToZip(url: string): Promise<ReadStream> {
  const fileName = /([-a-z0-9]+)\.tar\.[lg]z$/i.exec(url)[1] + '.zip';
  const filePath = path.join(process.env.TZE_ZIP_DIR || path.join(__dirname, 'tz-zip-cache'), fileName);

  if (await fsp.stat(filePath).catch(() => false))
    return fs.createReadStream(filePath);

  const [command, args] = url.endsWith('.lz') ? ['lzip', ['-d']] : ['gzip', ['-dc']];
  const originalArchive = needle.get(url, { headers: { 'User-Agent': 'curl/7.64.1' } });
  const tarExtract = tar.extract({ allowUnknownFormat: true });
  const zipPack = archiver('zip');
  const writeFile = fs.createWriteStream(filePath);
  const commandProc = spawn(command, args);

  commandProc.stderr.on('data', msg => { throw new Error(`${command} error: ${msg}`); });
  commandProc.stderr.on('error', err => { throw err; });

  originalArchive.pipe(commandProc.stdin);
  commandProc.stdout.pipe(tarExtract);

  tarExtract.on('entry', (header, stream, next) => {
    zipPack.append(stream, { name: header.name, date: header.mtime });
    stream.on('end', next);
  });

  tarExtract.on('finish', () => zipPack.finalize());
  zipPack.pipe(writeFile);

  return new Promise<ReadStream>((resolve, reject) => {
    const rejectWithError = (err: any): void =>
      reject(err instanceof Error ? err : new Error(err.message || err.toString()));

    writeFile.on('error', rejectWithError);
    writeFile.on('finish', () => resolve(fs.createReadStream(filePath)));
    tarExtract.on('error', err => {
      // tar-stream has a problem with the format of a few of the tar files
      // dealt with here, which nevertheless are valid archives.
      if (/unexpected end of data|invalid tar header/i.test(err.message))
        console.error('Archive %s: %s', url, err.message);
      else
        reject(err);
    });
    zipPack.on('error', rejectWithError);
    zipPack.on('warning', rejectWithError);
    commandProc.on('error', rejectWithError);
    commandProc.on('exit', err => err && reject(new Error(`${command} error: ${err}`)));
    originalArchive.on('error', rejectWithError);
  });
}
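Unless you have specific processing that needs to be done, I would leave the streaming to Node or packages. Just wrap the whole stream setup in a promise.
If you also stream the request/response, it can be piped into the decompressor. Then stdout from the decompressor can be piped to the archive stream handlers.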
import fs from 'fs'
import { spawn } from 'child_process'
import needle from 'needle'
import tar from 'tar-stream'
import archiver from 'archiver'

export function tarLzToZip(url) {
  return new Promise((resolve, reject) => {
    // Setup streams
    const res = needle.get(url)
    const lzipProc = spawn('lzip', ['-dc'], { stdio: ['pipe','pipe',process.stderr] })
    const tarExtract = tar.extract()
    const zipPack = archiver('zip')
    const writeFile = fs.createWriteStream('tardir.zip')

    // Pipelines and processing: HTTP response -> lzip -> tar extractor
    res.pipe(lzipProc.stdin)
    lzipProc.stdout.pipe(tarExtract)

    // tar -> zip (simple file name)
    tarExtract.on('entry', function(header, stream, next) {
      console.log('entry', header)
      zipPack.append(stream, { name: header.name })
      stream.on('end', () => next())
    })
    tarExtract.on('finish', function() {
      zipPack.finalize()
    })
    zipPack.pipe(writeFile)

    // Handle the things
    writeFile.on('error', reject)
    writeFile.on('close', () => console.log('write close'))
    // Pass a callback here; calling resolve(true) directly would settle the promise immediately
    writeFile.on('finish', () => resolve(true))
    tarExtract.on('error', reject)
    zipPack.on('error', reject)
    zipPack.on('warning', reject)
    lzipProc.on('error', reject)
    lzipProc.on('exit', code => {if (code !== 0) reject(new Error(`lzip ${code}`))})
    res.on('error', reject)
    res.on('done', ()=> console.log('request done', res.request.statusCode))
  })
}
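For the in-memory case described in the question, a minimal sketch along the same lines might look like the following. The helper name runFilter is hypothetical and not part of any library. It assumes the command reads from stdin and writes to stdout (as lzip -dc does), and that both the input and the decompressed output fit comfortably in memory. It resolves only on the child's 'close' event, which Node emits after the process has ended and its stdio streams have closed, so the collected output is complete by then.

```typescript
import { spawn } from 'child_process';

// Hypothetical helper (not from the original post): pipes `input` through a
// child process and resolves with everything the child wrote to stdout.
export function runFilter(command: string, args: string[], input: Buffer): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const proc = spawn(command, args);
    const chunks: Buffer[] = [];

    // Collect stdout chunks; concatenating once at the end avoids re-copying on every chunk.
    proc.stdout.on('data', (chunk: Buffer) => chunks.push(chunk));

    // Forward the child's diagnostics so its stderr pipe never fills up unread.
    proc.stderr.pipe(process.stderr);

    proc.on('error', reject);       // e.g. the command is not installed
    proc.stdin.on('error', reject); // e.g. EPIPE if the child exits before reading all input

    // 'close' fires after the process has ended and its stdio streams have closed.
    proc.on('close', code => {
      if (code === 0)
        resolve(Buffer.concat(chunks));
      else
        reject(new Error(`${command} exited with code ${code}`));
    });

    // Write the whole input and signal end-of-input to the child.
    proc.stdin.end(input);
  });
}
```

With this in place, the question's function could do something like const tarContent = await runFilter('lzip', ['-dc'], lzData); instead of writing chunks by hand.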
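You might want to be a bit more verbose about logging errors and stderr, as the single promise reject can easily hide what actually happened across the multiple streams.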
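One way to make failures easier to trace is to capture the child's stderr and attach it to the rejection. The following is only a sketch under stated assumptions: spawnWithDiagnostics and its label parameter are hypothetical names, and it assumes the child's stderr output is small enough to buffer in memory.

```typescript
import { spawn, ChildProcessWithoutNullStreams } from 'child_process';

// Hypothetical helper (not from the original post): spawns a command and returns
// the process plus a promise that rejects with the captured stderr text on failure.
export function spawnWithDiagnostics(command: string, args: string[], label = command):
    { proc: ChildProcessWithoutNullStreams, done: Promise<void> } {
  const proc = spawn(command, args);
  const stderrChunks: Buffer[] = [];

  // Buffer stderr instead of throwing from the event handler.
  proc.stderr.on('data', (chunk: Buffer) => stderrChunks.push(chunk));

  const done = new Promise<void>((resolve, reject) => {
    proc.on('error', err => reject(new Error(`${label} failed to start: ${err.message}`)));
    proc.on('close', (code, signal) => {
      if (code === 0) {
        resolve();
        return;
      }

      const how = signal ? `signal ${signal}` : `code ${code}`;
      const detail = Buffer.concat(stderrChunks).toString('utf8').trim();

      reject(new Error(`${label} exited with ${how}: ${detail || '(no stderr output)'}`));
    });
  });

  return { proc, done };
}
```

Inside the promise wrapper above, proc.stdin and proc.stdout would be piped as before, and done.catch(reject) would surface a message that names the failing stage and includes what it printed.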