Azure EventHubs 抛出异常:端点的至少一个接收器是用纪元“0”创建的,因此不允许使用非纪元接收器

Azure EventHubs throws Exception: At least one receiver for the endpoint is created with epoch of '0', and so non-epoch receiver is not allowed

简介

大家好,我们目前正在开发一个微服务平台,该平台使用 Azure EventHub 和事件在服务之间发送数据。 让我们直接命名这些服务:CustomerService、OrderService 和 MobileBFF。

CustomerService 主要发送更新(带有事件),然后由 OrderService 和 MobileBFF 存储这些更新,以便能够响应查询而无需为此数据调用 CustomerService。

所有这 3 项服务 + 我们在 DEV 环境中的开发人员使用相同的 ConsumerGroup 连接到这些事件中心。

我们目前只使用 1 个分区,但计划以后扩展到多个。 (你可以看到我们的代码已经可以从多个分区读取)

异常

不过,我们时不时地 运行 会遇到异常(如果它启动,通常会持续抛出此错误一个小时左右)。不过目前我们只在 DEV/TEST 环境中看到此错误。

异常:

Azure.Messaging.EventHubs.EventHubsException(ConsumerDisconnected): At least one receiver for the endpoint is created with epoch of '0', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected.

EventHub 的所有消费者,将他们的 SequenceNumber 存储在他们自己的数据库中。这允许我们让每个消费者单独使用事件,并将最后处理的 SequenceNumber 存储在它自己的 SQL 数据库中。当服务(重新)启动时,它从数据库加载 SequenceNumber,然后从这里开始请求事件,直到找不到更多事件。然后它会休眠 100 毫秒,然后重试。这是(稍微简化的)代码:

var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
string[] allPartitions = null;
await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
{
    allPartitions = await consumer.GetPartitionIdsAsync(stoppingToken);
}

var allTasks = new List<Task>();

foreach (var partitionId in allPartitions)
{
    //This is required if you reuse variables inside a Task.Run();
    var partitionIdInternal = partitionId;

    allTasks.Add(Task.Run(async () =>
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
                {
                    EventPosition startingPosition;
                    using (var testScope = _serviceProvider.CreateScope())
                    {
                        var messageProcessor = testScope.ServiceProvider.GetService<EventHubInboxManager<T, EH>>();
                        //Obtains starting position from the database or sets to "Earliest" or "Latest" based on configuration
                        startingPosition = await messageProcessor.GetStartingPosition(_inboxOptions.InboxIdentifier, partitionIdInternal);
                    }

                    while (!stoppingToken.IsCancellationRequested)
                    {
                        bool processedSomething = false;
                        await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync(partitionIdInternal, startingPosition, stoppingToken))
                        {
                            processedSomething = true;

                            startingPosition = await messageProcessor.Handle(partitionEvent);
                        }

                        if (processedSomething == false)
                        {
                            await Task.Delay(100, stoppingToken);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //Log error / delay / retry
            }

        }
    }
}

以下行抛出异常:

await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))

更多调查

上述代码是 运行 微服务(在 Azure 中作为 AppService 托管)

除此之外,我们还有 运行 1 个 Azure 函数,它也从 EventHub 读取事件。 (可能使用相同的消费者组)。

根据此处的文档:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups 每个消费者组应该可以有 5 个消费者。似乎只建议有一个,但我们不清楚如果我们不遵循这个指导会发生什么。

我们确实通过手动生成读取事件的服务的多个实例进行了一些测试,当超过 5 个时,这会导致不同的错误,该错误非常清楚地表明每个消费者组每个分区只能有 5 个消费者(或类似的东西)。

此外,似乎(我们不是 100% 确定)当我们重写代码(以上)以便能够为每个分区生成一个线程时,这个问题就开始发生了。 (即使我们在 EventHub 中只有 1 个分区)。 编辑:我们进行了更多的日志挖掘,并且在合并代码以每个分区生成一个线程之前还发现了一些异常。

该异常表明有另一个消费者被配置为使用相同的消费者组并断言对分区的独占访问。除非您在客户端选项中明确设置 OwnerLevel 属性,否则可能的候选者是至少有一个 EventProcessorClient 运行.

要补救,您可以:

  • 针对相同的事件中心和消费者组组合停止任何事件处理器 运行,并确保没有其他消费者明确设置 OwnerLevel

  • 运行这些消费者在一个专门的消费者组中;这将允许他们与独家消费者共存 and/or 事件处理器。

  • 为这些消费者显式地将 OwnerLevel 设置为 1 或更大;这将声明所有权并强制同一消费者组中的任何其他消费者断开连接。
    (注意:根据其他消费者的情况,您可能需要在此处测试不同的值。事件处理器类型使用 0,因此高于该值的任何值都将优先。)

添加到 Jesse's 答案中,我认为异常消息是 旧的 SDK。
如果你查看 docs,那里定义了 3 种类型的接收模式:

  1. 纪元

Epoch is a unique identifier (epoch value) that the service uses, to enforce partition/lease ownership. The epoch feature provides users the ability to ensure that there is only one receiver on a consumer group at any point in time...

  1. 非纪元:

... There are some scenarios in stream processing where users would like to create multiple receivers on a single consumer group. To support such scenarios, we do have ability to create a receiver without epoch and in this case we allow upto 5 concurrent receivers on the consumer group.

  1. 混合:

... If there is a receiver already created with epoch e1 and is actively receiving events and a new receiver is created with no epoch, the creation of new receiver will fail. Epoch receivers always take precedence in the system.