如何使用 EasyNetQ / RabbitMQ 进行错误处理
How to do error handling with EasyNetQ / RabbitMQ
我在 C# 中将 RabbitMQ 与 EasyNetQ 库一起使用。我在这里使用 pub/sub 模式。我还有一些问题希望有人能帮助我:
- 当使用消息时出现错误,它会自动移至错误队列。我如何实现重试(以便它被放回原始队列,当它处理 X 次失败时,它被移动到死信队列)?
- 据我所知,总是有 1 个错误队列用于转储来自所有其他队列的消息。我怎样才能让每种类型有 1 个错误队列,以便每个队列都有自己关联的错误队列?
- 如何轻松重试错误队列中的消息?我试过 Hosepipe,但它只是将消息重新发布到错误队列而不是原始队列。我也不太喜欢这个选项,因为我不想在控制台中摆弄。最好我只是针对错误队列进行编程。
有人吗?
您 运行 使用 EasyNetQ/RabbitMQ 遇到的问题是,与 SQS 或 Azure 服务 Bus/Queues 等其他消息服务相比,它 "raw" 多得多,但我我会尽力为您指明正确的方向。
问题一
这将由您来完成。最简单的方法是在RabbitMQ/EasyNetQ中No-Ack一个消息,它会被放在队列的头部让你重试。这并不是真正可取的,因为它几乎会立即重试(没有时间延迟),并且还会阻止处理其他消息(如果您有一个预取计数为 1 的订阅者)。
我见过使用 "MessageEnvelope" 的其他实现。因此,包装器 class 当消息失败时,您会在 MessageEnvelope 上增加一个重试变量,并将消息重新传送回队列。您将不得不这样做并围绕您的消息处理程序编写包装代码,这不是 EasyNetQ 的功能。
使用上面的方法,我也看到人们使用信封,但允许邮件是死信。一旦它进入死信队列,就会有另一个 application/worker 从死信队列中读取项目。
上面的所有这些方法都有一个小问题,那就是没有什么好的方法可以增加 logarithmic/exponential/any 处理消息的延迟。在将消息返回到队列之前,您可以在代码中 "hold" 消息一段时间,但这不是一个很好的解决方法。
在所有这些选项中,您自己的自定义应用程序读取死信队列并根据包含重试计数的信封决定是否重新路由消息可能是最好的方法。
问题2.
您可以使用高级 API 为每个队列指定死信交换。 (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues)。然而,这意味着您将不得不在几乎所有地方使用高级 API,因为使用 subscribe/publish 的简单 IBus 实现会查找基于消息类型和订阅者名称命名的队列。使用队列的自定义声明意味着您将自己处理队列的命名,这意味着当您订阅时,您将需要知道您想要的名称等。不再为您自动订阅!
问题3
一个错误Queue/Dead Letter Queue 只是另一个队列。您可以收听此队列并使用它执行您需要执行的操作。但是实际上并没有任何开箱即用的解决方案听起来可以满足您的需求。
我完全按照您的描述实施了。以下是根据我的经验并与您的每个问题相关的一些提示。
Q1(如何重试X次):
为此,您可以使用 IMessage.Body.BasicProperties.Headers
。当您使用错误消息 queue 时,只需添加带有您选择的名称的 header。在出现错误 queue 的每条消息上查找此 header 并递增它。这将为您提供 运行 重试次数。
非常重要当消息超过 X 的重试限制时,您有一个策略来确定该怎么做。您不想丢失该消息。就我而言,此时我将消息写入磁盘。它为您提供了很多有用的调试信息,以便稍后返回,因为 EasyNetQ 会自动用错误信息包装您的原始消息。它还具有原始消息,因此您可以根据需要手动(或通过一些批处理 re-processing 代码自动)稍后以某种受控方式重新queue 消息。
您可以查看 Hosepipe 实用程序中的代码,了解执行此操作的好方法。事实上,如果您遵循在那里看到的模式,那么您甚至可以在需要时使用 Hosepipe 重新queue 消息。
Q2(如何根据原始 queue 创建错误 queue):
您可以使用 EasyNetQ Advanced Bus 干净地完成此操作。使用 IBus.Advanced.Container.Resolve<IConventions>
进入约定界面。然后你可以设置错误 queue 命名的约定为 conventions.ErrorExchangeNamingConvention
和 conventions.ErrorQueueNamingConvention
。在我的例子中,我将约定设置为基于原始 queue 的名称,这样我每次创建 queue 时都会得到一对 queue/queue_error 对 queue。
Q3(如何处理报错queues中的消息):
您可以像处理任何其他 queue 一样为错误声明消费者 queue。同样,AdvancedBus 允许您通过指定 queue 的类型是 EasyNetQ.SystemMessage.Error
来干净地完成此操作。所以,IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>()
会让你到达那里。重试只是意味着重新发布到原始交换(注意您在 header 中输入的重试次数(请参阅我对上面 Q1 的回答),以及错误消息中的信息,您消耗了错误 queue 可以帮你找到转载的目标
我知道这是一个旧的 post 但是 - 以防万一它可以帮助别人 - 这里是 my self-answered question (我需要问它因为现有的帮助还不够)它解释了我如何在其原始 queue 上实现了重试失败消息。以下内容应回答您的问题 #1 和 #3。对于 #2,您可能必须使用我没有使用过的 Advanced API(我认为它违背了 EasyNetQ 的目的;不妨使用RabbitMQ 客户端直接)。不过,也可以考虑实施 IConsumerErrorStrategy。
1) 由于一条消息可能有多个消费者,并且都可能不需要重试一条消息,所以我在消息的 body 中有一个 Dictionary<consumerId, RetryInfo>
,因为 EasyNetQ 没有 (开箱即用)支持消息中的复杂类型 headers.
public interface IMessageType
{
int MsgTypeId { get; }
Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
2) 我已经实现了一个 class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer
,它在每次被框架调用时只更新 TryCount 和其他信息。我通过 EasyNetQ 提供的 IoC 支持在 per-consumer 基础上将此自定义序列化器附加到框架。
public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
{
public string Serialize(byte[] messageBody)
{
string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);
// Add/update RetryInformation into objectifiedMsgBody here
// I have a dictionary that saves <key:consumerId, val: TryInfoObj>
return JsonConvert.SerializeObject(objectifiedMsgBody);
}
}
在我的 EasyNetQ 包装器中 class:
public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
{
if (enableRetry)
{
_defaultBus = RabbitHutch.CreateBus(currentConnString,
serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
);
}
else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
{
_defaultBus = RabbitHutch.CreateBus(currentConnString);
}
}
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}
3) 将消息添加到默认错误 queue 后,您可以拥有一个简单的控制台 app/windows 服务,该服务会定期在其原始 queue 上重新发布现有错误消息。类似于:
var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}
4) 我有一个包含回调函数的 MessageHandler class。每当将消息传递给消费者时,它都会转到 MessageHandler,它决定消息尝试是否有效,如果有效,则调用实际的回调。如果尝试无效(maxRetriesExceeded/the 消费者无论如何都不需要重试),我将忽略该消息。在这种情况下,您可以选择死信消息。
public interface IMsgHandler<T> where T: class, IMessageType
{
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc { get; set; }
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
}
这里是调用回调的MsgHandler
中的中介函数:
public async Task InvokeMsgCallbackFunc(T msg)
{
if (IsTryValid(msg, CurrSubscriptionId))
{
await this.MsgCallbackFunc(msg);
}
else
{
// Do whatever you want
}
}
这里,我为此实现了一个Nuget包(EasyDeadLetter),可以在任何项目中以最少的改动轻松实现。
您需要做的就是遵循以下四个步骤:
首先,用 QeueuAttribute
装饰你的 class 对象
[Queue(“Product.Report”, ExchangeName = “Product.Report”)]
public class ProductReport { }
第二步是使用相同的 QueueAttribute 定义您的 dead-letter 队列,并从 Main 对象 class.[=16 继承 dead-letter 对象=]
[Queue(“Product.Report.DeadLetter”, ExchangeName =
“Product.Report.DeadLetter”)]
public class ProductReportDeadLetter : ProductReport { }
现在,是时候用 EasyDeadLetter 属性装饰主队列对象并设置 dead-letter 队列的类型了。
[EasyDeadLetter(DeadLetterType =
typeof(ProductReportDeadLetter))]
[Queue(“Product.Report”, ExchangeName = “Product.Report”)]
public class ProductReport { }
最后一步,您需要将 EasyDeadLetterStrategy 注册为默认错误处理程序(IConsumerErrorStrategy)。
services.AddSingleton<IBus>
(RabbitHutch.CreateBus(“connectionString”,
serviceRegister =>
{
serviceRegister.Register<IConsumerErrorStrategy,
EasyDeadLetterStrategy>();
}));
就是这样。从现在开始,任何失败的消息都将移至相关的 dead-letter 队列。
在此处查看更多详细信息:
GitHub Repository
NuGet Package
我在 C# 中将 RabbitMQ 与 EasyNetQ 库一起使用。我在这里使用 pub/sub 模式。我还有一些问题希望有人能帮助我:
- 当使用消息时出现错误,它会自动移至错误队列。我如何实现重试(以便它被放回原始队列,当它处理 X 次失败时,它被移动到死信队列)?
- 据我所知,总是有 1 个错误队列用于转储来自所有其他队列的消息。我怎样才能让每种类型有 1 个错误队列,以便每个队列都有自己关联的错误队列?
- 如何轻松重试错误队列中的消息?我试过 Hosepipe,但它只是将消息重新发布到错误队列而不是原始队列。我也不太喜欢这个选项,因为我不想在控制台中摆弄。最好我只是针对错误队列进行编程。
有人吗?
您 运行 使用 EasyNetQ/RabbitMQ 遇到的问题是,与 SQS 或 Azure 服务 Bus/Queues 等其他消息服务相比,它 "raw" 多得多,但我我会尽力为您指明正确的方向。
问题一
这将由您来完成。最简单的方法是在RabbitMQ/EasyNetQ中No-Ack一个消息,它会被放在队列的头部让你重试。这并不是真正可取的,因为它几乎会立即重试(没有时间延迟),并且还会阻止处理其他消息(如果您有一个预取计数为 1 的订阅者)。
我见过使用 "MessageEnvelope" 的其他实现。因此,包装器 class 当消息失败时,您会在 MessageEnvelope 上增加一个重试变量,并将消息重新传送回队列。您将不得不这样做并围绕您的消息处理程序编写包装代码,这不是 EasyNetQ 的功能。
使用上面的方法,我也看到人们使用信封,但允许邮件是死信。一旦它进入死信队列,就会有另一个 application/worker 从死信队列中读取项目。
上面的所有这些方法都有一个小问题,那就是没有什么好的方法可以增加 logarithmic/exponential/any 处理消息的延迟。在将消息返回到队列之前,您可以在代码中 "hold" 消息一段时间,但这不是一个很好的解决方法。
在所有这些选项中,您自己的自定义应用程序读取死信队列并根据包含重试计数的信封决定是否重新路由消息可能是最好的方法。
问题2.
您可以使用高级 API 为每个队列指定死信交换。 (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues)。然而,这意味着您将不得不在几乎所有地方使用高级 API,因为使用 subscribe/publish 的简单 IBus 实现会查找基于消息类型和订阅者名称命名的队列。使用队列的自定义声明意味着您将自己处理队列的命名,这意味着当您订阅时,您将需要知道您想要的名称等。不再为您自动订阅!
问题3
一个错误Queue/Dead Letter Queue 只是另一个队列。您可以收听此队列并使用它执行您需要执行的操作。但是实际上并没有任何开箱即用的解决方案听起来可以满足您的需求。
我完全按照您的描述实施了。以下是根据我的经验并与您的每个问题相关的一些提示。
Q1(如何重试X次):
为此,您可以使用 IMessage.Body.BasicProperties.Headers
。当您使用错误消息 queue 时,只需添加带有您选择的名称的 header。在出现错误 queue 的每条消息上查找此 header 并递增它。这将为您提供 运行 重试次数。
非常重要当消息超过 X 的重试限制时,您有一个策略来确定该怎么做。您不想丢失该消息。就我而言,此时我将消息写入磁盘。它为您提供了很多有用的调试信息,以便稍后返回,因为 EasyNetQ 会自动用错误信息包装您的原始消息。它还具有原始消息,因此您可以根据需要手动(或通过一些批处理 re-processing 代码自动)稍后以某种受控方式重新queue 消息。
您可以查看 Hosepipe 实用程序中的代码,了解执行此操作的好方法。事实上,如果您遵循在那里看到的模式,那么您甚至可以在需要时使用 Hosepipe 重新queue 消息。
Q2(如何根据原始 queue 创建错误 queue):
您可以使用 EasyNetQ Advanced Bus 干净地完成此操作。使用 IBus.Advanced.Container.Resolve<IConventions>
进入约定界面。然后你可以设置错误 queue 命名的约定为 conventions.ErrorExchangeNamingConvention
和 conventions.ErrorQueueNamingConvention
。在我的例子中,我将约定设置为基于原始 queue 的名称,这样我每次创建 queue 时都会得到一对 queue/queue_error 对 queue。
Q3(如何处理报错queues中的消息):
您可以像处理任何其他 queue 一样为错误声明消费者 queue。同样,AdvancedBus 允许您通过指定 queue 的类型是 EasyNetQ.SystemMessage.Error
来干净地完成此操作。所以,IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>()
会让你到达那里。重试只是意味着重新发布到原始交换(注意您在 header 中输入的重试次数(请参阅我对上面 Q1 的回答),以及错误消息中的信息,您消耗了错误 queue 可以帮你找到转载的目标
我知道这是一个旧的 post 但是 - 以防万一它可以帮助别人 - 这里是 my self-answered question (我需要问它因为现有的帮助还不够)它解释了我如何在其原始 queue 上实现了重试失败消息。以下内容应回答您的问题 #1 和 #3。对于 #2,您可能必须使用我没有使用过的 Advanced API(我认为它违背了 EasyNetQ 的目的;不妨使用RabbitMQ 客户端直接)。不过,也可以考虑实施 IConsumerErrorStrategy。
1) 由于一条消息可能有多个消费者,并且都可能不需要重试一条消息,所以我在消息的 body 中有一个 Dictionary<consumerId, RetryInfo>
,因为 EasyNetQ 没有 (开箱即用)支持消息中的复杂类型 headers.
public interface IMessageType
{
int MsgTypeId { get; }
Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
2) 我已经实现了一个 class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer
,它在每次被框架调用时只更新 TryCount 和其他信息。我通过 EasyNetQ 提供的 IoC 支持在 per-consumer 基础上将此自定义序列化器附加到框架。
public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
{
public string Serialize(byte[] messageBody)
{
string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);
// Add/update RetryInformation into objectifiedMsgBody here
// I have a dictionary that saves <key:consumerId, val: TryInfoObj>
return JsonConvert.SerializeObject(objectifiedMsgBody);
}
}
在我的 EasyNetQ 包装器中 class:
public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
{
if (enableRetry)
{
_defaultBus = RabbitHutch.CreateBus(currentConnString,
serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
);
}
else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
{
_defaultBus = RabbitHutch.CreateBus(currentConnString);
}
}
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}
3) 将消息添加到默认错误 queue 后,您可以拥有一个简单的控制台 app/windows 服务,该服务会定期在其原始 queue 上重新发布现有错误消息。类似于:
var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}
4) 我有一个包含回调函数的 MessageHandler class。每当将消息传递给消费者时,它都会转到 MessageHandler,它决定消息尝试是否有效,如果有效,则调用实际的回调。如果尝试无效(maxRetriesExceeded/the 消费者无论如何都不需要重试),我将忽略该消息。在这种情况下,您可以选择死信消息。
public interface IMsgHandler<T> where T: class, IMessageType
{
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc { get; set; }
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
}
这里是调用回调的MsgHandler
中的中介函数:
public async Task InvokeMsgCallbackFunc(T msg)
{
if (IsTryValid(msg, CurrSubscriptionId))
{
await this.MsgCallbackFunc(msg);
}
else
{
// Do whatever you want
}
}
这里,我为此实现了一个Nuget包(EasyDeadLetter),可以在任何项目中以最少的改动轻松实现。 您需要做的就是遵循以下四个步骤:
首先,用 QeueuAttribute
装饰你的 class 对象[Queue(“Product.Report”, ExchangeName = “Product.Report”)] public class ProductReport { }
第二步是使用相同的 QueueAttribute 定义您的 dead-letter 队列,并从 Main 对象 class.[=16 继承 dead-letter 对象=]
[Queue(“Product.Report.DeadLetter”, ExchangeName = “Product.Report.DeadLetter”)] public class ProductReportDeadLetter : ProductReport { }
现在,是时候用 EasyDeadLetter 属性装饰主队列对象并设置 dead-letter 队列的类型了。
[EasyDeadLetter(DeadLetterType = typeof(ProductReportDeadLetter))] [Queue(“Product.Report”, ExchangeName = “Product.Report”)] public class ProductReport { }
最后一步,您需要将 EasyDeadLetterStrategy 注册为默认错误处理程序(IConsumerErrorStrategy)。
services.AddSingleton<IBus> (RabbitHutch.CreateBus(“connectionString”, serviceRegister => { serviceRegister.Register<IConsumerErrorStrategy, EasyDeadLetterStrategy>(); }));
就是这样。从现在开始,任何失败的消息都将移至相关的 dead-letter 队列。
在此处查看更多详细信息: GitHub Repository NuGet Package