让 RabbitMq 按我的意愿行事(死信和错误重新排队)
Getting RabbitMq to behave as I want (dead letter and re-queue on errors)
我有一个简单的兔子设置,目前正在做我想做的事...
它根据消息的类型发布消息。每种类型都有自己的队列。
发布消息后,即使没有消费者使用它们,它们也会位于队列中(如果没有消费者到达,则永远坐在那里)。
当有消费者(只有一个!)时,它会吃掉消息。
如果由于某种原因它无法处理一条消息(例如,它在父消息到达之前收到一条子消息),它 nack
将消息放回队列中。
如果它看到同一条消息六次,它会nack
发送消息。
一切正常,但目前在六次尝试后它丢弃了消息。
我想要的是将消息传递给 'dead letter queue' 并在一段时间后(比如 5 分钟)将该消息重新排队到它来自的特定队列的末尾。
我绝对是 cargo cult 编程,我不太了解所有 exchange/queue/binding/routing 键和其他涉及的奥秘......感谢手握!
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
null);
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.TxSelect();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{
{ "Id", Guid.NewGuid().ToString() }
};
channel.BasicPublish("",
$"App_{typeof(T).Name}",
properties,
body);
Data.MarkAsSent(message);
channel.TxCommit();
}
}
}
ISendable
只需确保邮件具有 Data.MarkAsSent(message);
中使用的某些属性,以在数据库中进行标记。
接收器有类似的代码块来处理每种类型。正如我所说,这是有效的。
添加死信队列东西需要做什么?
我这样的尝试创建了死信队列,但没有任何东西移动到它们。
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
channel.ExchangeDeclare("App.Dead.Letter", "direct", true);
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "App.Dead.Letter" },
{
"x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
}
};
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
args);
channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
true,
false,
false,
null);
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.TxSelect();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{
{ "Id", Guid.NewGuid().ToString() }
};
channel.BasicPublish("",
$"App_{typeof(T).Name}",
properties,
body);
Data.MarkAsSent(message);
channel.TxCommit();
}
}
}
我的接收器有这个魔力
catch (Exception ex)
{
var attemptsToHandle = MarkFailedToHandleMessage(logId, ex);
if (attemptsToHandle > 5)
{
//If we have seen this message many times then don't re-que.
channel.BasicNack(ea.DeliveryTag, false, false);
return;
}
// re-que so we can re-try later.
channel.BasicNack(ea.DeliveryTag, false, true);
return;
}
呸...很多代码。谢谢,如果你已经做到了这一点....
我想问的是我的代码中有哪些明显的问题使事情落入死信队列。
以及我需要添加什么额外的东西,以便 dlq 中的东西会在一段时间后反弹回主队列。
此外,这为每种类型的队列设置了一个 dlq...这是必需的还是应该有一个队列来保存错误消息?
所以我想我已经按照我的预期进行了。虽然很难测试所有这些东西!
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
// Declair dead letter queue for this type
channel.ExchangeDeclare("App.Dead.Letter", "direct", true);
var queueArgs = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "App" },
{
"x-dead-letter-routing-key", $"App_{typeof(T).Name}"
}
,{ "x-message-ttl", 30000 }
};
channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
true,
false,
false,
queueArgs);
channel.QueueBind($"DLQ.App_{typeof(T).Name}", "App.Dead.Letter", $"DLQ.App_{typeof(T).Name}", null);
// declair queue for this type
channel.ExchangeDeclare("App", "direct", true);
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "App.Dead.Letter" },
{
"x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
}
};
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
args);
channel.QueueBind($"App_{typeof(T).Name}", "App", $"App_{typeof(T).Name}", null);
我添加并交换了我的主队列,并将队列实际绑定到交换器。我仍然不知道为什么我需要这样做,因为它在没有这种额外复杂性的情况下工作。我想之前有什么魔法对我隐藏了?
我有一个简单的兔子设置,目前正在做我想做的事...
它根据消息的类型发布消息。每种类型都有自己的队列。
发布消息后,即使没有消费者使用它们,它们也会位于队列中(如果没有消费者到达,则永远坐在那里)。
当有消费者(只有一个!)时,它会吃掉消息。
如果由于某种原因它无法处理一条消息(例如,它在父消息到达之前收到一条子消息),它 nack
将消息放回队列中。
如果它看到同一条消息六次,它会nack
发送消息。
一切正常,但目前在六次尝试后它丢弃了消息。
我想要的是将消息传递给 'dead letter queue' 并在一段时间后(比如 5 分钟)将该消息重新排队到它来自的特定队列的末尾。
我绝对是 cargo cult 编程,我不太了解所有 exchange/queue/binding/routing 键和其他涉及的奥秘......感谢手握!
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
null);
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.TxSelect();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{
{ "Id", Guid.NewGuid().ToString() }
};
channel.BasicPublish("",
$"App_{typeof(T).Name}",
properties,
body);
Data.MarkAsSent(message);
channel.TxCommit();
}
}
}
ISendable
只需确保邮件具有 Data.MarkAsSent(message);
中使用的某些属性,以在数据库中进行标记。
接收器有类似的代码块来处理每种类型。正如我所说,这是有效的。
添加死信队列东西需要做什么?
我这样的尝试创建了死信队列,但没有任何东西移动到它们。
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
channel.ExchangeDeclare("App.Dead.Letter", "direct", true);
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "App.Dead.Letter" },
{
"x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
}
};
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
args);
channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
true,
false,
false,
null);
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.TxSelect();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = new Dictionary<string, object>
{
{ "Id", Guid.NewGuid().ToString() }
};
channel.BasicPublish("",
$"App_{typeof(T).Name}",
properties,
body);
Data.MarkAsSent(message);
channel.TxCommit();
}
}
}
我的接收器有这个魔力
catch (Exception ex)
{
var attemptsToHandle = MarkFailedToHandleMessage(logId, ex);
if (attemptsToHandle > 5)
{
//If we have seen this message many times then don't re-que.
channel.BasicNack(ea.DeliveryTag, false, false);
return;
}
// re-que so we can re-try later.
channel.BasicNack(ea.DeliveryTag, false, true);
return;
}
呸...很多代码。谢谢,如果你已经做到了这一点....
我想问的是我的代码中有哪些明显的问题使事情落入死信队列。
以及我需要添加什么额外的东西,以便 dlq 中的东西会在一段时间后反弹回主队列。 此外,这为每种类型的队列设置了一个 dlq...这是必需的还是应该有一个队列来保存错误消息?
所以我想我已经按照我的预期进行了。虽然很难测试所有这些东西!
public void PublishEntity<T>(T message) where T : class, ISendable
{
logger.Info($"publishing {message.UniqueId}");
var factory = new ConnectionFactory
{
HostName = appSettings.RabbitHostName,
UserName = appSettings.RabbitUsername,
Password = appSettings.RabbitPassword
};
try
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"Setting up queues for: {typeof(T).Name}");
// Declair dead letter queue for this type
channel.ExchangeDeclare("App.Dead.Letter", "direct", true);
var queueArgs = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "App" },
{
"x-dead-letter-routing-key", $"App_{typeof(T).Name}"
}
,{ "x-message-ttl", 30000 }
};
channel.QueueDeclare($"DLQ.App_{typeof(T).Name}",
true,
false,
false,
queueArgs);
channel.QueueBind($"DLQ.App_{typeof(T).Name}", "App.Dead.Letter", $"DLQ.App_{typeof(T).Name}", null);
// declair queue for this type
channel.ExchangeDeclare("App", "direct", true);
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "App.Dead.Letter" },
{
"x-dead-letter-routing-key", $"DLQ.App_{typeof(T).Name}"
}
};
channel.QueueDeclare($"App_{typeof(T).Name}",
true,
false,
false,
args);
channel.QueueBind($"App_{typeof(T).Name}", "App", $"App_{typeof(T).Name}", null);
我添加并交换了我的主队列,并将队列实际绑定到交换器。我仍然不知道为什么我需要这样做,因为它在没有这种额外复杂性的情况下工作。我想之前有什么魔法对我隐藏了?