超出 Azure 服务总线 SDK 的 SendBatch() 方法的 link 异常当前允许的限制(262144 字节)

Exceeds the limit (262144 bytes) currently allowed on the link Exception on SendBatch() method of Azure service bus SDK

我们正在使用 Azure 服务总线 SDK 3.1.7 的 SendBatch() 方法将数据发送到 Azure 事件中心。以下是代码片段:

foreach (var packet in transformedPackets)
        {
            EventData eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(packet, settings)));;
            try
            {
                eventData.Properties.Add(AppConstants.TenantDeploymentUniqueIdKey, tenantDeploymentUniqueId);
                eventData.Properties.Add(AppConstants.DataTypeKey, DataTypeKey);
                byteCount += eventData.SerializedSizeInBytes;                   
                if (byteCount > MaxBatchSize)
                {
                    sender.SendBatch(transformedMessages);
                    transformedMessages.Clear();
                    byteCount = eventData.SerializedSizeInBytes;
                }
                transformedMessages.Add(eventData);
            }
            catch (System.Exception)
            {
                eventData.Dispose();
                throw;
            }

        }

在将事件数据添加到批处理之前检查 SerializedSizeInBytes 属性 之后的事件(最大允许限制:256 KB),我们得到以下异常:

收到的消息(delivery-id:0,size:262279 字节)超出了 link 当前允许的限制(262144 字节)。 在Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(异常异常) 在 Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult 结果) 在 Microsoft.ServiceBus.Common.AsyncResult1.End(IAsyncResult asyncResult) at Microsoft.ServiceBus.Messaging.MessageSender.RetrySenderEventDataAsyncResult.End(IAsyncResult r) at Microsoft.ServiceBus.Messaging.MessageSender.EndSendEventData(IAsyncResult result) at Microsoft.ServiceBus.Messaging.EventHubSender.SendBatch(IEnumerable1 个事件数据列表)

当事件被翻译成单个 AmqpMessage 时,一些附加信息会添加到批处理中 header。

确保所有 EventData 的总大小低于 250k。

您可以在此处找到有关此问题的详细信息:How to use client-side event batching functionality while Sending to Microsoft Azure EventHubs

我在事件中心也遇到了同样的问题。但是处理这些函数的聪明方法是使用递归调用。在这里发布对我来说工作正常的示例代码。

方法 1:处理正常的 SendBatchAsync 操作

public async Task SendBatchToEHAsync(IEnumerable<string> inputContent)  // Taking List of String to process (50000 Lines Approx)
{
    try
    {
        var batch = new List<EventData>();
        foreach (var item in inputContent) // Add it to Batch
            {
                var bind = new EventData(Encoding.UTF8.GetBytes(item));
                batch.Add(bind);
            }
            try
            {
                await eventHubClient.SendBatchAsync(batch);

            }
            catch (MessageSizeExceededException)
            {
                await ReprocessQuotaExceededBatch(batch);
            }
    }
    catch (Exception ex)
    {

        throw;
    }
}

方法二:处理并右递归函数处理"MessageSizeExceededException"

 public async Task ReprocessQuotaExceededBatch(List<EventData> batch)
{
    try
    {
        var noOfCalls = 2;
       var noOfRecordsPerSplitBatch = (int)Math.Ceiling((double)batch.Count / noOfCalls);
        var counter = 0;
        while (counter < noOfCalls)
        {
            var insertSubList = GetSubListForEventDataBatch(batch, counter);
            try
            {
                await eventHubClient.SendBatchAsync(insertSubList);

            }
            catch (MessageSizeExceededException)
            {
                await ReprocessQuotaExceededBatch(insertSubList);
                // Perform SPlit processing in case of this exception occures
            }
            counter++;
        }
    }
    catch (Exception ex)
    {

        throw;
    }
}

方法三:Item拆分辅助方法

 private static List<EventData> GetSubListForEventDataBatch(List<EventData> list, int count)
{

    var sList = list.Skip(count * noOfRecordsPerSplitBatch).Take(noOfRecordsPerSplitBatch);
    List<EventData> subList = new List<EventData>();
    subList.AddRange(sList);
    return subList;
}

希望这能为其他人解决问题。 如果有任何改进,请写下您对此代码的改进。