当通过 es.map() 和 JSONStream.stringify() 将 JSONStream.parsed() 数据传输到文件流时,节点堆耗尽
node heap exhausted when piping JSONStream.parsed() data through es.map() and JSONStream.stringify() to file stream
我正在尝试通过 JSONStream.parse() 将输入流(从巨大的 GeoJSON 文件创建)通过管道传输以将流分解为对象,然后通过 event-stream.map() 允许我转换对象,然后通过 JSONStream.stringify() 从中创建一个字符串,最后到一个可写的输出流。随着进程 运行s,我可以看到节点的内存占用量继续增长,直到它最终耗尽堆。这是重现问题的最简单的脚本 (test.js):
const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")
out = fs.createWriteStream("/dev/null")
process.stdin
.pipe(js.parse("features.*"))
.pipe(es.map( function(data, cb) {
cb(null, data);
return;
} ))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out)
一个小 bash 脚本 (barf.sh) 将源源不断的 JSON 喷入节点的 process.stdin 将导致节点的堆逐渐增长:
#!/bin/bash
echo '{"type":"FeatureCollection","features":['
while :
do
echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done
通过 运行 将其设置为:
barf.sh | node test.js
有几个奇怪的方法可以回避这个问题:
- 删除 fs.createWriteStream() 并将最后一个管道阶段从“.pipe(out)”更改为“.pipe(process.stdout)”,然后将管道节点的标准输出更改为 /dev/null
- 将异步 es.map() 更改为同步 es.mapSync()
前面两个操作中的任何一个都将允许脚本永远运行,节点的内存占用量低且不变。我在具有 8GB 内存的八核机器上使用节点 v6.3.1、事件流 v3.3.4 和 JSONStream 1.1.4 运行ning Ubuntu 16.04.
我希望有人能帮助我纠正我确定是我的明显错误。
JSONStream 不是 streams2 流,因此它不支持背压控制。 (关于streams2here有简要总结。)
这意味着数据将在 data
事件中从 parse
流中出来,并且无论消费流是否已为它们准备好,该流都将继续输出它们.如果管道中某处的读写速度存在差异,则会出现缓冲 - 这就是您所看到的。
您的 barf.sh
线束看到通过 stdin
注入的功能。相反,如果您正在读取一个大文件,您应该能够通过暂停文件的读取流来管理流程。因此,如果您要在 map
回调中插入一些 pause/resume
逻辑,您应该能够让它处理大量文件;只是需要更长的时间。我会尝试这样的事情:
let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
.pipe(js.parse("features.*"))
.pipe(es.map(function(data, cb) {
// This is just an example; a 10-millisecond wait per feature would be very slow.
if (!in.isPaused()) {
in.pause();
global.setTimeout(function () { in.resume(); }, 10);
}
cb(null, data);
return;
}))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out);
顺便说一下,使用 mapSync
对我的电脑(又旧又慢)几乎没有影响。但是,除非您要在 map
中执行一些异步操作,否则我会选择 mapSync
。
我正在尝试通过 JSONStream.parse() 将输入流(从巨大的 GeoJSON 文件创建)通过管道传输以将流分解为对象,然后通过 event-stream.map() 允许我转换对象,然后通过 JSONStream.stringify() 从中创建一个字符串,最后到一个可写的输出流。随着进程 运行s,我可以看到节点的内存占用量继续增长,直到它最终耗尽堆。这是重现问题的最简单的脚本 (test.js):
const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")
out = fs.createWriteStream("/dev/null")
process.stdin
.pipe(js.parse("features.*"))
.pipe(es.map( function(data, cb) {
cb(null, data);
return;
} ))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out)
一个小 bash 脚本 (barf.sh) 将源源不断的 JSON 喷入节点的 process.stdin 将导致节点的堆逐渐增长:
#!/bin/bash
echo '{"type":"FeatureCollection","features":['
while :
do
echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done
通过 运行 将其设置为:
barf.sh | node test.js
有几个奇怪的方法可以回避这个问题:
- 删除 fs.createWriteStream() 并将最后一个管道阶段从“.pipe(out)”更改为“.pipe(process.stdout)”,然后将管道节点的标准输出更改为 /dev/null
- 将异步 es.map() 更改为同步 es.mapSync()
前面两个操作中的任何一个都将允许脚本永远运行,节点的内存占用量低且不变。我在具有 8GB 内存的八核机器上使用节点 v6.3.1、事件流 v3.3.4 和 JSONStream 1.1.4 运行ning Ubuntu 16.04.
我希望有人能帮助我纠正我确定是我的明显错误。
JSONStream 不是 streams2 流,因此它不支持背压控制。 (关于streams2here有简要总结。)
这意味着数据将在 data
事件中从 parse
流中出来,并且无论消费流是否已为它们准备好,该流都将继续输出它们.如果管道中某处的读写速度存在差异,则会出现缓冲 - 这就是您所看到的。
您的 barf.sh
线束看到通过 stdin
注入的功能。相反,如果您正在读取一个大文件,您应该能够通过暂停文件的读取流来管理流程。因此,如果您要在 map
回调中插入一些 pause/resume
逻辑,您应该能够让它处理大量文件;只是需要更长的时间。我会尝试这样的事情:
let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
.pipe(js.parse("features.*"))
.pipe(es.map(function(data, cb) {
// This is just an example; a 10-millisecond wait per feature would be very slow.
if (!in.isPaused()) {
in.pause();
global.setTimeout(function () { in.resume(); }, 10);
}
cb(null, data);
return;
}))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out);
顺便说一下,使用 mapSync
对我的电脑(又旧又慢)几乎没有影响。但是,除非您要在 map
中执行一些异步操作,否则我会选择 mapSync
。