ServiceBusTrigger: session receives only one message at a time
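I have an Azure Function with a ServiceBusTrigger.
It is configured to receive sessions, and when I enqueue messages they are sent with the correct "SessionId".
However, when the trigger fires, it processes only one session message at a time.
Can anyone help? I would like to process all session messages at once.
The code for the trigger and for the message insertion is below.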
ServiceBusTrigger:
public async Task RunAsync([ServiceBusTrigger("%" + AzureServiceBusConfiguration.ServiceBusBlobMigrationQueueNameSecretName + "%", Connection = AzureServiceBusConfiguration.ServiceBusConnectionSecretName, IsSessionsEnabled = true)] string messageBrokerMessageString, FunctionContext context)
{
//CODE
}
Message insertion:
for (int chunkIndex = 0; chunkIndex < chunks.Count(); chunkIndex++)
{
    string workerNodeName = _workerManager.GetWorkerNodeName(controlMetadataEntity.GetBlobMigrationStrategyIdentifier(), Convert.ToUInt32(chunkIndex), controlMetadataEntity.RowKey);
    foreach (BlobMetadataEntity blob in chunks.ElementAt(chunkIndex))
    {
        ServiceBusMessage message = new ServiceBusMessage()
        {
            // The message ID is necessary to prevent duplicate entries in message broker.
            // See https://docs.microsoft.com/en-us/azure/service-bus-messaging/duplicate-detection.
            MessageId = string.Join('|', blob.PartitionKey, blob.RowKey),
            // Body will be the JSON of a object converted to bytes.
            Body = new BinaryData(Encoding.UTF8.GetBytes(new BlobExecutionQueueMessage(blob.PartitionKey, blob.RowKey, workerNodeName, null).GetObjectAsJsonAsync())),
            ContentType = MediaTypeNames.Application.Json,
            // This will separate the message in logic queues to control the parallelism per execution.
            // See https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions.
            SessionId = workerNodeName
        };
        // Set user properties to identify this blob in message broker.
        message.ApplicationProperties.Add("ExecutionId", blob.PartitionKey);
        message.ApplicationProperties.Add("BlobId", blob.RowKey);
        // Add the "AddAsync" task to the list of tasks to run them after all
        // messages were enqueued in the list.
        messageBrokerInsertions.Add(_azureServiceBusClient.SendMessageAsync(serviceBusClient, sender, message));
    }
}
My host.json:
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingExcludedTypes": "Request",
      "samplingSettings": {
        "isEnabled": true
      }
    },
    "logLevel": {
      "Default": "Information"
    }
  },
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 500,
      "maxConcurrentOrchestratorFunctions": 500
    },
    "serviceBus": {
      "sessionHandlerOptions": {
        "maxConcurrentSessions": 1
      }
    }
  }
}
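As far as I know, this is by design: it preserves the message order within each session for the consumer, which makes things a bit tricky. You can increase the number of concurrent sessions: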
"maxConcurrentSessions": 16
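This lets up to 16 sessions run in parallel; messages within a single session are still handled one at a time, in order. Also, if you are on the Consumption plan you don't need to worry about scaling, since queue length is factored into the scaling decision. If you don't care about ordering, you can switch to a non-session-enabled queue and pull messages in batches. Note this line from the documentation: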
https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus?tabs=in-process%2Cextensionv5%2Cextensionv3&pivots=programming-language-powershell#hostjson-settings
"When you set the isSessionsEnabled property or attribute on the trigger to true, the sessionHandlerOptions is honored. When you set the isSessionsEnabled property or attribute on the trigger to false, the messageHandlerOptions is honored."
Here are some reference links for further reading:
// Queues and topics
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions
//maxconcurrentsessions
https://docs.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.sessionhandleroptions.maxconcurrentsessions?view=azure-dotnet#microsoft-servicebus-messaging-sessionhandleroptions-maxconcurrentsessions
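The host.json change suggested in this answer can be sketched as follows. This assumes the project uses the older Service Bus extension schema, which is what the question's host.json already follows; the value 16 is only the example figure from the answer, not a recommended setting. In the 5.x extension, the host.json reference lists `maxConcurrentSessions` directly under `serviceBus` instead of inside `sessionHandlerOptions`, so it is worth checking which extension version the project references before relying on this fragment.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "sessionHandlerOptions": {
        "maxConcurrentSessions": 16
      }
    }
  }
}
```

With this in place, each function instance may hold up to 16 sessions at once, so the messages sent with different `SessionId` values (one per `workerNodeName` in the question) can be processed side by side.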