EventHub 和接收

EventHub and Receive

全部,

我稍微修改了 python 示例 Azure EventHub 脚本,但是当我 运行 它进入一个循环,一遍又一遍地获取相同的事件。我没有向 eventhub 发送任何事件,因为我想阅读那里的内容,但我在这里没有看到 while 循环,所以这是怎么发生的,在它读取 EventHub 中当前的所有事件后我该如何停止?

谢谢
grajee

# https://docs.microsoft.com/en-us/python/api/overview/azure/eventhub-readme?view=azure-python#consume-events-from-an-event-hub
import logging
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://testhubns01.servicebus.windows.net/;SharedAccessKeyName=getevents;SharedAccessKey=testtestest='
consumer_group = '$Default'
eventhub_name = 'testpart'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition: \"{}\"   :  \"{}\"" .format(partition_context.partition_id,event.body_as_str()))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

client.close()

下面一段来自 here 的代码更清楚了。

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        **checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing**
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())