异常时重新排队消息

Re-queue message on exception

我正在寻找一种可靠的方法来重新排队目前无法正确处理的消息。

我一直在查看 http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,似乎支持在 RabbitMQ API 中重新排队消息。

else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}

但是我使用的是 EasyNetQ。 所以想知道我将如何在这里做类似的事情。

bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});

这是一个好方法吗?不是 ACK 如果 do work 超过 RabbitMQ 服务器等待 ACK 超时,消息可能会导致问题。

据我所知,无法使用 EasyNetQ 手动 acknackreject 消息。

我看到你opened an issue ticket with the EasyNetQ team,关于这个...但还没有答案。

FWIW,这是一件非常合适的事情。我使用的所有库都支持此功能集(在 NodeJS 中)并且很常见。我很惊讶 EasyNetQ 不支持这个。

所以我想到了这个解决方案。它用 EasyNetQ 替换了默认的错误策略。

public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
    : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
    {
    }

    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
    {
        object deathHeaderObject;
        if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
            return AckStrategies.NackWithoutRequeue;

        var deathHeaders = deathHeaderObject as IList;

        if (deathHeaders == null)
            return AckStrategies.NackWithoutRequeue;

        var retries = 0;
        foreach (IDictionary header in deathHeaders)
        {
            var count = int.Parse(header["count"].ToString());
            retries += count;
        }

        if (retries < 3)
            return AckStrategies.NackWithoutRequeue;
        return base.HandleConsumerError(context, exception);
    }
}

你这样替换它:

RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())

您必须使用 AdvancedBus,所以您必须手动设置所有内容。

using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
    bus.Advanced.Bind(deadExchange, queue, "");
    bus.Advanced.Bind(textExchange, queue, "");

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}

这将死信失败消息 3 次。之后会进入EasyNetQ提供的默认错误队列进行错误处理。 您可以订阅该队列。

当异常传播出您的使用者方法时,消息为死信。所以这会触发死信。

static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
    throw new Exception("This is a test!");
}