在 azure 函数中处理 1GB 大小的管道分隔文本文件的最佳方法
Best way to process a 1GB size pipe separated text file in an azure function
我在 blob 存储中有一个 1GB 的文本文件,我目前正在构建一个函数,它将获取每一行的内容并将它们发送到外部 API。这个外部 API 限制为每秒 200 个请求,而且由于我的订阅计划,我的功能也被限制在十分钟的运行时间。
我正在考虑使用持久函数通过分块读取文件来处理此用例。我已经编写了以下代码来测试将代码读入卡盘。
module.exports = async function (context, myTimer) {
context.log("Trigger fired");
if (myTimer.isPastDue)
{
context.log('JavaScript is running late!');
}
const containerClient = getContainerClient(process.env.AzureWebJobsStorage, 'location');
for await (const blob of containerClient.listBlobsFlat()) {
if(blob.name !== 'test.txt') {
continue
}
const blobClient = containerClient.getBlobClient(blob.name);
const downloadBlockBlobResponse = await blobClient.download(0, (1024 * 1024));
try{
const blobContent = (
await streamToBuffer(downloadBlockBlobResponse.readableStreamBody)
).toString();
context.log(blobContent);
}
catch (error) {
context.log(`ERROR: issues reading the following file - ${blob.name}, due to the following error : ${error.message}`);
}
}
context.log("Trigger completed");
};
async function streamToBuffer(readableStream) {
return new Promise((resolve, reject) => {
const chunks = [];
readableStream.on("data", (data) => {
chunks.push(data instanceof Buffer ? data : Buffer.from(data));
});
readableStream.on("end", () => {
resolve(Buffer.concat(chunks));
});
readableStream.on("error", reject);
});
}
但是,当我读取单个 MB 的文本文件时,块在一行的中间而不是末尾结束,这意味着我无法将最后一行发送到 API。
有人知道我如何保证数据块始终包含完整行吗?或者有更好的方法来处理 Azure 中的这个用例吗?
文件内容将如下所示
Test|TestTwo|test@test.com|美国|纽约|1234|主要|街道|12347|711|1973-09-09
除非您事先知道每行的行长(例如每行恰好 128 字节长或类似长度),否则无法始终读取到完美的行边界。
相反,您必须连续少量阅读,直到读到一行的末尾,然后在临时存储中标记下一次继续阅读的位置,以继续阅读下一行。
例如,如果典型的一行有 100 个字节长,而您以部分行结尾(您几乎总是会这样),然后再读取 250 个字节左右,直到您找到该行的结尾当前。然后,计算该行结束的文件位置并将其存储以供下一次通过。
我在 blob 存储中有一个 1GB 的文本文件,我目前正在构建一个函数,它将获取每一行的内容并将它们发送到外部 API。这个外部 API 限制为每秒 200 个请求,而且由于我的订阅计划,我的功能也被限制在十分钟的运行时间。
我正在考虑使用持久函数通过分块读取文件来处理此用例。我已经编写了以下代码来测试将代码读入卡盘。
module.exports = async function (context, myTimer) {
context.log("Trigger fired");
if (myTimer.isPastDue)
{
context.log('JavaScript is running late!');
}
const containerClient = getContainerClient(process.env.AzureWebJobsStorage, 'location');
for await (const blob of containerClient.listBlobsFlat()) {
if(blob.name !== 'test.txt') {
continue
}
const blobClient = containerClient.getBlobClient(blob.name);
const downloadBlockBlobResponse = await blobClient.download(0, (1024 * 1024));
try{
const blobContent = (
await streamToBuffer(downloadBlockBlobResponse.readableStreamBody)
).toString();
context.log(blobContent);
}
catch (error) {
context.log(`ERROR: issues reading the following file - ${blob.name}, due to the following error : ${error.message}`);
}
}
context.log("Trigger completed");
};
async function streamToBuffer(readableStream) {
return new Promise((resolve, reject) => {
const chunks = [];
readableStream.on("data", (data) => {
chunks.push(data instanceof Buffer ? data : Buffer.from(data));
});
readableStream.on("end", () => {
resolve(Buffer.concat(chunks));
});
readableStream.on("error", reject);
});
}
但是,当我读取单个 MB 的文本文件时,块在一行的中间而不是末尾结束,这意味着我无法将最后一行发送到 API。
有人知道我如何保证数据块始终包含完整行吗?或者有更好的方法来处理 Azure 中的这个用例吗?
文件内容将如下所示
Test|TestTwo|test@test.com|美国|纽约|1234|主要|街道|12347|711|1973-09-09
除非您事先知道每行的行长(例如每行恰好 128 字节长或类似长度),否则无法始终读取到完美的行边界。
相反,您必须连续少量阅读,直到读到一行的末尾,然后在临时存储中标记下一次继续阅读的位置,以继续阅读下一行。
例如,如果典型的一行有 100 个字节长,而您以部分行结尾(您几乎总是会这样),然后再读取 250 个字节左右,直到您找到该行的结尾当前。然后,计算该行结束的文件位置并将其存储以供下一次通过。