Stream S3 to DynamoDB with fast-csv: not all data inserted

When a CSV file is uploaded to my S3 bucket, my Lambda is triggered to insert the data into DynamoDB. I need a stream because the file is too large to be downloaded as a whole object.

const fastCsv = require('fast-csv');
const https = require('https');
// TABLE_DYNAMO, schemaGeData and toPutRequest are defined elsewhere in the module

const batchWrite = async (clientDynamoDB, itemsToProcess) => {
    const ri = {};
    ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
    const params = { RequestItems: ri };
    await clientDynamoDB.batchWriteItem(params).promise();
};

function runStreamPromiseAsync(stream, clientDynamoDB) {
    return new Promise((resolve, reject) => {
        const sizeChunk = 25;
        let itemsToProcess = [];

        stream
            .pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
            .on("data", (row) => {
                stream.pause();
                itemsToProcess.push(row);

                if (itemsToProcess.length === sizeChunk) {
                    batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
                        stream.resume();
                    });
                    itemsToProcess = [];
                }
            })
            .on("error", (err) => {
                console.log(err);
                reject("Error");
            })
            .on("end", () => {
                stream.pause();
                console.log("end");
                batchWrite(clientDynamoDB, itemsToProcess).finally(() => {
                    resolve("OK");
                });
            });
    });
}

module.exports.main = async (event, context, callback) => {

    // do not keep the Lambda alive for open keep-alive sockets
    context.callbackWaitsForEmptyEventLoop = false;

    const AWS = require('aws-sdk');
    const s3 = new AWS.S3();

    const object = event.Records[0].s3;
    const bucket = object.bucket.name;
    const file = object.object.key;

    // keep-alive agent so repeated DynamoDB calls reuse TCP connections
    const agent = new https.Agent({
        keepAlive: true
    });
    const client = new AWS.DynamoDB({
        httpOptions: {
            agent
        }
    });

    try {
        // get a read stream over the CSV data
        const stream = s3
            .getObject({
                Bucket: bucket,
                Key: file
            })
            .createReadStream()
            .on('error', (e) => {
                console.log(e);
            });

        await runStreamPromiseAsync(stream, client);
    } catch (e) {
        console.log(e);
    }
};

When my file has 1,000 rows everything gets inserted, but when it has 5,000 rows my function only inserts around 3,000 of them, and the number is random... sometimes more, sometimes less...

So I would like to understand what I am missing here.

I have also read this article, but to be honest, even if you pause the second stream, the first one is still running. So if anyone has any idea how to do this properly, it would be greatly appreciated!
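For reference, pausing the second stream (the parser, which is the one actually emitting the rows) should be enough in principle, because pipe() propagates backpressure: the source S3 stream only keeps running until its internal buffer fills up. A minimal sketch of that pattern, reusing the names from the code above:

const csvStream = stream.pipe(fastCsv.parse({ headers: Object.keys(schemaGeData), trim: true }));

csvStream.on("data", (row) => {
    itemsToProcess.push(row);
    if (itemsToProcess.length === sizeChunk) {
        const batch = itemsToProcess;
        itemsToProcess = [];
        csvStream.pause(); // stop row events; the S3 stream backs up behind pipe()
        batchWrite(clientDynamoDB, batch).finally(() => csvStream.resume());
    }
});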

Thanks

I found out why it was not being fully processed: the callback of batchWriteItem can return unprocessed items. So I changed the batchWrite and runStreamPromiseAsync functions a little, since I might not have been processing every item from itemsToProcess.
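For context, this is roughly what a throttled BatchWriteItem response looks like; the shape follows the AWS documentation, but the table name and item below are made up:

// Illustrative response passed to the batchWriteItem callback:
const exampleResponse = {
    UnprocessedItems: {
        "my-table": [ // writes DynamoDB did not apply, ready to be resubmitted
            { PutRequest: { Item: { id: { S: "42" } } } }
        ]
    }
};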

Here is the full code:

const batchWrite = (client, itemsToProcess) => {
    const ri = {};
    ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
    const items = { RequestItems: ri };

    return new Promise((resolve, reject) => {
        const processItemsCallback = function(err, data) {
            if (err) {
                return reject(err);
            }
            // BatchWriteItem can succeed while still returning throttled
            // writes in UnprocessedItems: resubmit them until none are left
            if (!data || !data.UnprocessedItems ||
                Object.keys(data.UnprocessedItems).length === 0) {
                return resolve();
            }
            client.batchWriteItem({ RequestItems: data.UnprocessedItems }, processItemsCallback);
        };
        client.batchWriteItem(items, processItemsCallback);
    });
};
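One refinement not in the original answer: the AWS documentation recommends retrying UnprocessedItems with exponential backoff rather than immediately. A sketch of the same loop with a capped delay (the wait helper and the batchWriteWithBackoff name are mine):

const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const batchWriteWithBackoff = async (client, itemsToProcess) => {
    const ri = {};
    ri[TABLE_DYNAMO] = itemsToProcess.map((itm) => toPutRequest(itm));
    let params = { RequestItems: ri };
    let attempt = 0;

    while (true) {
        const data = await client.batchWriteItem(params).promise();
        const unprocessed = data.UnprocessedItems || {};
        if (Object.keys(unprocessed).length === 0) {
            return;
        }
        attempt += 1;
        await wait(Math.min(100 * 2 ** attempt, 2000)); // capped exponential backoff
        params = { RequestItems: unprocessed };
    }
};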

function runStreamPromiseAsync(stream, clientDynamoDB) {
    return new Promise((resolve, reject) => {
        const sizeChunk = 25;
        let itemsToProcess = [];
        let arrayPromise = []; // one promise per batch, awaited at the end

        stream
            .pipe(fastCsv.parse({headers: Object.keys(schemaGeData), trim: true}))
            .on("error", (err) => {
                console.log(err);
                reject("Error");
            })
            .on('data', data => {
                itemsToProcess.push(data);
                if(itemsToProcess.length === sizeChunk){
                    arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
                    itemsToProcess = [];
                }
            })
            .on('end', () => {
                if(itemsToProcess.length !== 0){
                    arrayPromise.push(batchWrite(clientDynamoDB, itemsToProcess));
                }
                // resolve once every pending batch, retries included, has settled
                Promise.all(arrayPromise)
                    .then(() => resolve("OK"))
                    .catch(reject);
            });
    });
}
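One design note on this version: since the batches are only collected into arrayPromise and awaited at the end, every BatchWriteItem call for the file is in flight at once. On large files that concurrency makes throttling more likely, and throttling is exactly what comes back through UnprocessedItems, which the retry loop above handles.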