通过流将 CSV 文件转换为 lambda 中的多个 JSON 小文件

Convert a CSV file to multiple small JSON files in lambda through streams

我正在尝试编写一个 lambda 函数,该函数可以将一个巨大的 csv 文件流式传输到多个小 json 文件(比如一个 json 文件,包含 2000 行),并从一个 s3 存储桶流式传输到一个 s3 存储桶。我虽然在 256 MB 的有限 RAM 内存中有一些限制,例如 运行。

我可以通过将文件作为文件而不是像下面的流那样来做同样的事情。

但是由于内存限制,我需要在流中处理这个问题。有没有办法使用流来做同样的事情?

// transformationClass.js

const csv = require('csvtojson');

const extension = '.json';
class S3CsvToJson {
    static async perform(input, output, headers) {
        let jsonArray = null;
        const s3Object = await s3.getObject(); // getting the s3 object
        const csvString = s3Object.Body.toString('utf8');
        await csv({
            noheader: false,
        })
            .fromString(csvString)
            .then((csvRow) => {
                jsonArray = csvRow;
            });
        const fileNames = await S3CsvToJson.writeToFile(jsonArray, output);
        return { files: fileNames };
    }

    static async writeToFile(jsonArray, output) {
        const minNumber = 0;
        const maxNumber = 1999;
        const fileNames = [];
        let outFile;
        if (jsonArray && Array.isArray(jsonArray)) {
            let fileIterator = 1;
            while (jsonArray.length) {
                outFile = `${output.key}-${fileIterator}${extension}`;
                await // s3.putObject(). writing to s3
              .putObject(
                    outFile,
                    output.bucketName,
                    JSON.stringify(jsonArray.splice(minNumber, maxNumber)),
                );
                console.log('rows left :', jsonArray.length);
                fileNames.push(outFile);
                fileIterator += 1;
            }
        }
        return fileNames;
    }
}

module.exports = S3CsvToJson;

这里是处理函数

// handler.js
module.exports.perform = async (event, context, callback) => {

    context.callbackWaitsForEmptyEventLoop = false;
    await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
        .then((result) => callback(null, result));
    console.log('leaving - ', Date.now());
};

提前致谢!!

经历了很多事情后,我终于得出了一个方法来完成它。

我要做的是,将整个过程包装成一个承诺,然后 return 它。我从 s3 创建了一个读取流,将其转发给解析器,然后再转发给写入流。 我想在这里分享它,所以它可能对其他人有用。也开放任何更好的优化解决方案。

// transformationClass.js

const csv = require('fast-csv');
const { Transform, pipeline } = require('stream');

const extension = '.json';
class S3CsvToJson {
  static async perform(input, output, headers) {
    console.log(input, output, headers);
    const threshold = 2000;
    try {
      const promiseTransformData = () => new Promise((resolve, reject) => {
        try {
          let jsonOutput = [];
          let fileCounter = 0;
          const fileNames = [];
          const writableStream = new Transform({
            objectMode: true,
            autoDestroy: true,
            async transform(data, _, next) {
              if (jsonOutput.length === threshold) {
                fileCounter += 1;
                const fileUpload = new Promise((resolveWriter) => {
                  s3
                    .putObject(
                      `${output.key}-${fileCounter}${extension}`,
                      output.bucketName,
                      JSON.stringify(jsonOutput),
                    )
                    .then(() => resolveWriter());
                });
                await fileUpload;
                fileNames.push(`${output.key}-${fileCounter}${extension}`);
                jsonOutput = [];
              }
              jsonOutput.push(data);
              next();
            },
          });
          const readFileStream = s3.getReadStream(input.key, input.bucketName);
          pipeline(
            readFileStream,
            csv.parse({ headers: true }),
            writableStream,
            (error) => {
              // if (err) throw new Error('Pipeline error');
              if (error) {
                console.error(`Error occurred in pipeline - ${error}`);
                resolve({ errorMessage: error.message });
              }
            },
          );
          writableStream.on('finish', async () => {
            if (jsonOutput.length) {
              fileCounter += 1;
              const fileUpload = new Promise((resolveWriter) => {
                s3
                  .putObject(
                    `${output.key}-${fileCounter}${extension}`,
                    output.bucketName,
                    JSON.stringify(jsonOutput),
                  )
                  .then(() => resolveWriter());
              });
              await fileUpload;
              fileNames.push(`${output.key}-${fileCounter}${extension}`);
              jsonOutput = [];
            }
            console.log({ status: 'Success', files: fileNames });
            resolve({ status: 'Success', files: fileNames });
          });
        } catch (error) {
          console.error(`Error occurred while transformation - ${error}`);
          resolve({ errorMessage: error ? error.message : null });
        }
      });
      return await promiseTransformData();
    } catch (error) {
      return error.message || error;
    }
  }
}

module.exports = S3CsvToJson;

对于处理程序,我这样调用 S3CsvToJson

// handler.js
module.exports.perform = async (event, context, callback) => {
    context.callbackWaitsForEmptyEventLoop = false;
    await s3CsvToJson.perform(event.input, event.output, event.headerMapping)
        .then((result) => callback(null, result))
        .catch((error) => callback(error));
};

希望对您有所帮助。谢谢!