Node.Js 流管道上的异步迭代器
Node.Js async iterator over stream pipeline
我有以下管道:
readFile > parseCSV > otherProcess
readFile
是标准 Node.Js createReadStream
,而 parseCSV 是 Node.js 转换流(模块 link)。
我想逐行遍历 csv 文件并同时处理一行。因此,流和异步迭代器是绝配。
我有以下代码可以正常工作:
async function* readByLine(path, opt) {
const readFileStream = fs.createReadStream(path);
const csvParser = parse(opt);
const parser = readFileStream.pipe(csvParser);
for await (const record of parser) {
yield record;
}
}
我对 Node.Js 流很陌生,但我从许多来源了解到,模块 stream.pipeline
优于读取流的 .pipe
方法。
如何更改上面的代码以使用 stream.pipeline
(实际上是从 util.promisify(pipeline)
获得的承诺版本)并同时产生一行?
您实际上应该能够将 fs-stream 和 parser-stream 都传递给 pipeline()
并在 parser-stream 上使用您的异步迭代器:
const fs = require('fs');
const parse = require('csv-parse');
const stream = require('stream')
const util = require('util');
const pipeline = util.promisify(stream.pipeline);
async function* readByLine(path, opt) {
const readFileStream = fs.createReadStream(path);
const csvParser = parse(opt);
await pipeline(readFileStream, csvParser);
for await (const record of csvParser) {
yield record;
}
}
添加到@eol 的回答中,我建议存储承诺并在异步迭代后await
对其进行处理。
const fs = require('fs');
const parse = require('csv-parse');
const stream = require('stream');
async function* readByLine(path, opt) {
const readFileStream = fs.createReadStream(path);
const csvParser = parse(opt);
const promise = stream.promises.pipeline(readFileStream, csvParser);
for await (const record of csvParser) {
yield record;
}
await promise;
}
通过在循环之前调用 await pipeline(...)
,它将消耗整个流,然后您才能从缓冲区中剩余的任何内容进行迭代,这在小流上偶然起作用,但很可能在更大的流上中断(或infinite/lazy) 流。
根据我们 await
.
的位置,等效的回调可能会更清楚发生了什么
// await before iterating
stream.pipeline(a, b, err => {
if (err) return callback(err)
for await (const record of b) {
// process record
}
callback()
}
// await after iterating
for await (const record of stream.pipeline(a, b, callback)) {
// process record
}
我有以下管道:
readFile > parseCSV > otherProcess
readFile
是标准 Node.Js createReadStream
,而 parseCSV 是 Node.js 转换流(模块 link)。
我想逐行遍历 csv 文件并同时处理一行。因此,流和异步迭代器是绝配。
我有以下代码可以正常工作:
async function* readByLine(path, opt) {
const readFileStream = fs.createReadStream(path);
const csvParser = parse(opt);
const parser = readFileStream.pipe(csvParser);
for await (const record of parser) {
yield record;
}
}
我对 Node.Js 流很陌生,但我从许多来源了解到,模块 stream.pipeline
优于读取流的 .pipe
方法。
如何更改上面的代码以使用 stream.pipeline
(实际上是从 util.promisify(pipeline)
获得的承诺版本)并同时产生一行?
您实际上应该能够将 fs-stream 和 parser-stream 都传递给 pipeline()
并在 parser-stream 上使用您的异步迭代器:
const fs = require('fs');
const parse = require('csv-parse');
const stream = require('stream')
const util = require('util');
const pipeline = util.promisify(stream.pipeline);
async function* readByLine(path, opt) {
const readFileStream = fs.createReadStream(path);
const csvParser = parse(opt);
await pipeline(readFileStream, csvParser);
for await (const record of csvParser) {
yield record;
}
}
添加到@eol 的回答中,我建议存储承诺并在异步迭代后await
对其进行处理。
const fs = require('fs');
const parse = require('csv-parse');
const stream = require('stream');
async function* readByLine(path, opt) {
const readFileStream = fs.createReadStream(path);
const csvParser = parse(opt);
const promise = stream.promises.pipeline(readFileStream, csvParser);
for await (const record of csvParser) {
yield record;
}
await promise;
}
通过在循环之前调用 await pipeline(...)
,它将消耗整个流,然后您才能从缓冲区中剩余的任何内容进行迭代,这在小流上偶然起作用,但很可能在更大的流上中断(或infinite/lazy) 流。
根据我们 await
.
// await before iterating
stream.pipeline(a, b, err => {
if (err) return callback(err)
for await (const record of b) {
// process record
}
callback()
}
// await after iterating
for await (const record of stream.pipeline(a, b, callback)) {
// process record
}