使用持久函数推送到服务总线的消息计数不可靠
Unreliable count of messages pushed to Service Bus using Durable Functions
我在使用 Durable azure 函数向 azure 服务总线提交消息时遇到了这个奇怪的问题。
我的代码是一个简单的扇出实现
- REST 触发器获取要提交的消息数并交给编排器。
- Orchestrator 存储调用 activity,这将创建消息并将消息提交到服务总线。
问题是当我发送要求添加 3000 条消息的 REST 参数时,添加了 3000 多条消息。
更糟糕的是,它也不是同一个数字——3104、3100、3286 任何东西……
查看下面的代码:
[FunctionName("Function1_HttpStart")]
//public static async Task<HttpResponseMessage> HttpStart(
public static async Task<IActionResult> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
String type = req.Query["type"];
if(!long.TryParse(req.Query["count"], out var count))
{
return new ObjectResult($"Parse failed for parameter 'count' ({req.Query["count"]}) to Int.") { StatusCode = 400};
}
var restInputs = new RestInputs()
{ Type = type, Count = count };
// Function input comes from the request content.
string instanceId = await starter.StartNewAsync
("EmailQueueSubmitter_OrchestratorSingleton"
, restInputs);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return starter.CreateCheckStatusResponse(req, instanceId);
}
[FunctionName("EmailQueueSubmitter_OrchestratorSingleton")]
public static async Task<List<string>> EmailQueueSubmitter_OrchestratorSingleton(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
var outputs = new List<string>();
try
{
var restInputs = context.GetInput<RestInputs>();
var parallelTasks = new List<Task>();
long runBatchLen;
long i_batch, i_iter, batchCount = 0;
for (i_batch = 0; i_batch < restInputs.Count; i_batch++)
{
parallelTasks.Add(context.CallActivityAsync("EmailQueueSubmitter_ActivitySendMessageBatchSingleton", i_batch.ToString()));
log.LogWarning($"Message {i_batch} Added");
}
log.LogWarning($"Awaiting {parallelTasks.Count} tasks");
await Task.WhenAll(parallelTasks);
var doneTaskCount = parallelTasks.Where(t => t.IsCompleted).ToList().Count;
var successTaskCount = parallelTasks.Where(t => t.IsCompletedSuccessfully).ToList().Count;
var faultedTaskCount = parallelTasks.Where(t => t.IsFaulted).ToList().Count;
var exceptionTaskCount = parallelTasks.Where(t => t.Exception != null).ToList().Count;
log.LogWarning($"Done:{doneTaskCount}, Success: {successTaskCount}, Fault:{faultedTaskCount}, Exception:{exceptionTaskCount}");
log.LogWarning($"Acheived Completion.");
}
catch (Exception ex)
{
log.LogError(ex.Message);
throw new InvalidOperationException(ex.Message);
}
return outputs;
}
[FunctionName("EmailQueueSubmitter_ActivitySendMessageBatchSingleton")]
public static async Task EmailQueueSubmitter_ActivitySendMessageBatchSingleton([ActivityTrigger] IDurableActivityContext activityContext, ILogger log)
{
log.LogWarning($"Starting Activity.");
var payload = activityContext.GetInput<String>();
await ServiceBus_Sender.SendMessageBatch(payload);
log.LogWarning($"Finished Activity.");
}
public static ServiceBusMessage CreateMessage(String Payload)
{
try
{
var sbMsg = new ServiceBusMessage(Payload)
{
MessageId = Guid.NewGuid().ToString(),
ContentType = "text/plain"
};
//sbMsg.ApplicationProperties.Add("RequestType", "Publish");
return sbMsg;
}
catch (Exception ex)
{
throw new InvalidOperationException(ex.Message, ex);
}
}
感谢@Camilo Terevinto 提供的信息,我正在将其转换为答案,以便帮助其他社区成员:
正如评论中所建议的那样,对于 运行 重复检查,您可以生成一个 Guid 并将其与数据一起发送,然后检查 Guid 以前没有处理过。希望这能解决您的问题。
OP 编辑: 通过将服务总线队列更改为启用会话并启用重复数据删除来启用重复检查。提交的消息的 MessageId 在每个会话中设置为唯一。这是我能想到的处理 at-least-once 保证的唯一方法...
我在使用 Durable azure 函数向 azure 服务总线提交消息时遇到了这个奇怪的问题。
我的代码是一个简单的扇出实现
- REST 触发器获取要提交的消息数并交给编排器。
- Orchestrator 存储调用 activity,这将创建消息并将消息提交到服务总线。
问题是当我发送要求添加 3000 条消息的 REST 参数时,添加了 3000 多条消息。
更糟糕的是,它也不是同一个数字——3104、3100、3286 任何东西……
查看下面的代码:
[FunctionName("Function1_HttpStart")]
//public static async Task<HttpResponseMessage> HttpStart(
public static async Task<IActionResult> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
String type = req.Query["type"];
if(!long.TryParse(req.Query["count"], out var count))
{
return new ObjectResult($"Parse failed for parameter 'count' ({req.Query["count"]}) to Int.") { StatusCode = 400};
}
var restInputs = new RestInputs()
{ Type = type, Count = count };
// Function input comes from the request content.
string instanceId = await starter.StartNewAsync
("EmailQueueSubmitter_OrchestratorSingleton"
, restInputs);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return starter.CreateCheckStatusResponse(req, instanceId);
}
[FunctionName("EmailQueueSubmitter_OrchestratorSingleton")]
public static async Task<List<string>> EmailQueueSubmitter_OrchestratorSingleton(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
var outputs = new List<string>();
try
{
var restInputs = context.GetInput<RestInputs>();
var parallelTasks = new List<Task>();
long runBatchLen;
long i_batch, i_iter, batchCount = 0;
for (i_batch = 0; i_batch < restInputs.Count; i_batch++)
{
parallelTasks.Add(context.CallActivityAsync("EmailQueueSubmitter_ActivitySendMessageBatchSingleton", i_batch.ToString()));
log.LogWarning($"Message {i_batch} Added");
}
log.LogWarning($"Awaiting {parallelTasks.Count} tasks");
await Task.WhenAll(parallelTasks);
var doneTaskCount = parallelTasks.Where(t => t.IsCompleted).ToList().Count;
var successTaskCount = parallelTasks.Where(t => t.IsCompletedSuccessfully).ToList().Count;
var faultedTaskCount = parallelTasks.Where(t => t.IsFaulted).ToList().Count;
var exceptionTaskCount = parallelTasks.Where(t => t.Exception != null).ToList().Count;
log.LogWarning($"Done:{doneTaskCount}, Success: {successTaskCount}, Fault:{faultedTaskCount}, Exception:{exceptionTaskCount}");
log.LogWarning($"Acheived Completion.");
}
catch (Exception ex)
{
log.LogError(ex.Message);
throw new InvalidOperationException(ex.Message);
}
return outputs;
}
[FunctionName("EmailQueueSubmitter_ActivitySendMessageBatchSingleton")]
public static async Task EmailQueueSubmitter_ActivitySendMessageBatchSingleton([ActivityTrigger] IDurableActivityContext activityContext, ILogger log)
{
log.LogWarning($"Starting Activity.");
var payload = activityContext.GetInput<String>();
await ServiceBus_Sender.SendMessageBatch(payload);
log.LogWarning($"Finished Activity.");
}
public static ServiceBusMessage CreateMessage(String Payload)
{
try
{
var sbMsg = new ServiceBusMessage(Payload)
{
MessageId = Guid.NewGuid().ToString(),
ContentType = "text/plain"
};
//sbMsg.ApplicationProperties.Add("RequestType", "Publish");
return sbMsg;
}
catch (Exception ex)
{
throw new InvalidOperationException(ex.Message, ex);
}
}
感谢@Camilo Terevinto 提供的信息,我正在将其转换为答案,以便帮助其他社区成员:
正如评论中所建议的那样,对于 运行 重复检查,您可以生成一个 Guid 并将其与数据一起发送,然后检查 Guid 以前没有处理过。希望这能解决您的问题。
OP 编辑: 通过将服务总线队列更改为启用会话并启用重复数据删除来启用重复检查。提交的消息的 MessageId 在每个会话中设置为唯一。这是我能想到的处理 at-least-once 保证的唯一方法...