RabbitMQ+MassTransit:如何取消处理排队的消息?

RabbitMQ+MassTransit: how to cancel queued message from processing?

在某些特殊情况下,我需要以某种方式告诉接收点的消费者某些消息不应该被处理。否则两个系统将变得不同步(我们处理一些过时的外部系统,例如,如果连接断开,我们必须丢弃该连接范围内的所有排队操作)。

冒险并手动解决问题消息?补偿行动(在我的情况下可能很难支持)?还有什么吗?

有几种方法:

  • 您可以在发送消息时设置一个time-to-live:await endpoint.Send(myMessage, c => c.TimeToLive = TimeSpan.FromHours(1));,但这将适用于所有像这样发送(或发布)的消息。在查看您的要求后,我会考虑这一点。这是技术性的,但它是一种正确的消息传递模式。

  • 为您的消息本身设置 TTL 和生成时间戳属性,让消费者决定消息是否仍然值得处理。这是更多的业务,而且可能是最正确的方法。

  • 结合技术和业务 - 在消息中保留时间戳和 TTL headers 这样它们就不会污染您的消息合同,并使用自定义中间件将它们过滤掉。在这种情况下,您需要小心记录此类丢弃,这样您就不会想知道为什么消息会时不时地消失。

  • 几乎所有不可靠的集成都可以使用 sagas 进行监控,并设置超时。例如,我们使用 saga 与 Twilio 集成。由于我们无法为他们打开 webhook,因此我们会在一段时间后轮询以检查消息状态。您可以在收到消息时启动 saga,并安排一条消息来检查处理是否仍在等待。正如评论中所讨论的,您可以使用 "human intervention required" 方式来解决问题,或者让 saga 决定删除消息。

  • 类似的方法可能是使用查找 table,您可以在其中放置与处理无关的消息列表。这样的 table 类似于 sagas 列表。看来这种方式也需要调度。在这里,对于 saga,我建议对 DropIt 消息使用单独的接收端点(queue),并且只有一个消费者。这将防止 DropIt 消息卡在等待处理的集成消息后面(有些应该已经被丢弃)

  • 使用 RMQ 管理 API 从 queue 中删除消息。这是最糟糕的方法,我不会推荐它。

据我了解,您正在构建一个向第 3 方系统发送消息的系统。换句话说,您无法控制的系统。它有一个 API 但补偿操作并不总是可能的,因为 API 不提供它,或者因为操作是在无法补偿或回滚的第 3 方系统内执行的?

如果可能,请尝试通过 sagas 解决此问题。确保 saga 以正确的顺序执行不同的步骤(发送消息)。这样无法补偿的消息最后发送。这样,如果失败可以补偿的消息将由 saga 补偿。那些不能补偿的应该最后发送,当你尽可能确定它们不需要补偿时。因为最后一条消息是同步所有系统的最后一步。

总而言之,这是分布式系统的问题之一,要保持一切同步。补偿措施是解决这个问题的方法。如果无法采取补偿措施,您将处于非常困难的境地。尝试看看企业是否可以通过变得更加灵活并接受您需要补偿的东西来提供帮助,而他们会告诉您这是不可能的。

In some exceptional situations I need somehow to tell consumer on receiving point that some messages shouldn’t be processed.

你不能将其还原为:

告诉消费者可以处理较早的消息。

通过这种方式,您可以轻松地将其转换为作用于两条消息的状态机(如 saga)。如果第二条消息从未到达,那么您可以稍后丢弃第一条消息或执行其他操作。

这里的策略是 halt/wait 直到确定不需要还原任何操作。