超过了每个分区允许的最大接收器数 eventhub 中的错误

Exceeded the maximum number of allowed receivers per partition Error in eventhub

像下面的代码一样,我不断地从 azure eventhub 接收数据。我经常看到错误说 "Exceeded the maximum number of allowed receivers per partition" 我知道它来自哪里。

import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset

logger = logging.getLogger("azure")

ADDRESS = ""
USER = ""
KEY = ""

CONSUMER_GROUP = "$default"
OFFSET = Offset("@latest")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET)    
client.run()
start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

这里的行是添加接收器的那一行,如果我添加超过五个接收器,它会达到每个分区可能的接收器数量的限制。

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0
  1. 所以,我尝试删除带有函数的接收器。 比如使用receiver.client.clients.removereceiver.client.clients.clear(),清除之前添加的接收者。但是,这些方法中的 none 似乎有效。

我看到这个错误的原因是因为我 运行 上面的整个代码每当我需要停止 运行ning 脚本进行调试时,所以每当我重新 运行它,我必须 运行 行 receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0

  1. 我也试过 运行 只有 "add_receiver" 行下面的部分代码
client.run()
start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

但是,我看到另一个错误说 EventHubError: This receive handler is now closed.

有什么可能的方法来解决这个问题?

经过挣扎,我认为这可能是解决这个问题的方法。

对于上面的第 22 行,我可以简单地向其添加 'keep_alive' 输入,例如:

receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET, keep_alive = 10000000)  

通过这种方式,我可以让客户端接收器保持打开状态而不会收到错误消息 "EventHubError: This receive handler is now closed.",然后只需 运行 下面唯一的部分:

start_time = time.time()
while True:
    for event_data in receiver.receive(timeout=5000):
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        a = event_data.body_as_str(encoding='UTF-8')
        total += 1
    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))