超出 Azure 服务总线 SDK 的 SendBatch() 方法的 link 异常当前允许的限制(262144 字节)
Exceeds the limit (262144 bytes) currently allowed on the link Exception on SendBatch() method of Azure service bus SDK
我们正在使用 Azure 服务总线 SDK 3.1.7 的 SendBatch() 方法将数据发送到 Azure 事件中心。以下是代码片段:
foreach (var packet in transformedPackets)
{
EventData eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(packet, settings)));;
try
{
eventData.Properties.Add(AppConstants.TenantDeploymentUniqueIdKey, tenantDeploymentUniqueId);
eventData.Properties.Add(AppConstants.DataTypeKey, DataTypeKey);
byteCount += eventData.SerializedSizeInBytes;
if (byteCount > MaxBatchSize)
{
sender.SendBatch(transformedMessages);
transformedMessages.Clear();
byteCount = eventData.SerializedSizeInBytes;
}
transformedMessages.Add(eventData);
}
catch (System.Exception)
{
eventData.Dispose();
throw;
}
}
在将事件数据添加到批处理之前检查 SerializedSizeInBytes 属性 之后的事件(最大允许限制:256 KB),我们得到以下异常:
收到的消息(delivery-id:0,size:262279 字节)超出了 link 当前允许的限制(262144 字节)。
在Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(异常异常)
在 Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult 结果)
在 Microsoft.ServiceBus.Common.AsyncResult1.End(IAsyncResult asyncResult)
at Microsoft.ServiceBus.Messaging.MessageSender.RetrySenderEventDataAsyncResult.End(IAsyncResult r)
at Microsoft.ServiceBus.Messaging.MessageSender.EndSendEventData(IAsyncResult result)
at Microsoft.ServiceBus.Messaging.EventHubSender.SendBatch(IEnumerable
1 个事件数据列表)
当事件被翻译成单个 AmqpMessage
时,一些附加信息会添加到批处理中 header。
确保所有 EventData
的总大小低于 250k。
您可以在此处找到有关此问题的详细信息:How to use client-side event batching functionality while Sending to Microsoft Azure EventHubs
我在事件中心也遇到了同样的问题。但是处理这些函数的聪明方法是使用递归调用。在这里发布对我来说工作正常的示例代码。
方法 1:处理正常的 SendBatchAsync 操作
public async Task SendBatchToEHAsync(IEnumerable<string> inputContent) // Taking List of String to process (50000 Lines Approx)
{
try
{
var batch = new List<EventData>();
foreach (var item in inputContent) // Add it to Batch
{
var bind = new EventData(Encoding.UTF8.GetBytes(item));
batch.Add(bind);
}
try
{
await eventHubClient.SendBatchAsync(batch);
}
catch (MessageSizeExceededException)
{
await ReprocessQuotaExceededBatch(batch);
}
}
catch (Exception ex)
{
throw;
}
}
方法二:处理并右递归函数处理"MessageSizeExceededException"
public async Task ReprocessQuotaExceededBatch(List<EventData> batch)
{
try
{
var noOfCalls = 2;
var noOfRecordsPerSplitBatch = (int)Math.Ceiling((double)batch.Count / noOfCalls);
var counter = 0;
while (counter < noOfCalls)
{
var insertSubList = GetSubListForEventDataBatch(batch, counter);
try
{
await eventHubClient.SendBatchAsync(insertSubList);
}
catch (MessageSizeExceededException)
{
await ReprocessQuotaExceededBatch(insertSubList);
// Perform SPlit processing in case of this exception occures
}
counter++;
}
}
catch (Exception ex)
{
throw;
}
}
方法三:Item拆分辅助方法
private static List<EventData> GetSubListForEventDataBatch(List<EventData> list, int count)
{
var sList = list.Skip(count * noOfRecordsPerSplitBatch).Take(noOfRecordsPerSplitBatch);
List<EventData> subList = new List<EventData>();
subList.AddRange(sList);
return subList;
}
希望这能为其他人解决问题。
如果有任何改进,请写下您对此代码的改进。
我们正在使用 Azure 服务总线 SDK 3.1.7 的 SendBatch() 方法将数据发送到 Azure 事件中心。以下是代码片段:
foreach (var packet in transformedPackets)
{
EventData eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(packet, settings)));;
try
{
eventData.Properties.Add(AppConstants.TenantDeploymentUniqueIdKey, tenantDeploymentUniqueId);
eventData.Properties.Add(AppConstants.DataTypeKey, DataTypeKey);
byteCount += eventData.SerializedSizeInBytes;
if (byteCount > MaxBatchSize)
{
sender.SendBatch(transformedMessages);
transformedMessages.Clear();
byteCount = eventData.SerializedSizeInBytes;
}
transformedMessages.Add(eventData);
}
catch (System.Exception)
{
eventData.Dispose();
throw;
}
}
在将事件数据添加到批处理之前检查 SerializedSizeInBytes 属性 之后的事件(最大允许限制:256 KB),我们得到以下异常:
收到的消息(delivery-id:0,size:262279 字节)超出了 link 当前允许的限制(262144 字节)。
在Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(异常异常)
在 Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult 结果)
在 Microsoft.ServiceBus.Common.AsyncResult1.End(IAsyncResult asyncResult)
at Microsoft.ServiceBus.Messaging.MessageSender.RetrySenderEventDataAsyncResult.End(IAsyncResult r)
at Microsoft.ServiceBus.Messaging.MessageSender.EndSendEventData(IAsyncResult result)
at Microsoft.ServiceBus.Messaging.EventHubSender.SendBatch(IEnumerable
1 个事件数据列表)
当事件被翻译成单个 AmqpMessage
时,一些附加信息会添加到批处理中 header。
确保所有 EventData
的总大小低于 250k。
您可以在此处找到有关此问题的详细信息:How to use client-side event batching functionality while Sending to Microsoft Azure EventHubs
我在事件中心也遇到了同样的问题。但是处理这些函数的聪明方法是使用递归调用。在这里发布对我来说工作正常的示例代码。
方法 1:处理正常的 SendBatchAsync 操作
public async Task SendBatchToEHAsync(IEnumerable<string> inputContent) // Taking List of String to process (50000 Lines Approx)
{
try
{
var batch = new List<EventData>();
foreach (var item in inputContent) // Add it to Batch
{
var bind = new EventData(Encoding.UTF8.GetBytes(item));
batch.Add(bind);
}
try
{
await eventHubClient.SendBatchAsync(batch);
}
catch (MessageSizeExceededException)
{
await ReprocessQuotaExceededBatch(batch);
}
}
catch (Exception ex)
{
throw;
}
}
方法二:处理并右递归函数处理"MessageSizeExceededException"
public async Task ReprocessQuotaExceededBatch(List<EventData> batch)
{
try
{
var noOfCalls = 2;
var noOfRecordsPerSplitBatch = (int)Math.Ceiling((double)batch.Count / noOfCalls);
var counter = 0;
while (counter < noOfCalls)
{
var insertSubList = GetSubListForEventDataBatch(batch, counter);
try
{
await eventHubClient.SendBatchAsync(insertSubList);
}
catch (MessageSizeExceededException)
{
await ReprocessQuotaExceededBatch(insertSubList);
// Perform SPlit processing in case of this exception occures
}
counter++;
}
}
catch (Exception ex)
{
throw;
}
}
方法三:Item拆分辅助方法
private static List<EventData> GetSubListForEventDataBatch(List<EventData> list, int count)
{
var sList = list.Skip(count * noOfRecordsPerSplitBatch).Take(noOfRecordsPerSplitBatch);
List<EventData> subList = new List<EventData>();
subList.AddRange(sList);
return subList;
}
希望这能为其他人解决问题。 如果有任何改进,请写下您对此代码的改进。