Nodejs | unzip and read only top n lines of files

I have a zip in S3 that contains several hundred CSV files. I am trying to stream the archive and need to read only the first n lines of each file. I can unzip and read the contents, but I am not sure how to stop the stream after reading n lines and then continue processing the remaining files.

The code I have tried so far:

const aws = require("aws-sdk");
const s3 = new aws.S3();
const etl = require("etl");
const unzip = require("unzip-stream");

function setupMetadata() {
  s3.getObject({Bucket: 'test', Key: 'CSV.zip'}).createReadStream()
    .pipe(unzip.Parse())
    .on('entry', function (entry) {
      var i = 0;
      var recordIdentifier;
      entry
      .pipe(etl.map(res => {
        if (recordIdentifier) {
          console.log(recordIdentifier);
          console.log(i++);
          // not sure about this. This works, but only for the 1st file;
          // after that the program terminates. I need to do this for all
          // the files in the zip.
          entry.destroy();
        }
        const data = res.toString("utf-8");
        var array = data.split("\n");
        if(array.length >= 3) {
          recordIdentifier = array[2].split(",")[0];
        }
      }))
    })
}

setupMetadata();

I have tried calling entry.autodrain() after reading the content, but it does not work. entry.destroy() works, but the program then terminates; I want to do the same thing for all the files in the zip.

Any help would be appreciated.

Thanks in advance.
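
For reference, the core sub-problem here is reading just the first n lines of a readable stream without consuming the rest of it. Below is a minimal, stand-alone sketch of that pattern (the helper name and the line limit are hypothetical, not part of the original code); the answers that follow apply the same idea to each zip entry:

// Hypothetical helper: resolve with the first n lines of any readable stream,
// buffering across chunk boundaries, then stop listening for further data.
function readFirstLines(stream, n) {
  return new Promise((resolve, reject) => {
    let buffered = "";
    const onData = (chunk) => {
      buffered += chunk.toString("utf-8");
      const lines = buffered.split("\n");
      if (lines.length > n) {
        stream.removeListener("data", onData);
        stream.pause();
        resolve(lines.slice(0, n));
      }
    };
    stream.on("data", onData);
    stream.once("end", () => resolve(buffered.split("\n").slice(0, n)));
    stream.once("error", reject);
  });
}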

I did something similar using got.

I had to adapt the code, so it is untested, but this general approach worked for me. I have not used etl, so I am not sure how it handles partial chunks of a file rather than the whole file.

const aws = require("aws-sdk");
const s3 = new aws.S3();
const got = require('got');
const etl = require("etl");


const getAllFilesAsUrls = async(bucket_name, folder_name) => {

    const listParams = {
        Bucket: bucket_name,
        Delimiter: '/',
        StartAfter: `${folder_name}/`
    };

    const data = await s3.listObjectsV2(listParams).promise();

    const keys = data.Contents.map(object => object.Key);

    const urlsArray = [];

    for (let key of keys) {

        const params = {Bucket: bucket_name, Key: key};
        let url = await s3.getSignedUrlPromise('getObject', params);
        urlsArray.push(url);

    }

    return urlsArray;


}

const workEachFileUrl = (url) => {

    return new Promise((resolve, reject) => {

        //if you're looking to limit the amount of data transferred then this was a good way of doing it
        const gotstream = got.stream(url, { headers: { "accept-encoding": "gzip" } })
            .on('data', (chunk) => {
                //pause the stream as soon as we get the first chunk
                gotstream.pause();
                //do your work with the chunk; as long as etl can handle partial files, resolve with the first few lines,
                //otherwise just use the 'response' event as before
                const parsedChunk = chunk.toString('utf-8').split('\n').slice(0, 3); //placeholder: keep the first few lines
                resolve(parsedChunk);
            })
            .on("error", (err) => {    
                console.log(err);
                reject(err);
            });
    });

}

const runOperation = async (bucket_name, folder_name) => {

    const records = [];

    const urls = await getAllFilesAsUrls(bucket_name, folder_name);

    for (let url of urls) {

        let record = await workEachFileUrl(url);
        records.push(record);

    }

    return records;

}

(async () => {
    const completedRecords = await runOperation(bucket_name, folder_name);
    console.log(completedRecords);
})();
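
As a side note on the "limit the amount of data transferred" comment above: if the objects behind the signed URLs are plain, uncompressed CSV files (an assumption, not something stated in this answer), an HTTP Range request is another way to fetch only the beginning of each object instead of pausing a stream:

const got = require('got');

// Sketch: request only the first 64 KB of the object via an HTTP Range header
// (S3 honours Range on GetObject), then keep the first n lines of the body.
async function fetchFirstLines(url, n) {
    const response = await got(url, { headers: { Range: 'bytes=0-65535' } });
    return response.body.split('\n').slice(0, n);
}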

I have tried to reproduce a similar case. I think this is the kind of thing you need:

const etl = require("etl");
const unzip = require("unzip-stream");
const fs = require('fs');

function readRecord(entry, entNum) {

    let recordCount = 0;
    let etlcsv = entry.pipe(etl.csv())
    etlcsv.pipe(etl.map(d => {
        console.log(d);
        recordCount++;
        if (recordCount > 2) {
            // stop parsing this entry and discard the rest of it,
            // so the unzip stream can move on to the next file
            etlcsv.destroy()
            entry.autodrain();
        }
    }))
}

function setupMetadata() {
    let entryCount = 0;

    let test = fs.createReadStream('csv.zip').pipe(unzip.Parse())
    test.on('entry', function(entry) {
        entryCount++;
        console.log(entryCount)
        readRecord(entry, entryCount)
    })

}

setupMetadata()

Check this REPL for testing: https://repl.it/@sandeepp2016/PlushFatherlyMonotone
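
Putting the two answers together for the original S3 case, here is an untested sketch (using the same placeholder bucket and key as the question) that streams the zip, reads only the first few lines of each entry with Node's built-in readline module, and then autodrains the rest of the entry so the parser can move on to the next file:

const aws = require("aws-sdk");
const s3 = new aws.S3();
const unzip = require("unzip-stream");
const readline = require("readline");

const MAX_LINES = 3; // read only the first n lines of each file

function setupMetadata() {
  s3.getObject({ Bucket: "test", Key: "CSV.zip" }).createReadStream()
    .pipe(unzip.Parse())
    .on("entry", (entry) => {
      let lineCount = 0;
      const rl = readline.createInterface({ input: entry });
      rl.on("line", (line) => {
        if (lineCount >= MAX_LINES) return; // ignore lines readline had already buffered
        console.log(entry.path, line);
        if (++lineCount >= MAX_LINES) {
          rl.close();          // stop emitting further lines
          entry.autodrain();   // discard the rest so the next entry can be parsed
        }
      });
    });
}

setupMetadata();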