EventHub 和接收
EventHub and Receive
全部,
我稍微修改了 python 示例 Azure EventHub 脚本,但是当我 运行 它进入一个循环,一遍又一遍地获取相同的事件。我没有向 eventhub 发送任何事件,因为我想阅读那里的内容,但我在这里没有看到 while 循环,所以这是怎么发生的,在它读取 EventHub 中当前的所有事件后我该如何停止?
谢谢
grajee
# https://docs.microsoft.com/en-us/python/api/overview/azure/eventhub-readme?view=azure-python#consume-events-from-an-event-hub
import logging
from azure.eventhub import EventHubConsumerClient
connection_str = 'Endpoint=sb://testhubns01.servicebus.windows.net/;SharedAccessKeyName=getevents;SharedAccessKey=testtestest='
consumer_group = '$Default'
eventhub_name = 'testpart'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event(partition_context, event):
logger.info("Received event from partition: \"{}\" : \"{}\"" .format(partition_context.partition_id,event.body_as_str()))
partition_context.update_checkpoint(event)
with client:
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive(on_event=on_event, partition_id='0')
client.close()
下面一段来自 here 的代码更清楚了。
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'
async def on_event(partition_context, event):
# do something
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def receive(client):
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
**checkpoint_store=checkpoint_store, # For load balancing and checkpoint. Leave None for no load balancing**
)
async with client:
await receive(client)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
全部,
我稍微修改了 python 示例 Azure EventHub 脚本,但是当我 运行 它进入一个循环,一遍又一遍地获取相同的事件。我没有向 eventhub 发送任何事件,因为我想阅读那里的内容,但我在这里没有看到 while 循环,所以这是怎么发生的,在它读取 EventHub 中当前的所有事件后我该如何停止?
谢谢
grajee
# https://docs.microsoft.com/en-us/python/api/overview/azure/eventhub-readme?view=azure-python#consume-events-from-an-event-hub
import logging
from azure.eventhub import EventHubConsumerClient
connection_str = 'Endpoint=sb://testhubns01.servicebus.windows.net/;SharedAccessKeyName=getevents;SharedAccessKey=testtestest='
consumer_group = '$Default'
eventhub_name = 'testpart'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event(partition_context, event):
logger.info("Received event from partition: \"{}\" : \"{}\"" .format(partition_context.partition_id,event.body_as_str()))
partition_context.update_checkpoint(event)
with client:
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive(on_event=on_event, partition_id='0')
client.close()
下面一段来自 here 的代码更清楚了。
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'
async def on_event(partition_context, event):
# do something
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def receive(client):
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
**checkpoint_store=checkpoint_store, # For load balancing and checkpoint. Leave None for no load balancing**
)
async with client:
await receive(client)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())