事件中心检查点数据未保存
Event Hub Checkpoint Data is Not Saved
我正在 运行 宁事件中心接收器实现来自:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send#create-a-python-script-to-receive-events 除了连接字符串等我没有改变任何东西
创建数百个事件后,我可以看到接收方在存储帐户中创建了检查点文件夹,但是当我再次 运行 接收方时,我看到它处理相同的事件。
每个分区创建的文件也是空的。
consumerClient 中提供存储:
checkpoint_store = BlobCheckpointStore.from_connection_string("...", "eventhubcontainer")
client = EventHubConsumerClient.from_connection_string("...", consumer_group="$Default", eventhub_name="eventhub1", checkpoint_store=checkpoint_store)
还有读取事件后保存检查点的方法:
await partition_context.update_checkpoint(event)
我是不是遗漏了什么?
全部代码:
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
await partition_context.update_checkpoint(event)
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
async with client:
await client.receive(on_event=on_event, starting_position="-1")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
更新SDK解决了问题。另外@AdamLing 在评论中为我阐明了元数据位置。
这可能是旧 SDK 中的错误。
请尝试安装最新版sdk:azure-eventhub 5.3.1 and azure-eventhub-checkpointstoreblob-aio 1.1.3.
我用这些最新的 sdk 测试了你的代码,它工作正常。
我正在 运行 宁事件中心接收器实现来自:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send#create-a-python-script-to-receive-events 除了连接字符串等我没有改变任何东西
创建数百个事件后,我可以看到接收方在存储帐户中创建了检查点文件夹,但是当我再次 运行 接收方时,我看到它处理相同的事件。
每个分区创建的文件也是空的。
consumerClient 中提供存储:
checkpoint_store = BlobCheckpointStore.from_connection_string("...", "eventhubcontainer")
client = EventHubConsumerClient.from_connection_string("...", consumer_group="$Default", eventhub_name="eventhub1", checkpoint_store=checkpoint_store)
还有读取事件后保存检查点的方法:
await partition_context.update_checkpoint(event)
我是不是遗漏了什么?
全部代码:
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
await partition_context.update_checkpoint(event)
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
async with client:
await client.receive(on_event=on_event, starting_position="-1")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
更新SDK解决了问题。另外@AdamLing 在评论中为我阐明了元数据位置。
这可能是旧 SDK 中的错误。
请尝试安装最新版sdk:azure-eventhub 5.3.1 and azure-eventhub-checkpointstoreblob-aio 1.1.3.
我用这些最新的 sdk 测试了你的代码,它工作正常。