Node.js: How to process the objects of a big JSON file one by one to avoid heap limit errors

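I'm trying to process a few hundred json.gz files using worker threads. At some point I get a JS heap limit error because of 3 big files (about 3 GB each when unzipped).

I tried to find a way to process the objects of each file one by one, but all I have managed to get is all of a file's objects at once.

Here is the current worker code: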

    for (let gzFile of zippedFiles) {
        const gunzip = zlib.createGunzip()
        const parser = JSONStream.parse('offers.*')
        const readStream = fs.createReadStream(gzFile)

        readStream.pipe(gunzip).pipe(parser)
            .pipe(es.map((offers, callback) => { //offers contains all of the current file objects array 
                offers.forEach(rawProduct => {
                    let processedProduct = getProcessedProduct(rawProduct)
                    parentPort.postMessage({ processedProduct })
                })
                callback()
            })
                .on('error', (e) => {
                    console.trace(`Error while reading file`, e)
                })
                .on('end', () => {
                    idxCount++
                    if (idxCount === lastIdx) {
                        parentPort.postMessage({ completed: true })
                    }
                })
            )
    }

The structure of the JSON files:

    {
        "offers": {
            "offer": [
                {}, // => the objects i wanna get one by one
                {},
                {}
            ]
        }
    }

How can I avoid the JS heap limit error? Thanks!
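A note on why the callback receives everything at once: in a JSONStream path, `*` matches every key at that level, and the only key directly under `offers` is `offer`, so `offers.*` appears to match the whole array as a single value. The sketch below is only an in-memory analogy in plain JavaScript, not JSONStream itself; the `sample` object and the variable names are made up for illustration.

```javascript
// Tiny stand-in for one file's content (hypothetical data).
const sample = { offers: { offer: [{ id: 1 }, { id: 2 }, { id: 3 }] } };

// Analogy for 'offers.*': every value one level below `offers`.
// There is only one such value, the complete `offer` array.
const matchedByOffersStar = Object.values(sample.offers);

// Analogy for 'offers.offer.*': every value one level below `offers.offer`,
// which is each object of the array on its own.
const matchedByOfferStar = Object.values(sample.offers.offer);

console.log(matchedByOffersStar.length); // 1 (the whole array)
console.log(matchedByOfferStar.length);  // 3 (one entry per object)
```

If that reading is right, a path such as `offers.offer.*` would be the one that selects individual objects, but I have not tested that against the real files.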

Nidhim David's suggestion was exactly what I was looking for.

Here is the working code:

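    const fs = require('fs');
    const zlib = require('zlib');
    const { chain } = require('stream-chain');
    const { parser } = require('stream-json');
    const { pick } = require('stream-json/filters/Pick');
    const { streamArray } = require('stream-json/streamers/StreamArray');

    // zippedFiles, idxCount, lastIdx, parentPort, debug and getProcessedProduct
    // come from the rest of the worker.
    for (const gzFile of zippedFiles) {
        const pipeline = chain([
            fs.createReadStream(gzFile),
            zlib.createGunzip(),
            parser(),
            pick({ filter: 'offers.offer' }), // select the array of objects
            streamArray(),                    // emit its elements one at a time
        ]);

        pipeline.on('data', ({ value }) => {
            // objects arrive one by one and are processed individually
            const rawProduct = value;
            const processedProduct = getProcessedProduct(rawProduct);
            parentPort.postMessage({ processedProduct });
        });

        pipeline.on('error', (e) => {
            console.trace(`Error while reading file`, e);
        });

        pipeline.on('end', () => {
            idxCount++;
            if (idxCount === lastIdx) {
                debug(`last zipped file, sending complete message`);
                parentPort.postMessage({ completed: true });
            }
        });
    }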
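One thing to keep in mind: the `for` loop above starts a pipeline for every file without waiting for the previous one to finish, so several large files may be decompressed at the same time. Below is a minimal sketch of handling the files strictly one after another with `for await`. `processFilesSequentially`, `openObjectStream`, `onObject` and `fakeStream` are hypothetical names for illustration, and the fake stream only imitates the `{ key, value }` items that `streamArray()` emits.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: handles files strictly one after another.
// Assumption: openObjectStream(file) returns a readable object stream.
async function processFilesSequentially(files, openObjectStream, onObject) {
    let processed = 0;
    for (const file of files) {
        // for await pulls one item at a time, so the next item is only
        // read after onObject has finished with the current one
        for await (const { value } of openObjectStream(file)) {
            await onObject(value, file);
            processed++;
        }
    }
    return processed; // total number of objects handled
}

// Stand-in for the real pipeline: emits { key, value } pairs.
const fakeStream = (file) => Readable.from([
    { key: 0, value: `${file}#a` },
    { key: 1, value: `${file}#b` },
]);

const seen = [];
const done = processFilesSequentially(
    ['one.json.gz', 'two.json.gz'],
    fakeStream,
    (value) => { seen.push(value); }
);
done.then((count) => console.log(`processed ${count} objects`));
```

In the worker, `openObjectStream` would presumably be a function that builds the `chain([...])` pipeline for one file (assuming the chain object can be iterated like any other Node.js readable stream), and `{ completed: true }` could be posted once the returned promise resolves, which would make the `idxCount` counter unnecessary.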