MassTransit Azure 总线 PrefetchCount 必须为 1

MassTransit Azure bus PrefetchCount must be 1

使用带有 Azure 服务总线的 MassTransit 作为传输。我被迫始终将 PrefetchCount 设置为 1,否则消费者将多次从队列中拉出相同的消息,这时我会收到消息锁定异常。为什么我的消费者会提取相同的消息并尝试多次处理它?我怎样才能避免这种行为以允许我的消费者同时处理多条消息(只是不同的消息)?

2020-01-03 10:30:31 ERR Exception on Receiver sb://**queueuri**/create-access-points during "RenewLock"
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:2e0c6c96-a0e1-40e2-9f95-1930aad9744b, TrackingId:37d9c13c-5bb4-4c19-9853-21f9ef06c7dd_B31, SystemTracker:qesi-local-queue:Queue:create-access-points, Timestamp:2020-01-03T16:31:08

强制在我的 Consumer 中设置 PrefetchCount

public class CreateAccessPointCommandHandlerDef : ConsumerDefinition<CreateAccessPointCommandHandler>
{
    public CreateAccessPointCommandHandlerDef()
    {
        EndpointName = "create-access-points";
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<CreateAccessPointCommandHandler> consumerConfigurator
    )
    {
        if (endpointConfigurator is IServiceBusEndpointConfigurator)
        {
            (endpointConfigurator as IServiceBusEndpointConfigurator).LockDuration = TimeSpan.FromMinutes(5);
            (endpointConfigurator as IServiceBusEndpointConfigurator).MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
            (endpointConfigurator as IServiceBusEndpointConfigurator).PrefetchCount = 1;
        }
    }

这是我的日志,显示我在收到消息后不久收到锁定异常。该过程最终成功完成,然后它再次尝试处理相同的消息。

2020-01-06 10:19:38 DBG Received CreateAccessPoints message (ID: 14020000-5d08-0015-ac23-08d792c43254 for ProviderNetworkId 17 and ProviderNetworkVersionId 1014.
info: Worker[0] Worker running at: 01/06/2020 10:19:52 -06:00
2020-01-06 10:19:52 INF Worker running at: 01/06/2020 10:19:52 -06:00
info: Worker[0] Worker running at: 01/06/2020 10:20:12 -06:00
2020-01-06 10:20:13 INF Worker running at: 01/06/2020 10:20:12 -06:00
fail: MassTransit[0]
  Exception on Receiver sb://**azuresb** during RenewLock
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:9470f437-5c5f-4280-aa32-0f1d97ac7c50, TrackingId:89f135d1-69ce-4c08-b80d-247d3c17eb76_B1, SystemTracker:**azuresb**:Queue:create-access-points, Timestamp:2020-01

这是我启动时的日志,其中是否有任何错误?

2020-01-06 10:43:05 DBG Topic: "qesi.Core.ProviderNetwork.Commands/CreateAccessPointsCommand" ("")
2020-01-06 10:43:05 DBG Queue: "create-access-points" ("auto-delete: 29247y1M2w2h48m5s477ms")
2020-01-06 10:43:05 DBG Subscription "create-access-points" ("Commands/CreateAccessPointsCommand" -> "sb://**azuresb**/create-access-points")
2020-01-06 10:43:05 DBG Creating message receiver for sb://qesi-local-queue.servicebus.windows.net/create-access-points
2020-01-06 10:43:06 DBG Creating queue "QA1271_****_bus_nobyyyn7byybmbdabdm3ft598r"
2020-01-06 10:43:07 DBG Queue: "QA1271_****_bus_nobyyyn7byybmbdabdm3ft598r" ("dead letter, auto-delete: 5m")

最后是我的总线创建代码:

        return Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            cfg.Host(_config.ServiceBusUri, host => {
                host.SharedAccessSignature(s =>
                {
                    s.KeyName = _config.KeyName;
                    s.SharedAccessKey = _config.SharedAccessKey;
                    s.TokenTimeToLive = TimeSpan.FromDays(1);
                    s.TokenScope = TokenScope.Namespace;
                });
            });
            cfg.SetLoggerFactory(_logFactory);
            cfg.ConfigureEndpoints(_provider);
            cfg.UseMessageData(_messageDataRepository);
            cfg.UseMessageRetry(retry => retry.Immediate(2));
        });

您在这里混淆了几件与 MassTransit 工作方式无关的事情。 Azure 服务总线将消息传递给消费者。消息被锁定一段时间。如果一条消息在给定的时间段内没有完成、死信或取消,它将失去它的锁,另一个竞争的消费者将获得它进行重新处理。如果一条消息多次失去锁定,特别是 MaxDeliveryCount 次,该消息将被代理自动设为死信。由于您为 MaxAutoRenewDuration 分配了 30 分钟,MassTransit 将尝试 更新锁定并将处理时间最多延长 30 分钟。除此之外,这将是同一件事——锁将丢失,消息将被提供给另一个竞争消费者。检查您的消息处理需要多长时间。

PrefetchCount 是不同的设置。这意味着“预取了多少消息,这样客户端就不必每次都显式地发送消息。在你的情况下,这是一条额外的消息,只要 MassTransit requested/received 消息就预取了。这设置提高了整体端点吞吐量,而不是并发性。您设置为 4.

原来这个问题是因为我们 运行 我们的 .NET Core Generic Host 在 Continuous WebJob 中。在 WebJob 上下文中 运行 时,Azure 服务总线似乎存在连接问题。它大部分时间都可以工作,然后就会崩溃,失去 Azure 服务总线锁。这让我们进行了大量的试验和错误来追踪。一旦我们在 Docker 容器中部署了通用主机,我们所有的服务总线锁定问题就都消失了。