Node.js stream: stream appears frozen
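I want to create a simple CSV parser (using the csv module) and handle the error when the file does not exist.
If I comment out the sleep call, the code reaches the end (and prints an error).
What am I missing? In my real code I do need to perform some awaited work at that point.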
import fs from 'fs';
import * as csv from 'csv';
import { finished } from 'stream';

type TabularFileType = 'csv' | 'tsv';

const DELIMITER_BY_TYPE: Record<TabularFileType, string> = {
  csv: ',',
  tsv: '\t',
};

/**
 * Creates an (async) iterable from a tabular file at `path`
 * @param type type of file, could be `'csv'` or `'tsv'`
 * @param path path of the file to be loaded
 * @param columns an array of strings defining the property names of the columns
 * @param options can specify some optional things,
 * like encoding (will decode using iconv-lite)
 * and skipping some header lines (with the help of `fromLine`)
 */
export function tabularStream<T>(
  type: TabularFileType,
  path: string,
  columns: string[],
  options?: { encoding?: string; fromLine?: number }
): csv.parser.Parser {
  console.info('About to read file from "%s"', path);
  const stream = fs.createReadStream(path);
  const result = stream.pipe(
    csv.parse({
      delimiter: DELIMITER_BY_TYPE[type],
      fromLine: options?.fromLine,
      columns,
    })
  );
  finished(stream, (err) => {
    if (err) {
      result.destroy(new Error('whoops'));
    }
  });
  console.log("About to return");
  return result;
}

const sleep = (millis: number) => new Promise(resolve => setTimeout(resolve, millis));

(async () => {
  console.log('wait a bit');
  await sleep(100);
  console.log('waited');
  const p = tabularStream('csv', 'nonexistent', []);
  // If you comment this sleep out, the code reaches the end and prints the error
  await sleep(100);
  console.log('after sleep');
  try {
    for await (const record of p) {
      console.log('Got a record', record);
    }
  } catch (e) {
    console.log('Some error!', e);
  } finally {
    console.log('Finally!');
  }
})();
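
Counterintuitively, every stream in a pipe chain needs its own "error" listener to handle stream errors properly.
The stream variable in the OP's snippet has no error listener, so reading a non-existent file raises an unhandled (asynchronous) error.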
export function tabularStream(...) {
  const stream = fs.createReadStream(path); // <-- stream needs an "error" listener
  const result = stream.pipe(csv.parse(...));
  ...
}
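
To make the point above concrete, here is a minimal sketch that uses only Node.js built-ins. It assumes a PassThrough stream as a stand-in for csv.parse(...), and the helper names openWithErrorForwarding and readAll are hypothetical. The source stream gets its own "error" listener, which forwards the failure downstream so the consumer's try/catch can observe it. Iteration starts immediately here, so this does not by itself cover the case where awaited work happens between creating and consuming the stream.

```typescript
import { createReadStream } from 'fs';
import { PassThrough, Readable } from 'stream';

// Hypothetical helper: PassThrough stands in for csv.parse(...).
function openWithErrorForwarding(path: string): Readable {
  const source = createReadStream(path);
  const parser = new PassThrough();
  // pipe() does not forward errors, so the source needs its own listener.
  source.on('error', (err) => parser.destroy(err));
  return source.pipe(parser);
}

// Consumes the stream right away and reports how iteration ended.
async function readAll(path: string): Promise<string> {
  try {
    let bytes = 0;
    for await (const chunk of openWithErrorForwarding(path)) {
      bytes += (chunk as Buffer).length;
    }
    return `ended after ${bytes} bytes`;
  } catch (e) {
    return `error: ${(e as Error).message}`;
  }
}

readAll('nonexistent').then((outcome) => console.log(outcome));
```

With a missing file, the returned string should start with "error:", because the forwarded failure surfaces inside the for await loop.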
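
Commenting or uncommenting the "sleep" step changes how the snippet executes: with the sleep in place, the asynchronous, unhandled error is evaluated earlier, before the for await loop has started consuming the stream.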
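
The timing effect can be pictured with a plain EventEmitter. This is only an illustrative sketch, not the streams from the question: the event name 'failure' and the function listenerSeesEvent are made up for the demo, and 'failure' is used instead of "error" so that emitting it without a listener does not crash the process. The underlying fact is that an event emitter does not replay events for listeners that are attached after the event has fired.

```typescript
import { EventEmitter } from 'events';

const sleep = (millis: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, millis));

// Hypothetical demo: the event fires after 20 ms; the listener is attached
// only after `delayBeforeListening` ms of "awaited work".
async function listenerSeesEvent(delayBeforeListening: number): Promise<boolean> {
  const emitter = new EventEmitter();
  let seen = false;
  setTimeout(() => emitter.emit('failure', new Error('whoops')), 20);
  await sleep(delayBeforeListening); // stands in for the awaited work
  emitter.on('failure', () => {
    seen = true;
  });
  await sleep(100); // give the event time to fire, if it has not already
  return seen;
}

Promise.all([listenerSeesEvent(0), listenerSeesEvent(60)]).then(([early, late]) => {
  console.log({ early, late });
});
```

A listener attached before the event fires observes it; one attached after a longer wait does not.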
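
Node.js provides a stream.pipeline() function that handles both piping the data and propagating errors, so you do not have to attach an "error" listener to every stream in the chain.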
export function tabularStream(...) {
  const result: stream.Readable = stream.pipeline(
    fs.createReadStream(path),
    csv.parse(...),
    error => (error && result.emit("error", error))
  );
  ...
  return result;
} // remember to add `error` listener to the returned variable
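
For a version that can be run without the csv package, here is a hedged sketch of the same idea. It assumes a PassThrough stream in place of csv.parse(...), the name consumeWithPipeline is hypothetical, and the behavior described is that of recent Node.js releases. It records what the pipeline callback and the for await loop each observe when the input file is missing.

```typescript
import { createReadStream } from 'fs';
import { PassThrough, pipeline } from 'stream';

// Hypothetical helper: PassThrough stands in for csv.parse(...).
async function consumeWithPipeline(path: string): Promise<string[]> {
  const events: string[] = [];
  // pipeline() returns the last stream and tears down the whole chain
  // when any stage fails.
  const records = pipeline(createReadStream(path), new PassThrough(), (err) => {
    if (err) events.push(`pipeline callback: ${err.message}`);
  });
  try {
    for await (const chunk of records) {
      events.push(`chunk of ${(chunk as Buffer).length} bytes`);
    }
    events.push('ended');
  } catch (e) {
    events.push(`iteration failed: ${(e as Error).message}`);
  }
  return events;
}

consumeWithPipeline('nonexistent').then((events) => console.log(events));
```

As in the first sketch, the consumer starts iterating immediately. If awaited work has to happen before consumption, the advice in the comment above still applies: attach an "error" listener to the returned stream as soon as it is created.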