未处理异常的 MessageLockLostException 结合 Azure 服务总线上的 MassTransit ConfigureDeadLetterQueueErrorTransport
MessageLockLostException on unhandled exception in combination with ConfigureDeadLetterQueueErrorTransport for MassTransit on Azure Service Bus
我正在尝试使用 Azure 服务总线设置 MassTransit。我想使用 DLQ 而不是依赖 _skipped
和 _error
队列,但我不确定我是否做错了。
这是我的设置的简化版本:
services.AddMassTransit(busConfigurator =>
{
busConfigurator.AddConsumer<MessageConsumer>();
busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
{
serviceBusBusFactoryConfigurator.Host(connectionString);
serviceBusBusFactoryConfigurator.SubscriptionEndpoint<Message>(
"message-subscription",
configurator =>
{
configurator.ConfigureDeadLetterQueueErrorTransport();
configurator.ConfigureDeadLetterQueueDeadLetterTransport();
configurator.ConfigureConsumer<MessageConsumer>(context);
});
});
});
services.AddMassTransitHostedService(true);
---
public class Message
{
public string Text { get; set; }
}
public class MessageConsumer : IConsumer<Message>
{
public Task Consume(ConsumeContext<Message> context)
{
throw new Exception("Hello");
}
}
发布消息 (await _bus.Publish<Message>(new {})
) 时,我在日志中看到类似这样的内容:
[15:19:22 | DBG | MassTransit] Create send transport: sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message--
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- ()
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault ()
[15:19:22 | DBG | MassTransit] Subscription Fault-MassTransit (MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- -> sb://mytestbus.servicebus.windows.net/MassTransit/Fault)
[15:19:22 | DBG | MassTransit] SEND sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- 09c40000-5dcc-0015-85cf-08d9988339b8 MassTransit.Fault<Acme.Common.Messaging.Contracts.Events.Message>
[15:19:22 | ERR | MassTransit] R-FAULT sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription 09c40000-5dcc-0015-5c52-08d998833786 Acme.Common.Messaging.Contracts.Events.Message Acme.Services.UpdateService.MessageConsumer(00:00:03.0423556)
System.Exception: Hello
at Acme.Services.UpdateService.MessageConsumer.Consume(ConsumeContext`1 context) in C:\Development\acme\src\Acme.Services.UpdateService\Startup.cs:line 503
at MassTransit.Pipeline.Filters.MethodConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumerConsumeContext<TConsumer,TMessage>>.Send(ConsumerConsumeContext`2 context, IPipe`1 next)
at GreenPipes.Pipes.LastPipe`1.Send(TContext context)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
[15:19:23 | WRN | MassTransit] Message Lock Lost: 09c400005dcc00155c5208d998833786
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
[15:19:23 | WRN | MassTransit] Exception on Receiver sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription during UserCallback ActiveDispatchCount(0) ErrorRequiresRecycle(False)
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
at MassTransit.Azure.ServiceBus.Core.Contexts.SubscriptionClientContext.<>c__DisplayClass16_0.<<OnMessageAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.MessageReceivePump.MessageDispatchTask(Message message)
这是设计使然还是我做错了什么?我本以为消息会被移动到 DLQ,没有任何记录的警告(除了我抛出的异常)。
如果我取消注释 ConfigureDeadLetterQueueErrorTransport
,那么我将不再获得 MessageLockLostException
。
使用地铁 7.2.3
对于订阅端点,DLQ 是默认行为。
您配置的内容可能与默认配置冲突。如果去掉以下几行,它应该会按预期工作:
configurator.ConfigureDeadLetterQueueErrorTransport();
configurator.ConfigureDeadLetterQueueDeadLetterTransport();
You can see that those same lines are already configured for subscription endpoints.
仅需要这些行来配置接收端点以使用 DLQ。
我正在尝试使用 Azure 服务总线设置 MassTransit。我想使用 DLQ 而不是依赖 _skipped
和 _error
队列,但我不确定我是否做错了。
这是我的设置的简化版本:
services.AddMassTransit(busConfigurator =>
{
busConfigurator.AddConsumer<MessageConsumer>();
busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
{
serviceBusBusFactoryConfigurator.Host(connectionString);
serviceBusBusFactoryConfigurator.SubscriptionEndpoint<Message>(
"message-subscription",
configurator =>
{
configurator.ConfigureDeadLetterQueueErrorTransport();
configurator.ConfigureDeadLetterQueueDeadLetterTransport();
configurator.ConfigureConsumer<MessageConsumer>(context);
});
});
});
services.AddMassTransitHostedService(true);
---
public class Message
{
public string Text { get; set; }
}
public class MessageConsumer : IConsumer<Message>
{
public Task Consume(ConsumeContext<Message> context)
{
throw new Exception("Hello");
}
}
发布消息 (await _bus.Publish<Message>(new {})
) 时,我在日志中看到类似这样的内容:
[15:19:22 | DBG | MassTransit] Create send transport: sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message--
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- ()
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault ()
[15:19:22 | DBG | MassTransit] Subscription Fault-MassTransit (MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- -> sb://mytestbus.servicebus.windows.net/MassTransit/Fault)
[15:19:22 | DBG | MassTransit] SEND sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- 09c40000-5dcc-0015-85cf-08d9988339b8 MassTransit.Fault<Acme.Common.Messaging.Contracts.Events.Message>
[15:19:22 | ERR | MassTransit] R-FAULT sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription 09c40000-5dcc-0015-5c52-08d998833786 Acme.Common.Messaging.Contracts.Events.Message Acme.Services.UpdateService.MessageConsumer(00:00:03.0423556)
System.Exception: Hello
at Acme.Services.UpdateService.MessageConsumer.Consume(ConsumeContext`1 context) in C:\Development\acme\src\Acme.Services.UpdateService\Startup.cs:line 503
at MassTransit.Pipeline.Filters.MethodConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumerConsumeContext<TConsumer,TMessage>>.Send(ConsumerConsumeContext`2 context, IPipe`1 next)
at GreenPipes.Pipes.LastPipe`1.Send(TContext context)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
[15:19:23 | WRN | MassTransit] Message Lock Lost: 09c400005dcc00155c5208d998833786
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
[15:19:23 | WRN | MassTransit] Exception on Receiver sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription during UserCallback ActiveDispatchCount(0) ErrorRequiresRecycle(False)
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
at MassTransit.Azure.ServiceBus.Core.Contexts.SubscriptionClientContext.<>c__DisplayClass16_0.<<OnMessageAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.MessageReceivePump.MessageDispatchTask(Message message)
这是设计使然还是我做错了什么?我本以为消息会被移动到 DLQ,没有任何记录的警告(除了我抛出的异常)。
如果我取消注释 ConfigureDeadLetterQueueErrorTransport
,那么我将不再获得 MessageLockLostException
。
使用地铁 7.2.3
对于订阅端点,DLQ 是默认行为。
您配置的内容可能与默认配置冲突。如果去掉以下几行,它应该会按预期工作:
configurator.ConfigureDeadLetterQueueErrorTransport();
configurator.ConfigureDeadLetterQueueDeadLetterTransport();
You can see that those same lines are already configured for subscription endpoints.
仅需要这些行来配置接收端点以使用 DLQ。