让 RabbitMq 按我的意愿行事(死信和错误重新排队)

Getting RabbitMq to behave as I want (dead letter and re-queue on errors)

我有一个简单的兔子设置,目前正在做我想做的事...

它根据消息的类型发布消息。每种类型都有自己的队列。
发布消息后,即使没有消费者使用它们,它们也会位于队列中(如果没有消费者到达,则永远坐在那里)。
当有消费者(只有一个!)时,它会吃掉消息。
如果由于某种原因它无法处理一条消息(例如,它在父消息到达之前收到一条子消息),它 nack 将消息放回队列中。

如果它看到同一条消息六次,它会nack发送消息。

一切正常,但目前在六次尝试后它丢弃了消息。

我想要的是将消息传递给 'dead letter queue' 并在一段时间后(比如 5 分钟)将该消息重新排队到它来自的特定队列的末尾。

我绝对是 cargo cult 编程,我不太了解所有 exchange/queue/binding/routing 键和其他涉及的奥秘......感谢手握!

 public void PublishEntity<T>(T message) where T : class, ISendable
    {
        logger.Info($"publishing {message.UniqueId}");

        var factory = new ConnectionFactory
        {
            HostName = appSettings.RabbitHostName,
            UserName = appSettings.RabbitUsername,
            Password = appSettings.RabbitPassword
        };
        try
        {
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine($"Setting up queues for: {typeof(T).Name}");                      

                    channel.QueueDeclare($"App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        null);                        

                    var json = JsonConvert.SerializeObject(message);
                    var body = Encoding.UTF8.GetBytes(json);

                    channel.TxSelect();

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Headers = new Dictionary<string, object>
                    {
                        { "Id", Guid.NewGuid().ToString() }

                    };

                    channel.BasicPublish("",
                        $"App_{typeof(T).Name}",
                        properties,
                        body);
                    Data.MarkAsSent(message);
                    channel.TxCommit();
                }
            }
        }

ISendable 只需确保邮件具有 Data.MarkAsSent(message); 中使用的某些属性,以在数据库中进行标记。

接收器有类似的代码块来处理每种类型。正如我所说,这是有效的。

添加死信队列东西需要做什么?

我这样的尝试创建了死信队列,但没有任何东西移动到它们。

 public void PublishEntity<T>(T message) where T : class, ISendable
    {
        logger.Info($"publishing {message.UniqueId}");

        var factory = new ConnectionFactory
        {
            HostName = appSettings.RabbitHostName,
            UserName = appSettings.RabbitUsername,
            Password = appSettings.RabbitPassword
        };
        try
        {
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
                    channel.ExchangeDeclare("App.Dead.Letter", "direct", true);

                    var args = new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "App.Dead.Letter" },
                        {
                            "x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
                        }
                    };

                    channel.QueueDeclare($"App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        args);

                    channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        null);

                    var json = JsonConvert.SerializeObject(message);
                    var body = Encoding.UTF8.GetBytes(json);

                    channel.TxSelect();

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Headers = new Dictionary<string, object>
                    {
                        { "Id", Guid.NewGuid().ToString() }

                    };

                    channel.BasicPublish("",
                        $"App_{typeof(T).Name}",
                        properties,
                        body);
                    Data.MarkAsSent(message);
                    channel.TxCommit();
                }
            }
        }

我的接收器有这个魔力

              catch (Exception ex)
            {
                var attemptsToHandle = MarkFailedToHandleMessage(logId, ex);
                if (attemptsToHandle > 5)
                {
                    //If we have seen this message many times then don't re-que.
                    channel.BasicNack(ea.DeliveryTag, false, false);
                    return;
                }

                // re-que so we can re-try later.
                channel.BasicNack(ea.DeliveryTag, false, true);
                return;
            }

呸...很多代码。谢谢,如果你已经做到了这一点....

我想问的是我的代码中有哪些明显的问题使事情落入死信队列。

以及我需要添加什么额外的东西,以便 dlq 中的东西会在一段时间后反弹回主队列。 此外,这为每种类型的队列设置了一个 dlq...这是必需的还是应该有一个队列来保存错误消息?

所以我想我已经按照我的预期进行了。虽然很难测试所有这些东西!

        public void PublishEntity<T>(T message) where T : class, ISendable
    {
        logger.Info($"publishing {message.UniqueId}");

        var factory = new ConnectionFactory
        {
            HostName = appSettings.RabbitHostName,
            UserName = appSettings.RabbitUsername,
            Password = appSettings.RabbitPassword
        };
        try
        {
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
                 // Declair dead letter queue for this type
                    channel.ExchangeDeclare("App.Dead.Letter", "direct", true);
                    var queueArgs = new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "App" },
                        {
                            "x-dead-letter-routing-key", $"App_{typeof(T).Name}"
                        }
                        ,{ "x-message-ttl", 30000 }
                    };

                    channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        queueArgs);
                    channel.QueueBind($"DLQ.App_{typeof(T).Name}", "App.Dead.Letter", $"DLQ.App_{typeof(T).Name}", null);

                 // declair queue for this type
                    channel.ExchangeDeclare("App", "direct", true);
                    var args = new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "App.Dead.Letter" },
                        {
                            "x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
                        }
                    };

                    channel.QueueDeclare($"App_{typeof(T).Name}",
                        true,
                        false,
                        false,
                        args);
                    channel.QueueBind($"App_{typeof(T).Name}", "App", $"App_{typeof(T).Name}", null);

我添加并交换了我的主队列,并将队列实际绑定到交换器。我仍然不知道为什么我需要这样做,因为它在没有这种额外复杂性的情况下工作。我想之前有什么魔法对我隐藏了?