使用 Azure 函数和服务总线的 MassTransit 异常处理和错误队列

MassTransit exception handling and error queues with Azure Function and Service Bus

我是 运行 Azure Functions v3 中的 MassTransit 7.1.8,使用服务总线作为传输。当消费者第一次抛出异常时,会在我的服务总线命名空间中创建两个主题:masstransit~fault(没有任何订阅)和 masstransit~fault--mynamespace~myevent--(有一个订阅 Fault-MassTransit)。

Azure 函数 Startup.cs

public override void Configure(IFunctionsHostBuilder builder)
{
    builder.Services.AddMassTransitForAzureFunctions(cfg =>
    {
        cfg.AddConsumersFromNamespaceContaining<MyConsumer>();
    });
}

Azure 函数

[FunctionName("MyFunction")]
public async Task Run(
    [ServiceBusTrigger("my-topic", "my-subscription", Connection = "AzureWebJobsServiceBus")] Message message, ILogger log)
{
    try
    {
        await _receiver.HandleConsumer<MyConsumer>("my-topic", "my-subscription", message, default(CancellationToken));
    }
    catch (System.Exception ex)
    {
    }
}

消费者

public class MyConsumer : IConsumer<MyEvent>
{
    public async Task Consume(ConsumeContext<MyEvent> context)
    {
        throw new System.Exception("Very bad things happened inside the consumer.");
    }
}

我还必须在 masstransit~fault 上手动创建订阅才能开始在那里接收消息。

我的问题:

  1. MyFunction 吞下异常以防止将消息发送到 DLQ。这个可以吗?吞下一个异常感觉很时髦,但另一方面,MassTransit 将故障发送到错误队列,这样它就不会丢失。
  2. 文档讨论了创建 _error 队列(前缀适当),但我没有看到任何此类队列或主题,只有 masstransit~faultmasstransit~fault--mynamespace~myevent--。我猜 masstransit~fault--mynamespace~myevent-- 是服务总线等同于 RabbitMQ _error?
  3. masstransit~fault--mynamespace~myevent-- 有消息发送给它,但当我查看订阅时它是空的。订阅有一个接受所有内容的默认过滤器。我是否需要针对该主题进行一些额外的配置?
  4. 有没有办法重命名上面的错误主题(同时可能是 )?

Azure Functions 并没有真正使用 MassTransit 作为交通工具。但是,可以配置某些方面来模仿相同的行为。在队列上设置 DeliveryCount = 1 将阻止 Azure 重新传送消息(这会导致重试五次,默认值为 5)。

回复各种问题:

  1. 这是送货数量,MassTransit 发布了故障。
  2. 您可以配置重试策略(如下图)
  3. 使用 Azure Functions 时没有 _error 队列,因为 MassTransit 不是交通工具。
  4. 除非您为故障配置消费者,否则它不会被消费。
  5. 您可以使用实体名称格式化程序(如下所示)更改故障主题。

为 Azure 函数配置总线重试:

.AddMassTransitForAzureFunctions(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();

    cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
}, (context, cfg) =>
{
    cfg.MessageTopology.SetEntityNameFormatter(new CleanEntityNameFormatter(cfg.MessageTopology.EntityNameFormatter));

    cfg.UseMessageRetry(r => r.Intervals(200, 500, 1000, 2000));
})

With Azure Functions, when retries are exhausted, Azure will move the message to the dead-letter queue (a logical queue that's part of the queue itself, visible in the Azure portal).

指定的实体名称格式化程序将更改错误的主题名称。必须在 每个将要 produce/consume 发生故障的总线实例 上使用,以确保名称匹配。

using MassTransit;
using MassTransit.Internals.Extensions;
using MassTransit.Topology;

public class CleanEntityNameFormatter :
    IEntityNameFormatter
{
    readonly IEntityNameFormatter _entityNameFormatter;

    public CleanEntityNameFormatter(IEntityNameFormatter entityNameFormatter)
    {
        _entityNameFormatter = entityNameFormatter;
    }

    public string FormatEntityName<T>()
    {
        if (typeof(T).ClosesType(typeof(Fault<>), out Type[] types))
        {
            var name = (string) typeof(IEntityNameFormatter)
                .GetMethod("FormatEntityName")
                .MakeGenericMethod(types)
                .Invoke(_entityNameFormatter, Array.Empty<object>());

            var suffix = typeof(T).Name.Split('`').First();

            return $"{name}-{suffix}";
        }

        return _entityNameFormatter.FormatEntityName<T>();
    }
}