MassTransit RabbitMQ 将错误队列上的一半消耗消息移动到 Error_Skipped 队列

MassTransit RabbitMQ moving half of Consumed messages on Error Queue to Error_Skipped Queue

我正在尝试将消息从错误队列移回错误队列。 为此,我在错误队列上创建了一个消费者,然后将其发布到必需队列。 当我尝试这样做时,一半的消费消息被发布,但另一半被发送到 Error_Skipped 队列。

我尝试了很多东西都没有成功,所以我可能缺少一些简单的东西。

以下是我的代码示例:

public class ClaimsMessage
{
    public string Description { get; set; }

    public DateTime Date { get; set; }

    public bool Handled { get; set; }
}

public class ClaimsMessageErrorConsumer : IConsumer<Fault<ClaimsMessage>>
{
    public async Task Consume(ConsumeContext<Fault<ClaimsMessage>> context)
    {
        try
        {
            await context.Publish<ClaimsMessage>(context.Message.Message);

        }
        catch (Exception e)
        {
            string error = e.Message;
        }
    }
}

public static IBusControl CreateClaimsErrorConsumerBus(string endPoint)
{
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ReceiveEndpoint(host, endPoint, e =>
        {
            e.Consumer(() => new ClaimsMessageErrorConsumer());
        });
    });
    return busControl;
}

如果您将消息从错误 queue 移回处理 queue,则不应调用 Publish -- 这会将消息重新发送给所有订阅者。您已经知道 queue 个名字,所以直接将消息发送回 queue。您所看到的是,您已经针对错误 queue 创建了一个使用者,它为该消息创建了一个交换绑定。

所以,改为这样做:

sbc.ReceiveEndpoint("input_error", x =>
{
    // this prevents extra message bindings from being created
    x.BindMessageExchanges = false;

    x.Consumer<MyMover>(() => new MyMover(inputQueueAddress);
});

public class MyMover : 
    IConsumer<ClaimsMessage>
{
    public async Task Consume(ConsumeContext<ClaimsMessage> context)
    {
        try
        {
            var endpoint = await context.GetSendEndpoint(_inputQueueAddress);
            await endpoint.Send<ClaimsMessage>(context.Message);
        }
        catch (Exception e)
        {
            string error = e.Message;
        }
    }
}

如需加分,请复制原始消息 headers 以保留消息的保真度。