Node.js stream: stream appears frozen

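I want to create a simple CSV parser (using the csv module) and handle the error raised when the file does not exist.

If I comment out the second sleep call, the code runs to the end (and prints an error). With the sleep in place, the stream appears to freeze.

What am I missing? In my real code I do need to await some task at that point.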

import fs from 'fs';
import * as csv from 'csv';
import { finished } from 'stream';

type TabularFileType = 'csv' | 'tsv';

const DELIMITER_BY_TYPE: Record<TabularFileType, string> = {
    csv: ',',
    tsv: '\t',
};

/**
 * Creates an (async) iterable from a tabular file at `path`
 * @param type type of file, could be `'csv'` or `'tsv'`
 * @param path path of the file to be loaded
 * @param columns an array of strings defining the property names of the columns
 * @param options can specify some optional things,
 * like encoding (will decode using iconv-lite)
 * and skip some header lines (with the help of `fromLine`)
 */
export function tabularStream<T>(
    type: TabularFileType,
    path: string,
    columns: string[],
    options?: { encoding?: string; fromLine?: number }
): csv.parser.Parser {
    console.info('About to read file from "%s"', path);
    const stream = fs.createReadStream(path);

    const result = stream.pipe(
        csv.parse({
            delimiter: DELIMITER_BY_TYPE[type],
            fromLine: options?.fromLine,
            columns,
        })
    );

    finished(stream, (err) => {
        if (err) {
            result.destroy(new Error('whoops'));
        }
    });

    console.log("About to return");
    return result;
}

const sleep = (millis: number) => new Promise(resolve => setTimeout(resolve, millis));

(async () => {
    console.log('wait a bit');
    await sleep(100);
    console.log('waited');
    const p = tabularStream('csv', 'nonexistent', []);
    // If you comment it out, ended will appear on output
    await sleep(100);
    console.log('after sleep');
    try {
        for await(const record of p) {
            console.log('Got a record', record);
        }
    } catch (e) {
        console.log('Some error!', e);
    } finally {
        console.log('Finally!');
    }
})();

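Counterintuitively, every stream in a chain needs its own "error" listener for stream errors to be handled correctly, because .pipe() does not forward errors from one stream to the next.

The stream variable in the OP's snippet is missing an error listener. As a result, reading a non-existent file produces an unhandled (asynchronous) error.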

export function tabularStream(...) {
    const stream = fs.createReadStream(path); // <-- stream needs an "error" listener
    const result = stream.pipe(csv.parse(...));
    ...
}
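As a rough illustration of the point above, the sketch below attaches an "error" listener to the source and hands the error to the piped stream. It is only a sketch under assumptions: `openWithErrorForwarding` and `readAll` are hypothetical names, and a `PassThrough` stands in for `csv.parse(...)` so the example needs nothing beyond Node's built-ins. The consumer here starts iterating immediately, so it is already listening when the error arrives.

```typescript
import { createReadStream } from 'fs';
import { PassThrough, Readable } from 'stream';

// Hypothetical helper; PassThrough is a stand-in for csv.parse(...).
function openWithErrorForwarding(path: string): Readable {
    const source = createReadStream(path);
    const result = source.pipe(new PassThrough());
    // .pipe() does not forward errors, so pass the source error on explicitly.
    source.on('error', (err) => result.destroy(err));
    return result;
}

// Hypothetical consumer: returns the file text, or a short error description.
async function readAll(path: string): Promise<string> {
    try {
        let text = '';
        for await (const chunk of openWithErrorForwarding(path)) {
            text += chunk.toString();
        }
        return text;
    } catch (e) {
        return `error: ${(e as NodeJS.ErrnoException).code}`;
    }
}
```

Calling `readAll` with a path that does not exist should take the `catch` branch instead of leaving the error unhandled on the source stream.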

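(Un)commenting the "sleep" step changes how the snippet executes, because it changes when the asynchronous, unhandled error is evaluated relative to the rest of the code: with the sleep in place, the error fires before the consuming loop has started.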
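The timing can be made visible with a small experiment. This is a minimal sketch, not the OP's code: `errorArrivesDuringSleep` is a hypothetical name, a `PassThrough` replaces the parser, and a timer simulates the failing file open. The listener is attached before the `await`, so it records the error while the function is still sleeping.

```typescript
import { PassThrough } from 'stream';

const sleep = (millis: number) =>
    new Promise<void>((resolve) => setTimeout(resolve, millis));

// Hypothetical demo: the error is emitted while the caller is still awaiting.
async function errorArrivesDuringSleep(): Promise<string[]> {
    const log: string[] = [];
    const stream = new PassThrough();

    // Attached synchronously, before any await, so the error is not missed.
    stream.on('error', (err) => log.push(`listener saw: ${err.message}`));

    // Simulate a failure that happens shortly after the stream is created.
    setTimeout(() => stream.destroy(new Error('whoops')), 10);

    await sleep(50);
    log.push('after sleep');
    return log;
}
```

The listener entry is logged before "after sleep", which suggests that a consumer that only starts listening after the sleep is already too late to see the event itself.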

Node.js provides the stream.pipeline() function to handle piping data and errors, so you do not have to attach an "error" listener to every stream in the chain.

export function tabularStream(...) {
  const result: stream.Readable = stream.pipeline(
    fs.createReadStream(path),
    csv.parse(...),
    error => (error && result.emit("error", error))
  );
  ...
  return result;
} // remember to add `error` listener to the returned variable
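For a version that can be run without the csv package, here is a hedged sketch of the same idea. The names `openWithPipeline` and `collect` are hypothetical, and a `PassThrough` again stands in for `csv.parse(...)`. It relies on the documented behavior that `stream.pipeline()` destroys the streams in the chain with the error, so a consumer iterating the returned stream should see the failure as a rejected iteration.

```typescript
import { createReadStream } from 'fs';
import { PassThrough, Readable, pipeline } from 'stream';

// Hypothetical stand-in for tabularStream; PassThrough replaces csv.parse(...).
function openWithPipeline(path: string): Readable {
    return pipeline(
        createReadStream(path),
        new PassThrough(),
        (err) => {
            // Called once the pipeline has finished or failed.
            if (err) console.error('pipeline failed:', err.message);
        }
    );
}

// Hypothetical consumer: records what it observed while iterating.
async function collect(path: string): Promise<string[]> {
    const seen: string[] = [];
    try {
        for await (const chunk of openWithPipeline(path)) {
            seen.push(`chunk of ${chunk.length} bytes`);
        }
        seen.push('ended');
    } catch (e) {
        seen.push(`caught ${(e as NodeJS.ErrnoException).code}`);
    }
    return seen;
}
```

If the real code has to await something between creating the stream and iterating it, attaching an "error" listener to the returned stream right away (as the comment in the snippet above advises) is the safer choice.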