通过请求关闭 EventHubConsumerClient

Close EventHubConsumerClient with a request

我有这个代码:

async def run_events_listener():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(os.getenv('CHECKPOINT_STORE_URI'), os.getenv('CHECKPOINT_CONTAINER'))

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        os.getenv('EVENTS_HUB_URI'), consumer_group="$Default",
        eventhub_name=os.getenv('EVENTHUB_NAME'), checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

为了实现优雅停止,我必须正确关闭EventHubConsumerClient。 run_events_listener 是一个 asyncio.Task。我该如何关闭它?

  • 消费者负责有效的资源管理,目的是在闲置期间保持较低的资源使用率,并在增加使用期间管理健康状况。

  • 当应用程序关闭时,调用 CloseAsync(CancellationToken)DisposeAsync() 方法将确保网络资源和其他非托管对象被适当清除。

  • CloseAsync(CancellationToken) 关闭消费者。

  • DisposeAsync()完成清理EventHubConsumerClient资源的工作,包括确保客户端已经关闭。

  • IsClosed specifies这个EventHubConsumerClient是否已经关闭。

调用者无需明确执行任何操作即可彻底关闭。消费者通过 try/finally 块在内部管理 startup/shutdown 步骤。

终止您的流程或取消 asyncio 任务足以清理一切。