如果启动多个并发请求,Azure 服务总线上的重复消息

Duplicate messages on the Azure Service bus if multiple concurrent requests are started

我使用 .NET 6.0 API 进行了一些关于将消息写入 Azure 服务总线(主题)的测试。
在我的测试场景中,我的 .NET 6.0 API 一次收到 10 个请求 .
每个请求都应将带有唯一标识符的消息写入 Azure 服务总线。 为了让我的 API 响应更快,我从一种非常天真的方法开始,并在每次调用中创建一个额外的任务来执行每个操作,如下所示:

    message.Name = {SOME UNIQUE ID OF MY CALL}
    var task = new Task(async () =>
    {
        var messageObject = new { message.Name, message.Body };
        var messageJson = JsonConvert.SerializeObject(messageObject);
        ServiceBusSender sender = _serviceBusClient.CreateSender(_topicName);
        await sender.SendMessageAsync(new ServiceBusMessage(messageJson));
    });
    task.Start();

现在用户无需等待消息发送,而是在消息成功上车前直接收到积极响应。

我的预期:

所以如果我有那些​​独特的 ids:

0、1、2、3、4、5、6、7、8、9

总线最终总是收到十条消息,但有些 uid 是双倍的,有些则根本没有发送,例如:

0, 0, 0, 1, 4, 5, 6, 6, 9, 9

如果不是同时发送所有呼叫,但按顺序我得到:

0, 1, 2, 3, 4, 5, 6, 7, 8, 9(有道理)

我现在有两个具体问题:

有什么想法吗?

顺便说一句:在我的生产场景中,我不是这样做的。我正在使用“BackgroundService”来执行这样的后台作业。

当您调用 Task.Start 时,您的任务不一定会立即执行,它会被安排并 运行 在将来的某个 non-deterministic 时间点执行。当它执行时,它将捕获 message.Name 的当前值,而不是您创建任务时存在的值。

要在创建任务时捕获当前 message 实例,您需要使用类似于以下的形式:

    message.Name = {SOME UNIQUE ID OF MY CALL}

    var task = Task.Factory.StartNew(async state =>
    {
        var messageParam = (MessgeType)state;
        var messageObject = new { messageParam.Name, messageParam.Body };
        var messageJson = JsonConvert.SerializeObject(messageObject);
        
        await using var sender = _serviceBusClient.CreateSender(_topicName);        
        await sender.SendMessageAsync(new ServiceBusMessage(messageJson));

    }, message);

Why are always all messages transferred to the bus, even though the main Task of the request ends before the inner task ends?

当您调用 Task.Start 时,您正在排队执行任务,其中队列由与线程池相关的构造拥有。这些不受代码生命周期的限制,它们是 运行 时间的一部分。您的 task 变量不再在范围内,但它引用的实例仍由 运行time 持有。

它确实会让您处于未处理和未观察到异常的位置。根据它们发生的时间、严重程度和主机环境,在某些极端情况下,异常可能会使主机进程崩溃 and/or 影响线程池的吞吐量,您可能不明白为什么。

ServiceBusSender sender = _serviceBusClient.CreateSender(_topicName);

这一行也有问题。您正在为同一主题的每个操作创建一个新的发件人,这将执行不佳并增加资源使用。 ServiceBusSender 旨在 long-lived 并重复使用。我强烈建议在发布到 _topicName.

时缓存并重复使用您的发件人

您也没有关闭或处置发件人。这将使它的 AMQP link 打开并不必要地消耗网络资源,直到它闲置为止。即便如此,您也会有待处理的 clean-up 等待终结器。