MassTransit 在内存中存储消息重试尝试?
MassTransit store Message retry attempts in memory?
MassTransit 重试策略允许我们设置重试次数。在消费者端,我们可以通过调用
来获取它
context.GetRetryAttempt()
但是当消费者应用程序重新启动时,它从 0 开始。
我需要它应该从服务器停止之前离开的地方开始。
作为 RabbitMQ 死信交换帮助我实现它。此处附有代码示例。我可以在 MassTransit 中做类似的事情吗?
public class Consumer<T> : IConsumer<T>
{
private readonly ConsumerConfigOptions _consumerConfigOptions;
private readonly RabbitMqPoc.Interface.IConnectionFactory _connectionFactory;
public event ReceiveMessage<T> ReceiveMessageEvent;
private AckType AckType = AckType.Accept;
private bool IsRequeue = false;
private const string RetryExchange = "RetryExchange";
private const string RetryKeyName = "x-retries";
public Consumer(ConsumerConfigOptions consumerConfigOptions,
RabbitMqPoc.Interface.IConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
_consumerConfigOptions = consumerConfigOptions;
}
public void Consume(string queue)
{
IModel channel = _connectionFactory.GetConnection().CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var header = ea.BasicProperties.Headers;
byte[] body = ea.Body;
try
{
//throw exception here
CheckForAcknowledge(channel,ea);
}
catch (Exception exception)
{
Requeue(channel, ea, body);
}
};
channel.BasicConsume(queue: queue,
autoAck: false,
consumer: consumer);
}
private void Requeue(IModel channel, BasicDeliverEventArgs ea, byte[] body)
{
var header = ea.BasicProperties.Headers;
if (header.ContainsKey(RetryKeyName))
{
SetRetryCount(header, channel, ea, body);
}
else
{
RePublishOnDlx(channel, ea, body, 0);
}
}
private void SetRetryCount(IDictionary<string, object> header, IModel channel, BasicDeliverEventArgs ea, byte[] body)
{
int currentRetryAttempt = (int)header[RetryKeyName] + 1;
if (currentRetryAttempt >= _consumerConfigOptions.RetryAttempts)
{
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
RePublishOnDlx(channel, ea, body, currentRetryAttempt);
}
}
private void RePublishOnDlx(IModel channel, BasicDeliverEventArgs eventArgs, byte[] body, int retryAttempt)
{
IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Headers = new Dictionary<string, object>
{
{ RetryKeyName, retryAttempt }
};
channel.BasicNack(eventArgs.DeliveryTag, false, false);
channel.BasicPublish(RetryExchange, string.Empty, basicProperties, body);
}
public void Acknowledge(AckType ackType = AckType.Accept, bool isRequeue = false)
{
AckType = ackType;
IsRequeue = isRequeue;
}
private void CheckForAcknowledge(IModel channel, BasicDeliverEventArgs ea)
{
if (AckType == AckType.Accept)
{
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: IsRequeue);
}
}
你是对的,MassTransit 中的 UseMessageRetry
完全是 in-memory。如果重试次数用尽,消息将移至 _error queue。如果进程在重试挂起时退出,则消息将被取消并保留在 queue 上。当发生这种情况并且进程重新启动时,您是对的,重试策略从零开始 — 因为原始消息仍在 queue 中并且无法修改 headers。
如果需要,您可以使用延迟交换 RabbitMQ plug-in 重新传送消息,以使用代理执行重试。一旦您在总线上配置了延迟交换 plug-in 和消息调度程序,UseScheduledRedelivery
过滤器就会执行此操作。
cfg.UseDelayedExchangeMessageScheduler();
MassTransit 重试策略允许我们设置重试次数。在消费者端,我们可以通过调用
来获取它context.GetRetryAttempt()
但是当消费者应用程序重新启动时,它从 0 开始。 我需要它应该从服务器停止之前离开的地方开始。
作为 RabbitMQ 死信交换帮助我实现它。此处附有代码示例。我可以在 MassTransit 中做类似的事情吗?
public class Consumer<T> : IConsumer<T>
{
private readonly ConsumerConfigOptions _consumerConfigOptions;
private readonly RabbitMqPoc.Interface.IConnectionFactory _connectionFactory;
public event ReceiveMessage<T> ReceiveMessageEvent;
private AckType AckType = AckType.Accept;
private bool IsRequeue = false;
private const string RetryExchange = "RetryExchange";
private const string RetryKeyName = "x-retries";
public Consumer(ConsumerConfigOptions consumerConfigOptions,
RabbitMqPoc.Interface.IConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
_consumerConfigOptions = consumerConfigOptions;
}
public void Consume(string queue)
{
IModel channel = _connectionFactory.GetConnection().CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var header = ea.BasicProperties.Headers;
byte[] body = ea.Body;
try
{
//throw exception here
CheckForAcknowledge(channel,ea);
}
catch (Exception exception)
{
Requeue(channel, ea, body);
}
};
channel.BasicConsume(queue: queue,
autoAck: false,
consumer: consumer);
}
private void Requeue(IModel channel, BasicDeliverEventArgs ea, byte[] body)
{
var header = ea.BasicProperties.Headers;
if (header.ContainsKey(RetryKeyName))
{
SetRetryCount(header, channel, ea, body);
}
else
{
RePublishOnDlx(channel, ea, body, 0);
}
}
private void SetRetryCount(IDictionary<string, object> header, IModel channel, BasicDeliverEventArgs ea, byte[] body)
{
int currentRetryAttempt = (int)header[RetryKeyName] + 1;
if (currentRetryAttempt >= _consumerConfigOptions.RetryAttempts)
{
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
RePublishOnDlx(channel, ea, body, currentRetryAttempt);
}
}
private void RePublishOnDlx(IModel channel, BasicDeliverEventArgs eventArgs, byte[] body, int retryAttempt)
{
IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Headers = new Dictionary<string, object>
{
{ RetryKeyName, retryAttempt }
};
channel.BasicNack(eventArgs.DeliveryTag, false, false);
channel.BasicPublish(RetryExchange, string.Empty, basicProperties, body);
}
public void Acknowledge(AckType ackType = AckType.Accept, bool isRequeue = false)
{
AckType = ackType;
IsRequeue = isRequeue;
}
private void CheckForAcknowledge(IModel channel, BasicDeliverEventArgs ea)
{
if (AckType == AckType.Accept)
{
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: IsRequeue);
}
}
你是对的,MassTransit 中的 UseMessageRetry
完全是 in-memory。如果重试次数用尽,消息将移至 _error queue。如果进程在重试挂起时退出,则消息将被取消并保留在 queue 上。当发生这种情况并且进程重新启动时,您是对的,重试策略从零开始 — 因为原始消息仍在 queue 中并且无法修改 headers。
如果需要,您可以使用延迟交换 RabbitMQ plug-in 重新传送消息,以使用代理执行重试。一旦您在总线上配置了延迟交换 plug-in 和消息调度程序,UseScheduledRedelivery
过滤器就会执行此操作。
cfg.UseDelayedExchangeMessageScheduler();