AWS Lambda 和节点:在流式传输时写入数据 - 过早结束并且数据丢失

AWS Lambda & Node: Write data while streaming - ends prematurely and data is missing

我有一个由写入 S3 存储桶触发的 Lambda 函数。它读取写入存储桶的 JSON 文件,解析出各个记录,并将它们写入数据库。

问题是;我不确定我做错了什么,因为流结束并且 Lambda 在写入所有数据之前退出。

我在我的可读流上 "flowing mode",在数据库写入期间我在 pausing/resuming。根据文档,这应该可以解决问题,但它没有按预期工作。

Lambda 处理程序:

exports.handler = async (event, context) => {
    let result = false;
    try {
        result = await parseData(event);
    } catch (e) {
        console.error(e);
    }
    return result;
};

承诺:

const StreamArray = require("stream-json/streamers/StreamArray");

async parseData(event) {
    try {
        let objectStream = s3.getObject(params).createReadStream();
        const streamParser = StreamArray.withParser();
        return new Promise((resolve, reject) => {
            objectStream.pipe(streamParser).on("data", async streamData => {
                objectStream.pause();
                let result = await writeData(streamData);
                objectStream.resume();
            }).on("finish", () => {
                console.log("STREAM FINISH!");
                resolve(true);
            }).on("error", e => {
                console.error("Stream error:", e);
                reject(e);
            });
        });
    } catch (e) {
        console.error(e);
    }
}

通过简单地将 stream-json 换成 JSONStream 就可以正常工作,无论如何,JSONStream 是一个使用更广泛的包。现在效果很好!

const JSONStream = require("JSONStream");

async parseData(event) {
    try {
        let objectStream = s3.getObject(params).createReadStream();
        const streamParser = JSONStream.parse("*");
        return new Promise((resolve, reject) => {
            objectStream.pipe(streamParser).on("data", async streamData => {
                streamParser.pause();
                let result = await writeData(streamData);
                streamParser.resume();
            }).on("finish", () => {
                console.log("STREAM FINISH!");
                resolve(true);
            }).on("error", e => {
                console.error("Stream error:", e);
                reject(e);
            });
        });
    } catch (e) {
        console.error(e);
    }
}