在 azure 函数中处理 1GB 大小的管道分隔文本文件的最佳方法

Best way to process a 1GB size pipe separated text file in an azure function

我在 blob 存储中有一个 1GB 的文本文件,我目前正在构建一个函数,它将获取每一行的内容并将它们发送到外部 API。这个外部 API 限制为每秒 200 个请求,而且由于我的订阅计划,我的功能也被限制在十分钟的运行时间。

我正在考虑使用持久函数通过分块读取文件来处理此用例。我已经编写了以下代码来测试将代码读入卡盘。

module.exports = async function (context, myTimer) {
    context.log("Trigger fired");
    
    if (myTimer.isPastDue)
    {
        context.log('JavaScript is running late!');
    }

    const containerClient = getContainerClient(process.env.AzureWebJobsStorage, 'location');
    for await (const blob of containerClient.listBlobsFlat()) {
        if(blob.name !== 'test.txt') {
            continue
        }

        const blobClient = containerClient.getBlobClient(blob.name);
        const downloadBlockBlobResponse = await blobClient.download(0, (1024 * 1024));
        try{
            const blobContent = (
              await streamToBuffer(downloadBlockBlobResponse.readableStreamBody)
            ).toString();
            context.log(blobContent);
        }
        catch (error) {
            context.log(`ERROR: issues reading the following file - ${blob.name}, due to the following error : ${error.message}`);
        } 
    }
    context.log("Trigger completed");
};

async function streamToBuffer(readableStream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readableStream.on("data", (data) => {
      chunks.push(data instanceof Buffer ? data : Buffer.from(data));
    });
    readableStream.on("end", () => {
      resolve(Buffer.concat(chunks));
    });
    readableStream.on("error", reject);
  });
}

但是,当我读取单个 MB 的文本文件时,块在一行的中间而不是末尾结束,这意味着我无法将最后一行发送到 API。

有人知道我如何保证数据块始终包含完整行吗?或者有更好的方法来处理 Azure 中的这个用例吗?

文件内容将如下所示

Test|TestTwo|test@test.com|美国|纽约|1234|主要|街道|12347|711|1973-09-09

除非您事先知道每行的行长(例如每行恰好 128 字节长或类似长度),否则无法始终读取到完美的行边界。

相反,您必须连续少量阅读,直到读到一行的末尾,然后在临时存储中标记下一次继续阅读的位置,以继续阅读下一行。

例如,如果典型的一行有 100 个字节长,而您以部分行结尾(您几乎总是会这样),然后再读取 250 个字节左右,直到您找到该行的结尾当前。然后,计算该行结束的文件位置并将其存储以供下一次通过。