Masstransit :: IConsumer<Fault<T>> 无法使用故障消息并创建额外的新故障主题

Mass Transit :: IConsumer<Fault<T>> not able to consume the fault message and additional new Fault Topic is created

我是 Masstransit 的新手,我已经开始使用 AWS SQS 创建 pub/Sub 的小测试用例,并在出现任何异常时消耗故障并执行可恢复的操作。

 Registration and  Receive End point configuration 

public void ConfigureServices(IServiceCollection 服务) { services.AddControllers();

        services.AddMassTransit(config => {

            config.AddConsumer<OrderConsumer>();
            config.AddConsumer<OrderFaultConsumer>();
            //config.AddConsumer<OrderAccepted>();

            config.UsingAmazonSqs((ctx, cfg) => {
                cfg.Host("ap-southeast-2",
                 h =>
                 {
                     h.AccessKey("AccessKey");
                     h.SecretKey("SecretKey");
    
                 });

                cfg.ReceiveEndpoint("order-queue", c => {

                    //Don't create a new topic
                    c.ConfigureConsumeTopology = false;
                    c.Subscribe("Order", s =>
                    {
                    });
                    c.UseMessageRetry(r=>r.Immediate(3));
                    c.ConfigureConsumer<OrderConsumer>(ctx);
                    c.ConfigureConsumer<OrderFaultConsumer>(ctx);
                  
                });
                services.AddMassTransitHostedService();

示例发布代码 :

 public async Task<IActionResult> Post([FromBody] OrderDto order)
    {

        await publishEndpoint.Publish<IOrder>(new Order()
        {
            Message="Fault",
            order=order
        });

        return Ok();
    } 

OrderConsumer

   public class OrderConsumer : IConsumer<IOrder>
{
    private readonly ILogger<OrderConsumer> logger;

    public OrderConsumer(ILogger<OrderConsumer> logger)
    {
        this.logger = logger;
    }

    /*
     MassTransit delivers messages to consumers by calling the Consume method */
    public async Task Consume(ConsumeContext<IOrder> context)
    {
        if(context.Message.Message=="Fault")
         throw new Exception("Very bad things happened");

        await Console.Out.WriteLineAsync(context.Message.Message);
        
    }
}

OrderFaultConsumer

  public class OrderFaultConsumer : IConsumer<Fault<IOrder>>
{

    private readonly ILogger<OrderFaultConsumer> _logger;

    public OrderFaultConsumer(ILogger<OrderFaultConsumer> logger)
    {
        _logger = logger;
    }


    public async Task Consume(ConsumeContext<Fault<IOrder>> context)
    {

        var originalMessage = context.Message.Message.order;
        var exceptions = context.Message.Exceptions;

        //Do something interesting
        await Console.Out.WriteLineAsync($"discarding message:{JsonConvert.SerializeObject(originalMessage)}");
    }
}

预期是:

  1. 应创建“订单”主题
  2. "Order_Queue" 应创建订阅订单主题。
  3. 如果发布成功且消息(输入)!=Fault,则应调用“OrderConsumer”。
  4. 如果Message(input)=="Fault"则调用"OrderConsumer",立即重试3次后抛异常,调用OrderFaultConsumer处理fault

实际:

  1. 已创建主题
  2. 队列已创建和订阅
  3. 成功场景已通过。
  4. 场景失败,创建了新的 FaultTopic(不是预期的)并且从未调用或使用 OrderFaultConsumer。 5.Order_Queue_error 已创建并推送故障消息(不是预期的)。

请指教

这里有很多东西要打开,所以和我在一起。

  1. 对,发布消息时,会根据发布的消息类型创建主题。在这种情况下,名称是根据 IOrder 类型生成的。如果要更改主题的实体名称,可以将 [EntityName("order")] 添加到消息类型,或在总线拓扑中指定自定义实体名称格式化程序。

  2. 消费者服务启动时,会创建order-queue队列。到目前为止,还不错。

  3. 是的,发布了确切的消息类型,消费者得到了它的副本。这实际上是一个难题,因为:

    1. 指定c.ConfigureConsumeTopology = false;,启动消费者服务既不会创建主题,也不会将主题绑定到队列。所以这必须是在添加此行之前或手动完成的。

    2. 由于使用了Subscribe("Order", ...),特别是Order的名称,主题名称与发布的消息类型不匹配。由于 SNS 不支持多态消息路由,发布 IOrder 不会使用 Order 主题,因此您要么使用速记,要么将该类型的实体名称更改为 Order.

  4. 当消费者抛出异常时,MassTransit will generate一个Fault<IOrder>将被发布到匹配的主题。通用类型将始终匹配使用的消息类型(而不是发布的消息类型)。

    1. 这会导致创建一个主题(如果它尚不存在)。

    2. 至于您的消费者,由于您指定了 c.ConfigureConsumeTopology = false;,接收端点既不会在启动时创建主题,也不会创建对该主题的订阅。结果是您的 OrderFaultConsumer 将不会收到已发布的故障消息。

    3. 不鼓励在同一个接收端点上同时配置消费者和错误消费者。。我建议为故障消费者使用单独的队列。

    4. 创建order-queue_error队列是因为MassTransit将所有故障消息移至错误队列以防止消息丢失,以便消息可以问题解决后移回队列。