在没有订阅者的情况下,RabbitMQ 消息可以存活多长时间?
How long a RabbitMQ Message stays alive without Subscribers?
我正在使用 MassTransit 和 RabbitMQ 创建一个简单的 Publisher/Subscriber。
发布者有以下代码来初始化总线:
/** create the bus */
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
});
/** start the bus and publish */
bus.Start();
bus.Publish<IPersonLogin>(new {FirstName = "John", LastName = "Smith"});
并且订阅者有这个初始化代码:
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "person_login", e =>
{
e.Consumer<PersonLoginConsumer>();
});
});
如果我关闭订阅者并发布 2 条消息,消息不会丢失,一旦订阅者恢复正常,消息就会被处理。
所以我的问题是:
- 我如何确保消息一直保留在 RabbitMQ 的队列中,直到一个订阅者出现并接收它?
- 如果服务器重启并且某些消息未被任何订阅者处理,会发生什么情况?它们会丢失还是在订阅者重启后立即被处理?
- 这是确保处理每条消息的正确模式吗?还是我应该使用不同的策略?
放在首位。
- 如果没有任何订阅者,RabbitMQ 将不知道应该将消息传递到哪个队列。那么消息将无法投递。(不确定是否将其移至错误队列或跳过)
- 如果交换器已经存在,它将被放入已订阅该事件的消费者队列中。因此,托管您的消费者的端点可以关闭,消息仍将被传递。
- 当消息被传送到队列时,消费者将获取您的消息并进行处理。如果在处理您的消息时发生异常,它将被移至 endpoint_error 队列。 (取决于您的 RetryPolicy)。部署修复并将您的消息移回主队列,消息将被处理,就好像什么都没发生一样。
关于常见陷阱的常见问题的好读物
默认情况下,队列中的任何消息都将保留在那里,直到发生以下三种情况之一:
- 消息被消费
- 消息"time to live"过期(默认为永远存在)
- 服务器崩溃或重启
如果您有一个装满消息的队列,消息通常会一直存在,直到发生这三种情况之一。希望您能尽快让您的消费者在线,以便您可以使用消息并处理它们。
如果您希望消息在一段时间后自动删除(假设它们没有首先被消费),您只需设置一个生存时间 (ttl)
对于崩溃...如果您使消息持久保存到磁盘,则该消息可以在崩溃/重新启动后幸存下来。不过,如果在消息从交换器路由到队列之前服务器崩溃,消息仍有可能丢失。
我正在使用 MassTransit 和 RabbitMQ 创建一个简单的 Publisher/Subscriber。 发布者有以下代码来初始化总线:
/** create the bus */
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
});
/** start the bus and publish */
bus.Start();
bus.Publish<IPersonLogin>(new {FirstName = "John", LastName = "Smith"});
并且订阅者有这个初始化代码:
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "person_login", e =>
{
e.Consumer<PersonLoginConsumer>();
});
});
如果我关闭订阅者并发布 2 条消息,消息不会丢失,一旦订阅者恢复正常,消息就会被处理。
所以我的问题是:
- 我如何确保消息一直保留在 RabbitMQ 的队列中,直到一个订阅者出现并接收它?
- 如果服务器重启并且某些消息未被任何订阅者处理,会发生什么情况?它们会丢失还是在订阅者重启后立即被处理?
- 这是确保处理每条消息的正确模式吗?还是我应该使用不同的策略?
放在首位。
- 如果没有任何订阅者,RabbitMQ 将不知道应该将消息传递到哪个队列。那么消息将无法投递。(不确定是否将其移至错误队列或跳过)
- 如果交换器已经存在,它将被放入已订阅该事件的消费者队列中。因此,托管您的消费者的端点可以关闭,消息仍将被传递。
- 当消息被传送到队列时,消费者将获取您的消息并进行处理。如果在处理您的消息时发生异常,它将被移至 endpoint_error 队列。 (取决于您的 RetryPolicy)。部署修复并将您的消息移回主队列,消息将被处理,就好像什么都没发生一样。
关于常见陷阱的常见问题的好读物
默认情况下,队列中的任何消息都将保留在那里,直到发生以下三种情况之一:
- 消息被消费
- 消息"time to live"过期(默认为永远存在)
- 服务器崩溃或重启
如果您有一个装满消息的队列,消息通常会一直存在,直到发生这三种情况之一。希望您能尽快让您的消费者在线,以便您可以使用消息并处理它们。
如果您希望消息在一段时间后自动删除(假设它们没有首先被消费),您只需设置一个生存时间 (ttl)
对于崩溃...如果您使消息持久保存到磁盘,则该消息可以在崩溃/重新启动后幸存下来。不过,如果在消息从交换器路由到队列之前服务器崩溃,消息仍有可能丢失。