Nodejs | unzip and read only top n lines of files
I have a zip in S3 that contains hundreds of CSV files. I'm trying to stream the files, and I only need to read the first n lines of each file. I can unzip and read the contents, but I'm not sure how to stop the stream after reading n lines and then continue processing the rest of the files.

Code tried so far:
const aws = require("aws-sdk");
const s3 = new aws.S3();
const etl = require("etl");
const unzip = require("unzip-stream");

function setupMetadata() {
  s3.getObject({ Bucket: 'test', Key: 'CSV.zip' }).createReadStream()
    .pipe(unzip.Parse())
    .on('entry', function (entry) {
      var i = 0;
      var recordIdentifier;
      entry
        .pipe(etl.map(res => {
          if (recordIdentifier) {
            console.log(recordIdentifier);
            console.log(i++);
            // Not sure about this. This works, but only for the 1st file;
            // after that the program terminates. I need to do this for all
            // the files in the zip.
            entry.destroy();
          }
          const data = res.toString("utf-8");
          var array = data.toString().split("\n");
          if (array.length >= 3) {
            recordIdentifier = array[2].split(",")[0];
          }
        }))
    })
}

setupMetadata();
I've tried calling entry.autodrain() after reading the content, but it doesn't work. entry.destroy() works, but the program terminates after that; I want to do the same thing for all the files in the zip.

Any help would be greatly appreciated. Thanks in advance.
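To make the intent concrete, this is the kind of "read the first n lines, then stop" behaviour I'm after for a single stream (illustrative sketch using Node's readline; "sample.csv" is just a placeholder file name, and in my real case a zip entry would take the place of the file stream):

const fs = require("fs");
const readline = require("readline");

// Read at most the first `n` lines of a readable stream, then stop consuming it.
function firstLines(stream, n) {
  return new Promise((resolve) => {
    const lines = [];
    const rl = readline.createInterface({ input: stream });
    rl.on("line", (line) => {
      lines.push(line);
      if (lines.length >= n) rl.close(); // stop emitting further lines
    });
    rl.on("close", () => resolve(lines)); // also resolves if the stream has fewer than n lines
  });
}

// "sample.csv" is a placeholder for illustration only.
firstLines(fs.createReadStream("sample.csv"), 3)
  .then(lines => console.log(lines));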
I did something similar with got.

I had to adapt the code, so it's untested, but this general approach worked for me. I haven't used etl, so I'm not sure how it handles chunks of a file rather than whole files.
const aws = require("aws-sdk");
const s3 = new aws.S3();
const got = require('got');
const etl = require("etl");

const getAllFilesAsUrls = async (bucket_name, folder_name) => {
  const listParams = {
    Bucket: bucket_name,
    Delimiter: '/',
    StartAfter: `${folder_name}/`
  };
  const data = await s3.listObjectsV2(listParams).promise();
  const keys = data.Contents.map(object => object.Key);
  const urlsArray = [];
  for (let key of keys) {
    const params = { Bucket: bucket_name, Key: key };
    // getSignedUrlPromise returns the pre-signed URL as a promise
    let url = await s3.getSignedUrlPromise('getObject', params);
    urlsArray.push(url);
  }
  return urlsArray;
}

const workEachFileUrl = (url) => {
  return new Promise((resolve, reject) => {
    // if you're looking to limit the amount of data transferred then this was a good way of doing it
    const gotstream = got.stream(url, { headers: { "accept-encoding": "gzip" } })
      .on('data', (chunk) => {
        // pause the stream as soon as we get the first chunk
        gotstream.pause();
        // do your work with the chunk; as long as etl can handle partial files,
        // resolve with the first few lines, otherwise just use the 'response'
        // event as you were. Resolving the raw chunk here as a placeholder.
        resolve(chunk);
      })
      .on("error", (err) => {
        console.log(err);
        reject(err);
      });
  });
}

const runOperation = async (bucket_name, folder_name) => {
  const records = [];
  const urls = await getAllFilesAsUrls(bucket_name, folder_name);
  for (let url of urls) {
    let record = await workEachFileUrl(url);
    records.push(record);
  }
  return records;
}

const completedRecords = await runOperation(bucket_name, folder_name);
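One note on that last line: top-level await is only valid inside an ES module, so in a plain CommonJS script the call would need to be wrapped in an async function, for example (the bucket and folder names here are placeholders):

(async () => {
  // placeholder bucket/folder names; substitute your own
  const completedRecords = await runOperation("my-bucket", "my-folder");
  console.log(`fetched ${completedRecords.length} records`);
})().catch(console.error);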
I tried to reproduce a similar case. I think you need something like this:
const etl = require("etl");
const unzip = require("unzip-stream");
const fs = require('fs');

function readRecord(entry, entNum) {
  let recordCount = 0;
  let etlcsv = entry.pipe(etl.csv())
  etlcsv.pipe(etl.map(d => {
    console.log(d);
    recordCount++;
    if (recordCount > 2) {
      etlcsv.destroy()
      entry.autodrain();
    }
  }))
}

function setupMetadata() {
  let entryCount = 0;
  let ent = {}
  let test = fs.createReadStream('csv.zip').pipe(unzip.Parse())
  test.on('entry', function(entry) {
    entryCount++;
    console.log(entryCount)
    readRecord(entry, entryCount)
  })
}

setupMetadata()
Check this REPL for testing: https://repl.it/@sandeepp2016/PlushFatherlyMonotone
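For the original S3 case, the same pattern should carry over by swapping the local file stream for the S3 object stream. A rough sketch, reusing the 'test' bucket and 'CSV.zip' key from the question and assuming entry.autodrain() behaves the same way as in the REPL above:

const aws = require("aws-sdk");
const etl = require("etl");
const unzip = require("unzip-stream");
const s3 = new aws.S3();

const N = 2; // number of records to keep per file

function readRecord(entry) {
  let recordCount = 0;
  const etlcsv = entry.pipe(etl.csv());
  etlcsv.pipe(etl.map(d => {
    console.log(entry.path, d);
    recordCount++;
    if (recordCount >= N) {
      etlcsv.destroy();   // stop parsing this file
      entry.autodrain();  // discard the rest of the entry so the next one can start
    }
  }));
}

s3.getObject({ Bucket: 'test', Key: 'CSV.zip' }).createReadStream()
  .pipe(unzip.Parse())
  .on('entry', readRecord);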