Azure Eventhubs (Python):使用 blob 存储检查点 - 启用检查点时 EventProcessor 中的键错误问题

Azure Eventhubs (Python): checkpointing with blob storage - keyerror issue in EventProcessor when checkpointing is enabled

我在 eventhub 中遇到 blob 存储检查点问题。如果我在获取消费者客户端时没有设置 checkpoint_store,我的应用程序 运行 没问题。每当我尝试设置 checkpoint_store 变量和 运行 我的代码时,它都会抛出以下异常:

EventProcessor instance 'xxxxxxxxxxx' of eventhub <name of my eventhub> consumer group <name of my consumer group>. An error occurred while load-balancing and claiming ownership. The exception is KeyError('ownerid'). Retrying after xxxx seconds

我能找到的唯一 github 条目甚至提到了这种错误是 ,但是问题本身从未得到解决,有问题的人最终使用了不同的库.

我使用的相关库是 azure-eventhub 和 azure-eventhub-checkpointstoreblob-aio

以下是我正在使用的代码的相关片段 (I used this tutorial as a guide):

import asyncio
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub import EventData
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
    await partition_context.update_checkpoint(event)
    #<do stuff with event data>
checkpoint_store = BlobCheckpointStore.from_connection_string(blob_connection_string, container_name)
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=input_eventhub_name, checkpoint_store=checkpoint_store)

async def main():
  async with client:
    await client.receive(
      on_event=on_event,
    )
    print("Terminated.")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

问题似乎只与 blob 存储检查点有关;如果我在创建消费者客户端时注释掉 'checkpoint_store=checkpoint_store' 一切 运行 都没有问题。

与 blob 存储的连接似乎正常,因为我做了一些挖掘,发现在 blob 存储中创建了一些文件夹,'checkpoint' 和 'ownership': blob storage snapshot 后者包含一些在其元数据中带有 'ownerid' 的文件: owner files metadata

即关键肯定存在。我认为正在发生的事情是 EventProcessor 试图获取这些 blob 的所有权元数据,但不知何故未能这样做。如果有人知道如何解决这个问题,我将不胜感激!

这看起来像是从其中一个 blob 中检索“ownerid”的问题。你能帮我测试一下这些场景吗?

  1. 从 blob 容器中删除所有内容并重试。
  2. 如果问题仍然存在,您能否检查每个 blob 是否都有元数据“ownerid”?
  3. 如果问题仍然存在,能否将库 azure-eventhub-checkpointstoreblob-aio 版本 1.1.0 中文件 azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio.py 的第 144 行替换为以下内容并重试?
"owner_id": blob.metadata.get("ownerid"),

根本原因是在启用数据湖(分层命名空间)的 v2 存储 blob 上调用存储 sdk 的 list_blobs 功能时,不仅会获得每个分区 checkpoint/ownership,而且还获取不包含元数据的父 blob 节点。

为了更好地说明这一点,假设我们有以下 blob 结构:

- fullqualifiednamespace (directory)
  - eventhubname (directory)
    - $default (directory)
        - ownership (directory)
          - 0 (blob)
          - 1 (blob)
          ...

在启用了数据湖(分层命名空间)的 v2 存储中,当代码使用前缀时 {<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership 来搜索 blob,{<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/ownership 目录本身也会返回,不包含导致 KeyError 的元数据,当我们试图提取信息时。

checkpointstoreblob sdk 有错误修复版本,请升级到最新版本以查看它是否解决了您的问题。

如果您还有其他问题,请告诉我。

链接:

同步:https://pypi.org/project/azure-eventhub-checkpointstoreblob/1.1.2/

对于异步:https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/1.1.2/

github 问题:https://github.com/Azure/azure-sdk-for-python/issues/13060