未处理异常的 MessageLockLostException 结合 Azure 服务总线上的 MassTransit ConfigureDeadLetterQueueErrorTransport

MessageLockLostException on unhandled exception in combination with ConfigureDeadLetterQueueErrorTransport for MassTransit on Azure Service Bus

我正在尝试使用 Azure 服务总线设置 MassTransit。我想使用 DLQ 而不是依赖 _skipped_error 队列,但我不确定我是否做错了。

这是我的设置的简化版本:

services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddConsumer<MessageConsumer>();
    busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
    {
        serviceBusBusFactoryConfigurator.Host(connectionString);

        serviceBusBusFactoryConfigurator.SubscriptionEndpoint<Message>(
            "message-subscription",
            configurator =>
            {
                configurator.ConfigureDeadLetterQueueErrorTransport();
                configurator.ConfigureDeadLetterQueueDeadLetterTransport();

                configurator.ConfigureConsumer<MessageConsumer>(context);
            });
        
    });
});

services.AddMassTransitHostedService(true);


---


public class Message
{
    public string Text { get; set; }
}

public class MessageConsumer : IConsumer<Message>
{
    public Task Consume(ConsumeContext<Message> context)
    {
        throw new Exception("Hello");
    }
}

发布消息 (await _bus.Publish<Message>(new {})) 时,我在日志中看到类似这样的内容:

[15:19:22 | DBG | MassTransit] Create send transport: sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message--
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- ()
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault ()
[15:19:22 | DBG | MassTransit] Subscription Fault-MassTransit (MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- -> sb://mytestbus.servicebus.windows.net/MassTransit/Fault)
[15:19:22 | DBG | MassTransit] SEND sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- 09c40000-5dcc-0015-85cf-08d9988339b8 MassTransit.Fault<Acme.Common.Messaging.Contracts.Events.Message>
[15:19:22 | ERR | MassTransit] R-FAULT sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription 09c40000-5dcc-0015-5c52-08d998833786 Acme.Common.Messaging.Contracts.Events.Message Acme.Services.UpdateService.MessageConsumer(00:00:03.0423556)
System.Exception: Hello
   at Acme.Services.UpdateService.MessageConsumer.Consume(ConsumeContext`1 context) in C:\Development\acme\src\Acme.Services.UpdateService\Startup.cs:line 503
   at MassTransit.Pipeline.Filters.MethodConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumerConsumeContext<TConsumer,TMessage>>.Send(ConsumerConsumeContext`2 context, IPipe`1 next)
   at GreenPipes.Pipes.LastPipe`1.Send(TContext context)
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
[15:19:23 | WRN | MassTransit] Message Lock Lost: 09c400005dcc00155c5208d998833786
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
   at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
[15:19:23 | WRN | MassTransit] Exception on Receiver sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription during UserCallback ActiveDispatchCount(0) ErrorRequiresRecycle(False)
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
   at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
   at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
   at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
   at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
   at MassTransit.Azure.ServiceBus.Core.Contexts.SubscriptionClientContext.<>c__DisplayClass16_0.<<OnMessageAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.Azure.ServiceBus.MessageReceivePump.MessageDispatchTask(Message message)

这是设计使然还是我做错了什么?我本以为消息会被移动到 DLQ,没有任何记录的警告(除了我抛出的异常)。

如果我取消注释 ConfigureDeadLetterQueueErrorTransport,那么我将不再获得 MessageLockLostException

使用地铁 7.2.3

对于订阅端点,DLQ 是默认行为。

您配置的内容可能与默认配置冲突。如果去掉以下几行,它应该会按预期工作:

configurator.ConfigureDeadLetterQueueErrorTransport();
configurator.ConfigureDeadLetterQueueDeadLetterTransport();

You can see that those same lines are already configured for subscription endpoints.

仅需要这些行来配置接收端点以使用 DLQ。