EasyNetQ - 如何重试失败的消息并在消息 body/header 中保留 RetryCount?
EasyNetQ - How to retry failed messages & persist RetryCount in message body/header?
我正在使用 EasyNetQ,需要在原始 queue 上重试失败的消息。问题是:即使我成功增加了 TriedCount 变量(在每个消息的 body 中),当 EasyNetQ 在异常后将消息发布到默认错误 queue 时,更新的 TriedCount 不在消息!大概是因为它只是将原始消息转储到错误 queue 而没有消费者的更改。
更新后的 TriedCount 适用于 in-process 次重新发布,但不适用于通过 EasyNetQ Hosepipe 或 EasyNetQ 管理客户端重新发布。 Hosepipe 生成的文本文件没有更新 TriedCount。
public interface IMsgHandler<T> where T: class, IMessageType
{
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc { get; set; }
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
}
public interface IMessageType
{
int MsgTypeId { get; }
Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
public class TryInfo
{
public int TriedCount { get; set; }
/*Other information regarding msg attempt*/
}
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}
我也尝试通过 Management API(粗略的代码)重新发布自己:
var client = new ManagementClient("http://localhost", "guest", "guest");
var vhost = client.GetVhostAsync("/").Result;
var errQueue = client.GetQueueAsync("EasyNetQ_Default_Error_Queue",
vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue,
Ackmodes.ack_requeue_true);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue,
crit).Result;
foreach (var errMsg in errMsgs)
{
var pubRes = client.PublishAsync(client.GetExchangeAsync(errMsg.Exchange, vhost).Result,
new PublishInfo(errMsg.RoutingKey, errMsg.Payload)).Result;
}
这有效,但仅再次发布到错误 queue,而不是原始 queue。另外,现阶段不知道如何add/update消息body中的重试信息
我探索了 this 库以将 header 添加到消息中,但我没有看到 body 中的计数是否未更新,how/why header 中的计数是否会更新。
有没有办法在不求助于高级总线的情况下保留 TriedCount(在这种情况下我可能会使用 RabbitMQ .Net 客户端本身)?
为了防止它对其他人有所帮助,我最终实施了自己的 IErrorMessageSerializer (as opposed to implementing the whole IConsumerErrorStrategy,这似乎有点矫枉过正)。我在 body(而不是 header)中添加重试信息的原因是 EasyNetQ 不处理 header 中的复杂类型(反正不是 out-of-the-box) .因此,使用字典可以为不同的消费者提供更多控制。我在创建总线时注册了自定义序列化程序,如下所示:
_defaultBus = RabbitHutch.CreateBus(currentConnString, serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId)));
并且像这样实现了 Serialize 方法:
public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
{
public string Serialize(byte[] messageBody)
{
string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);
// Add/update RetryInformation into objectifiedMsgBody here
// I have a dictionary that saves <key:consumerId, val: TryInfoObj>
return JsonConvert.SerializeObject(objectifiedMsgBody);
}
}
实际重试是由一个简单的控制台 app/windows 服务通过 EasyNetQ 管理定期完成的 API:
var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result,
pubInfo).Result;
}
我的消费者自己知道是否启用重试,从而赋予它更多控制权,以便它可以选择处理重试的消息或忽略它。一旦被忽略,msg 显然不会被再次尝试;这就是 EasyNetQ 的工作原理。
我正在使用 EasyNetQ,需要在原始 queue 上重试失败的消息。问题是:即使我成功增加了 TriedCount 变量(在每个消息的 body 中),当 EasyNetQ 在异常后将消息发布到默认错误 queue 时,更新的 TriedCount 不在消息!大概是因为它只是将原始消息转储到错误 queue 而没有消费者的更改。
更新后的 TriedCount 适用于 in-process 次重新发布,但不适用于通过 EasyNetQ Hosepipe 或 EasyNetQ 管理客户端重新发布。 Hosepipe 生成的文本文件没有更新 TriedCount。
public interface IMsgHandler<T> where T: class, IMessageType
{
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc { get; set; }
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
}
public interface IMessageType
{
int MsgTypeId { get; }
Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
public class TryInfo
{
public int TriedCount { get; set; }
/*Other information regarding msg attempt*/
}
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}
我也尝试通过 Management API(粗略的代码)重新发布自己:
var client = new ManagementClient("http://localhost", "guest", "guest");
var vhost = client.GetVhostAsync("/").Result;
var errQueue = client.GetQueueAsync("EasyNetQ_Default_Error_Queue",
vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue,
Ackmodes.ack_requeue_true);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue,
crit).Result;
foreach (var errMsg in errMsgs)
{
var pubRes = client.PublishAsync(client.GetExchangeAsync(errMsg.Exchange, vhost).Result,
new PublishInfo(errMsg.RoutingKey, errMsg.Payload)).Result;
}
这有效,但仅再次发布到错误 queue,而不是原始 queue。另外,现阶段不知道如何add/update消息body中的重试信息
我探索了 this 库以将 header 添加到消息中,但我没有看到 body 中的计数是否未更新,how/why header 中的计数是否会更新。
有没有办法在不求助于高级总线的情况下保留 TriedCount(在这种情况下我可能会使用 RabbitMQ .Net 客户端本身)?
为了防止它对其他人有所帮助,我最终实施了自己的 IErrorMessageSerializer (as opposed to implementing the whole IConsumerErrorStrategy,这似乎有点矫枉过正)。我在 body(而不是 header)中添加重试信息的原因是 EasyNetQ 不处理 header 中的复杂类型(反正不是 out-of-the-box) .因此,使用字典可以为不同的消费者提供更多控制。我在创建总线时注册了自定义序列化程序,如下所示:
_defaultBus = RabbitHutch.CreateBus(currentConnString, serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId)));
并且像这样实现了 Serialize 方法:
public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
{
public string Serialize(byte[] messageBody)
{
string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);
// Add/update RetryInformation into objectifiedMsgBody here
// I have a dictionary that saves <key:consumerId, val: TryInfoObj>
return JsonConvert.SerializeObject(objectifiedMsgBody);
}
}
实际重试是由一个简单的控制台 app/windows 服务通过 EasyNetQ 管理定期完成的 API:
var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result,
pubInfo).Result;
}
我的消费者自己知道是否启用重试,从而赋予它更多控制权,以便它可以选择处理重试的消息或忽略它。一旦被忽略,msg 显然不会被再次尝试;这就是 EasyNetQ 的工作原理。