Azure Blob Storage / NodeJS - 从缓冲区读取 Avro 文件

Azure Blob Storage / NodeJS - Read Avro File from Buffer

我已经使用捕获功能将事件中心输出到 Blob 存储 - 这会将我们作为 .avro 文件放入队列的任何内容输出。

如果我下载此文件并尝试使用像 avro-js 这样的库来解析它,我没有任何问题 - 我可以很好地读取文件并按我认为合适的方式处理内容。

但是 - 在使用 Node 处理 Azure Blob 存储时,我想将文件作为下载文件进行处理。读取文件时返回的格式是缓冲区,但我找不到使用库成功解析此文件的方法(找不到正确的方法,如果有的话)。

用于从 Azure 下载 blob 的代码,省略了几位:

const { BlobServiceClient } = require('@azure/storage-blob');
const blobServiceClient = BlobServiceClient.fromConnectionString(AZURE_STORAGE_CONNECTION_STRING);
const containerClient = blobServiceClient.getContainerClient("data");

const blockBlobClient = containerClient.getBlockBlobClient(blob.name);
    
const downloadBlockBlobResponse = await blockBlobClient.download(0);

输出到控制台时的缓冲区片段:

<Buffer 4f 62 6a 01 04 14 61 76 72 6f 2e 63 6f 64 65 63 08 6e 75 6c 6c 16 61 76 72 6f 2e 73 63 68 65 6d 61 ec 06 7b 22 74 79 70 65 22 3a 22 72 65 63 6f 72 64 ... 589 more bytes>

转换为字符串时的内容(粘贴图片时乱码输出不正常):

已尝试将 .avro 文件读取为纯文本,虽然它们大部分没问题,但有一些字符是乱码,因此它不会读出 JSON(而且我不'想对内容做出假设以尝试拉出邮件正文)。

有没有人基于缓冲区成功地从 Azure 中提取 .avro 内容? 我在网上看到很多关于将这些文件加载​​到 Spark 或 Kafka 中的指南,但不仅仅是读取流中的文件。

谢谢!

关于这个问题,我们可以使用包avsc来解析带有buffer的avro文件。详情请参考here.

例如

const avro = require("avsc");
const {
  BlobServiceClient,
  StorageSharedKeyCredential,
} = require("@azure/storage-blob");

const accountName = "";
const accountKey =
  "";
async function main() {
  const creds = new StorageSharedKeyCredential(accountName, accountKey);
  const blobServiceClient = new BlobServiceClient(
    `https://${accountName}.blob.core.windows.net`,
    creds
  );
  const containerClient = blobServiceClient.getContainerClient("");
  const blockBlobClient = containerClient.getBlockBlobClient(
    ""
  );

  const downloadBlockBlobResponse = await blockBlobClient.download(0);
  const buf = await streamToBuffer(
    downloadBlockBlobResponse.readableStreamBody
  );

  const decoder = new avro.streams.BlockDecoder({
    parseHook: (schema) => {
      console.log("the avro file schema:");
      console.log(schema);
      return avro.Type.forSchema(schema, { wrapUnions: true });
    },
  });

  decoder.on("data", (data) => {
    console.log(data);
  });
  decoder.end(buf);
}
async function streamToBuffer(readableStream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readableStream.on("data", (data) => {
      chunks.push(data instanceof Buffer ? data : Buffer.from(data));
    });
    readableStream.on("end", () => {
      resolve(Buffer.concat(chunks));
    });
    readableStream.on("error", reject);
  });
}
main();