如何使用 MassTransit 从 RabbitMQ DeadLetter 队列中检索消息?
How to retrieve messages from RabbitMQ DeadLetter queue using MassTransit?
我正在尝试在一段时间后未收到消息时通知用户,使用 MassTransit 和 RabbitMQ。
根据我的阅读,在发布消息时使用 TimeToLive 属性 设置超时。当指定的时间用完时,消息应自动添加到 死信 队列,在末尾以“_skipped”命名。
如何从死信队列中检索消息?在我下面的尝试中,消息会立即添加到两个队列中,并且永远不会超时。
我想我可以使用 sagas 来做到这一点,但对于这样一个简单的问题来说,它似乎是一个过于复杂的解决方案,所以我想尽可能避免使用它。
static void Main(string[] args)
{
var bus = CreateBus("rabbitmq://localhost/", "guest", "guest", true);
var msg = new TestMessage("First Message");
LogMessageSent(msg);
bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));
Console.ReadKey();
bus.Stop();
bus = CreateBus("rabbitmq://localhost/", "guest", "guest", false);
msg = new TestMessage("SecondMessage");
LogMessageSent(msg);
bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));
Console.ReadKey();
bus.Stop();
}
private static IBusControl CreateBus(string rabbitUrl, string username, string password, bool enableEndpoint)
{
var bus = Bus.Factory.CreateUsingRabbitMq(c =>
{
var host = c.Host(new Uri(rabbitUrl), h =>
{
h.Username(username);
h.Password(password);
});
if (enableEndpoint)
{
c.ReceiveEndpoint(host, "TestQueue", x =>
{
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
});
}
c.ReceiveEndpoint(host, "TestQueue_skipped", x =>
{
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_skipped"));
});
});
bus.Start();
return bus;
}
private static void LogMessageSent(TestMessage msg)
{
Console.WriteLine(string.Format("{0} - Message \"{1}\" sent.", DateTime.Now.ToString("HH:mm:ss"), msg.Content));
}
private static Task LogMessageReceived(TestMessage msg, string queueName)
{
Console.WriteLine(string.Format("{0} - Message \"{1}\" received on queue \"{2}\".", DateTime.Now.ToString("HH:mm:ss"), msg.Content, queueName));
return Task.CompletedTask;
}
public class TestMessage
{
public string Content { get; }
public TestMessage(string content)
{
Content = content;
}
}
因为您正在呼叫 Publish
,所以消息被发送给每个订阅者。由于每个接收端点都在添加消费者,因此会为该消息类型创建一个订阅(以及 RabbitMQ 中的后续交换绑定)。您可以通过在 skipped 接收端点上指定 BindMessageExchanges = false
来禁用此功能。您将需要手动删除代理上的交换绑定。
至于您的 TimeToLive 问题,事实并非如此。 TimeToLive 传递给代理,如果消息过期,它将被移动到代理指定的死信队列(如果已配置)。它不会移动到 skipped 队列,这在 MassTransit 中具有不同的含义。在 MassTransit 中,skipped 队列用于传送到接收端点但没有在该端点上配置消费者来使用消息的消息。
对于 RabbitMQ,您可以使用以下方法在 MassTransit 中配置死信队列:
endpoint.BindDeadLetterQueue("dead-letter-queue-name");
这将配置代理,以便将达到其 TTL 的消息移动到指定的 exchange/queue。然后您在该接收端点上的消费者将能够使用它们(同样,请务必在死信接收端点上设置 BindMessageExchanges = false
。
例如:
c.ReceiveEndpoint(host, "TestQueue_expired", x =>
{
x.BindMessageExchanges = false;
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_expired"));
});
然后您的原始接收端点:
c.ReceiveEndpoint(host, "TestQueue", x =>
{
x.BindDeadLetterQueue("TestQueue_expired");
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
});
我正在尝试在一段时间后未收到消息时通知用户,使用 MassTransit 和 RabbitMQ。
根据我的阅读,在发布消息时使用 TimeToLive 属性 设置超时。当指定的时间用完时,消息应自动添加到 死信 队列,在末尾以“_skipped”命名。
如何从死信队列中检索消息?在我下面的尝试中,消息会立即添加到两个队列中,并且永远不会超时。
我想我可以使用 sagas 来做到这一点,但对于这样一个简单的问题来说,它似乎是一个过于复杂的解决方案,所以我想尽可能避免使用它。
static void Main(string[] args)
{
var bus = CreateBus("rabbitmq://localhost/", "guest", "guest", true);
var msg = new TestMessage("First Message");
LogMessageSent(msg);
bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));
Console.ReadKey();
bus.Stop();
bus = CreateBus("rabbitmq://localhost/", "guest", "guest", false);
msg = new TestMessage("SecondMessage");
LogMessageSent(msg);
bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));
Console.ReadKey();
bus.Stop();
}
private static IBusControl CreateBus(string rabbitUrl, string username, string password, bool enableEndpoint)
{
var bus = Bus.Factory.CreateUsingRabbitMq(c =>
{
var host = c.Host(new Uri(rabbitUrl), h =>
{
h.Username(username);
h.Password(password);
});
if (enableEndpoint)
{
c.ReceiveEndpoint(host, "TestQueue", x =>
{
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
});
}
c.ReceiveEndpoint(host, "TestQueue_skipped", x =>
{
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_skipped"));
});
});
bus.Start();
return bus;
}
private static void LogMessageSent(TestMessage msg)
{
Console.WriteLine(string.Format("{0} - Message \"{1}\" sent.", DateTime.Now.ToString("HH:mm:ss"), msg.Content));
}
private static Task LogMessageReceived(TestMessage msg, string queueName)
{
Console.WriteLine(string.Format("{0} - Message \"{1}\" received on queue \"{2}\".", DateTime.Now.ToString("HH:mm:ss"), msg.Content, queueName));
return Task.CompletedTask;
}
public class TestMessage
{
public string Content { get; }
public TestMessage(string content)
{
Content = content;
}
}
因为您正在呼叫 Publish
,所以消息被发送给每个订阅者。由于每个接收端点都在添加消费者,因此会为该消息类型创建一个订阅(以及 RabbitMQ 中的后续交换绑定)。您可以通过在 skipped 接收端点上指定 BindMessageExchanges = false
来禁用此功能。您将需要手动删除代理上的交换绑定。
至于您的 TimeToLive 问题,事实并非如此。 TimeToLive 传递给代理,如果消息过期,它将被移动到代理指定的死信队列(如果已配置)。它不会移动到 skipped 队列,这在 MassTransit 中具有不同的含义。在 MassTransit 中,skipped 队列用于传送到接收端点但没有在该端点上配置消费者来使用消息的消息。
对于 RabbitMQ,您可以使用以下方法在 MassTransit 中配置死信队列:
endpoint.BindDeadLetterQueue("dead-letter-queue-name");
这将配置代理,以便将达到其 TTL 的消息移动到指定的 exchange/queue。然后您在该接收端点上的消费者将能够使用它们(同样,请务必在死信接收端点上设置 BindMessageExchanges = false
。
例如:
c.ReceiveEndpoint(host, "TestQueue_expired", x =>
{
x.BindMessageExchanges = false;
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_expired"));
});
然后您的原始接收端点:
c.ReceiveEndpoint(host, "TestQueue", x =>
{
x.BindDeadLetterQueue("TestQueue_expired");
x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
});