Azure Event Hubs : QuotaExceededException : 4999 handles limit

I'm using the Azure Event Hubs library for C# in a Web API application. I get this exception when sending a message:

"Message":"An error has occurred.",
"ExceptionMessage":"Cannot allocate more handles to the current session or connection. The maximum number of handles allowed is 4999. Please free up resources and try again., referenceId: d975f39c71a14ae5915c9adca322e110_G15"
"ExceptionType":"Microsoft.ServiceBus.Messaging.QuotaExceededException"

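I thought that instantiating EventHubProducerClient once and reusing it, instead of creating a new instance on every send (with the IAsyncDisposable pattern), would help, as described here: EventHub Exception :Cannot allocate more handles to the current session or connection. But it didn't.

I suspect there is a more general problem somewhere. Maybe I'm missing something.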

    public class EventHubService : SubscriberBase
    {
        private readonly Action<string> errorHandler;
        private readonly BlobContainerClient blobContainerClient;
        private readonly EventProcessorClient eventProcessorClient;
        private readonly EventHubProducerClient eventHubProducerClient;
        private readonly int eventsToCheckpoint;
        private readonly Timer checkpointTimer;
        private int eventsSinceLastCheckpoint;
        private bool shouldUpdateCheckpoint;

        public EventHubService(EventHubSettings settings, Action<string> errorHandler) : base()
        {
            this.errorHandler = errorHandler;
            eventHubProducerClient = new EventHubProducerClient(settings.ConnectionString, settings.EventHubName);

            if (!String.IsNullOrWhiteSpace(settings.GroupId))
            {
                eventManager = new EventManager();

                blobContainerClient = new BlobContainerClient(settings.StorageConnectionString, settings.BlobContainerName);
                eventProcessorClient = new EventProcessorClient(blobContainerClient, settings.GroupId, settings.ConnectionString, settings.EventHubName);

                eventsToCheckpoint = settings.EventsToUpdateCheckpoint;
                checkpointTimer = new Timer(settings.IntervalToUpdateCheckpoint.TotalMilliseconds);
                checkpointTimer.Elapsed += (sender, eventArgs) => shouldUpdateCheckpoint = true;
            }
        }

        public override void Start()
        {
            eventProcessorClient.ProcessErrorAsync += HandleError;
            eventProcessorClient.ProcessEventAsync += ProcessEventData;
            eventProcessorClient.StartProcessingAsync().Wait();
            checkpointTimer.Start();
        }

        public override async Task Stop()
        {
            try
            {
                checkpointTimer.Stop();
                await eventProcessorClient.StopProcessingAsync();
            }
            finally
            {
                eventProcessorClient.ProcessEventAsync -= ProcessEventData;
                eventProcessorClient.ProcessErrorAsync -= HandleError;
            }
        }

        public override async Task Publish(string topic, JObject message)
        {
            using (EventDataBatch eventBatch = await eventHubProducerClient.CreateBatchAsync())
            {
                var @event = new Event(topic, message);
                string json = @event.ToString(Formatting.None);
                byte[] bytes = Encoding.UTF8.GetBytes(json);
                var eventData = new EventData(bytes);
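                // TryAdd returns false when the event does not fit in the batch;
                // fail loudly instead of silently sending an empty batch.
                if (!eventBatch.TryAdd(eventData))
                {
                    throw new InvalidOperationException("The event is too large to fit in a batch.");
                }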

                await eventHubProducerClient.SendAsync(eventBatch);
            }
        }

        private async Task ProcessEventData(ProcessEventArgs eventArgs)
        {
            if (eventArgs.CancellationToken.IsCancellationRequested)
            {
                return;
            }

            if (++eventsSinceLastCheckpoint >= eventsToCheckpoint)
            {
                eventsSinceLastCheckpoint = 0;
                shouldUpdateCheckpoint = true;
            }

            if (shouldUpdateCheckpoint)
            {
                await eventArgs.UpdateCheckpointAsync();
                shouldUpdateCheckpoint = false;
            }

            string json = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());
            var @event = new Event(json);
            eventManager.TryRaise(@event);
        }

        private Task HandleError(ProcessErrorEventArgs eventArgs)
        {
            if (!eventArgs.CancellationToken.IsCancellationRequested)
            {
                errorHandler.Invoke($"[P:{eventArgs.PartitionId}][O:{eventArgs.Operation}] {eventArgs.Exception.Message}");
            }

            return Task.CompletedTask;
        }
    }  

I found some information in Service Bus Quotas, for example:

Number of concurrent receive requests on a queue, topic, or subscription entity (5000).

Subsequent receive requests are rejected, and an exception is received by the calling code. This quota applies to the combined number of concurrent receive operations across all subscriptions on a topic.

But I still can't figure out how to deal with it.

Please help. Thanks.

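This is indeed the answer: EventHub Exception :Cannot allocate more handles to the current session or connection

I had made a similar "fix" for the Azure Event Hubs library for .NET Core, but I forgot that I am also using the Azure Event Hubs library for .NET Framework!

So I now instantiate EventHubProducerClient once and reuse it. It seems to work fine.

My bad. I wasn't careful enough.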
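
For reference, a minimal sketch of the same "create once, reuse" idea for the .NET Framework library (the `Microsoft.ServiceBus.Messaging` namespace named in the exception). It assumes the client is created with `EventHubClient.CreateFromConnectionString`; the class name `LegacyEventHubPublisher` and its members are hypothetical and not taken from the code in the question.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // legacy .NET Framework library (WindowsAzure.ServiceBus package)

// Hypothetical wrapper: holds a single EventHubClient for the lifetime of the application.
public sealed class LegacyEventHubPublisher
{
    private readonly EventHubClient client;

    public LegacyEventHubPublisher(string connectionString, string eventHubName)
    {
        // Created once here; every PublishAsync call reuses the same client
        // instead of creating a new one per message.
        client = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
    }

    public async Task PublishAsync(string json)
    {
        // EventData is disposable in this library, so release it after the send.
        using (var eventData = new EventData(Encoding.UTF8.GetBytes(json)))
        {
            await client.SendAsync(eventData);
        }
    }

    // Call once on application shutdown to release the client's resources.
    public Task CloseAsync()
    {
        return client.CloseAsync();
    }
}
```

The idea is to keep one `LegacyEventHubPublisher` for the whole application (for example, registered as a singleton in whatever DI container the Web API uses) and to call `CloseAsync` only on shutdown.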

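In my case, besides creating only one client instance, I also had to use only one sender instance. I was calling the CreateSender method every time I sent a message, and that produced the exception too.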
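
A rough sketch of caching the sender as well, under some assumptions: the answer does not show its `CreateSender` call, so this version uses `EventHubClient.CreatePartitionedSender` from `Microsoft.ServiceBus.Messaging` as a stand-in, and the class name `CachedSenderPublisher` and the `partitionId` parameter are hypothetical.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // legacy .NET Framework library (WindowsAzure.ServiceBus package)

// Hypothetical wrapper: one client and one sender, both created once and reused.
public sealed class CachedSenderPublisher
{
    private readonly EventHubClient client;
    private readonly EventHubSender sender;

    public CachedSenderPublisher(string connectionString, string eventHubName, string partitionId)
    {
        client = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

        // Assumption: a partitioned sender stands in for the answer's CreateSender call.
        // The sender is created a single time here, not once per message.
        sender = client.CreatePartitionedSender(partitionId);
    }

    public async Task PublishAsync(string json)
    {
        using (var eventData = new EventData(Encoding.UTF8.GetBytes(json)))
        {
            await sender.SendAsync(eventData);
        }
    }

    // Close the sender first, then the client that created it.
    public async Task CloseAsync()
    {
        await sender.CloseAsync();
        await client.CloseAsync();
    }
}
```

Creating a sender on every send is presumably what used up the handles in this case, so the sender is kept in a field alongside the client.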