异常时重新排队消息
Re-queue message on exception
我正在寻找一种可靠的方法来重新排队目前无法正确处理的消息。
我一直在查看 http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,似乎支持在 RabbitMQ API 中重新排队消息。
else //reject the message but push back to queue for later re-try
{
Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
model.BasicReject(deliveryArguments.DeliveryTag, true);
}
但是我使用的是 EasyNetQ。
所以想知道我将如何在这里做类似的事情。
bus.Subscribe<MyMessage>("my_subscription_id", msg => {
try
{
// do work... could be long running
}
catch ()
{
// something went wrong - requeue message
}
});
这是一个好方法吗?不是 ACK
如果 do work
超过 RabbitMQ 服务器等待 ACK
超时,消息可能会导致问题。
据我所知,无法使用 EasyNetQ 手动 ack
、nack
或 reject
消息。
我看到你opened an issue ticket with the EasyNetQ team,关于这个...但还没有答案。
FWIW,这是一件非常合适的事情。我使用的所有库都支持此功能集(在 NodeJS 中)并且很常见。我很惊讶 EasyNetQ 不支持这个。
所以我想到了这个解决方案。它用 EasyNetQ 替换了默认的错误策略。
public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
: base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
{
}
public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
{
object deathHeaderObject;
if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
return AckStrategies.NackWithoutRequeue;
var deathHeaders = deathHeaderObject as IList;
if (deathHeaders == null)
return AckStrategies.NackWithoutRequeue;
var retries = 0;
foreach (IDictionary header in deathHeaders)
{
var count = int.Parse(header["count"].ToString());
retries += count;
}
if (retries < 3)
return AckStrategies.NackWithoutRequeue;
return base.HandleConsumerError(context, exception);
}
}
你这样替换它:
RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())
您必须使用 AdvancedBus
,所以您必须手动设置所有内容。
using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
bus.Advanced.Bind(deadExchange, queue, "");
bus.Advanced.Bind(textExchange, queue, "");
bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}
这将死信失败消息 3 次。之后会进入EasyNetQ提供的默认错误队列进行错误处理。 您可以订阅该队列。
当异常传播出您的使用者方法时,消息为死信。所以这会触发死信。
static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
throw new Exception("This is a test!");
}
我正在寻找一种可靠的方法来重新排队目前无法正确处理的消息。
我一直在查看 http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,似乎支持在 RabbitMQ API 中重新排队消息。
else //reject the message but push back to queue for later re-try
{
Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
model.BasicReject(deliveryArguments.DeliveryTag, true);
}
但是我使用的是 EasyNetQ。 所以想知道我将如何在这里做类似的事情。
bus.Subscribe<MyMessage>("my_subscription_id", msg => {
try
{
// do work... could be long running
}
catch ()
{
// something went wrong - requeue message
}
});
这是一个好方法吗?不是 ACK
如果 do work
超过 RabbitMQ 服务器等待 ACK
超时,消息可能会导致问题。
据我所知,无法使用 EasyNetQ 手动 ack
、nack
或 reject
消息。
我看到你opened an issue ticket with the EasyNetQ team,关于这个...但还没有答案。
FWIW,这是一件非常合适的事情。我使用的所有库都支持此功能集(在 NodeJS 中)并且很常见。我很惊讶 EasyNetQ 不支持这个。
所以我想到了这个解决方案。它用 EasyNetQ 替换了默认的错误策略。
public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
: base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
{
}
public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
{
object deathHeaderObject;
if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
return AckStrategies.NackWithoutRequeue;
var deathHeaders = deathHeaderObject as IList;
if (deathHeaders == null)
return AckStrategies.NackWithoutRequeue;
var retries = 0;
foreach (IDictionary header in deathHeaders)
{
var count = int.Parse(header["count"].ToString());
retries += count;
}
if (retries < 3)
return AckStrategies.NackWithoutRequeue;
return base.HandleConsumerError(context, exception);
}
}
你这样替换它:
RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())
您必须使用 AdvancedBus
,所以您必须手动设置所有内容。
using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
bus.Advanced.Bind(deadExchange, queue, "");
bus.Advanced.Bind(textExchange, queue, "");
bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}
这将死信失败消息 3 次。之后会进入EasyNetQ提供的默认错误队列进行错误处理。 您可以订阅该队列。
当异常传播出您的使用者方法时,消息为死信。所以这会触发死信。
static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
throw new Exception("This is a test!");
}