如何解析事件中心存储帐户中捕获的 AVRO blob?
How to parse AVRO blobs captured in a storage account of Event Hub?
在 Microsoft Azure 中,我们有一个 Event Hub 捕获 JSON 数据并将其以 AVRO 格式存储在 blob 存储帐户中:
我写了一个 python 脚本,它会从事件中心获取 AVRO 文件:
import os, avro
from io import BytesIO
from operator import itemgetter, attrgetter
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
conn_str = 'DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net'
container_name = 'container1'
blob_service_client = BlobServiceClient.from_connection_string(conn_str)
container_client = blob_service_client.get_container_client(container_name)
blob_list = []
for blob in container_client.list_blobs():
if blob.name.endswith('.avro'):
blob_list.append(blob)
blob_list.sort(key=attrgetter('creation_time'), reverse=True)
效果很好,我得到了一个按创建时间排序的 AVRO blob 列表。
现在我正在尝试添加最后的步骤,我将在其中下载 blob,parse the AVRO-formatted data 并检索 JSON 有效负载。
我尝试将列表中的每个 blob 检索到内存缓冲区并解析它:
for blob in blob_list:
blob_client = container_client.get_blob_client(blob.name)
downloader = blob_client.download_blob()
stream = BytesIO()
downloader.download_to_stream(stream) # also tried readinto(stream)
reader = DataFileReader(stream, DatumReader())
for event_data in reader:
print(event_data)
reader.close()
不幸的是,上面的Python代码不起作用,没有打印任何东西。
我也看到了,有一个StorageStreamDownloader.readall()
方法,但是我不确定,如何应用它。
我正在使用通过 pip 安装的 Windows 10、python 3.8.5 和 avro 1.10.0。
使用readall()
方法时,应按如下方式使用:
with open("xxx", "wb+") as my_file:
my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
更详细的读取抓取的eventhub数据,可以参考官方文档:Create a Python script to read your Capture files.
如果您还有其他问题,请告诉我:)。
在 Microsoft Azure 中,我们有一个 Event Hub 捕获 JSON 数据并将其以 AVRO 格式存储在 blob 存储帐户中:
我写了一个 python 脚本,它会从事件中心获取 AVRO 文件:
import os, avro
from io import BytesIO
from operator import itemgetter, attrgetter
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
conn_str = 'DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net'
container_name = 'container1'
blob_service_client = BlobServiceClient.from_connection_string(conn_str)
container_client = blob_service_client.get_container_client(container_name)
blob_list = []
for blob in container_client.list_blobs():
if blob.name.endswith('.avro'):
blob_list.append(blob)
blob_list.sort(key=attrgetter('creation_time'), reverse=True)
效果很好,我得到了一个按创建时间排序的 AVRO blob 列表。
现在我正在尝试添加最后的步骤,我将在其中下载 blob,parse the AVRO-formatted data 并检索 JSON 有效负载。
我尝试将列表中的每个 blob 检索到内存缓冲区并解析它:
for blob in blob_list:
blob_client = container_client.get_blob_client(blob.name)
downloader = blob_client.download_blob()
stream = BytesIO()
downloader.download_to_stream(stream) # also tried readinto(stream)
reader = DataFileReader(stream, DatumReader())
for event_data in reader:
print(event_data)
reader.close()
不幸的是,上面的Python代码不起作用,没有打印任何东西。
我也看到了,有一个StorageStreamDownloader.readall()
方法,但是我不确定,如何应用它。
我正在使用通过 pip 安装的 Windows 10、python 3.8.5 和 avro 1.10.0。
使用readall()
方法时,应按如下方式使用:
with open("xxx", "wb+") as my_file:
my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
更详细的读取抓取的eventhub数据,可以参考官方文档:Create a Python script to read your Capture files.
如果您还有其他问题,请告诉我:)。