消息代理不在时如何处理发布事件?

How to handle publishing event when message broker is out?

我在想如何在消息代理突然关闭时处理发送事件。请看一下这段代码

using (var uow = uowProvider.Create())
{
    ...
    ...
    var policy = offer.Buy(customer);

    uow.Policies.Add(policy);

    // DB changes are saved here! but what would happen if...
    await uow.CommitChanges();

    // ...eventPublisher throw an exception?
    await eventPublisher.PublishMessage(PolicyCreated(policy));

    return true;        
}

恕我直言,如果 eventPublisher 抛出异常,则不会发布事件 PolicyCreated。我不知道如何处理这种情况。该事件必须在系统中发布。我想唯一好的解决方案是创建某种重试机制,但我不确定...

您需要查看 Udi Dahan 对 Reliable Messaging without Distributed Transactions 的讨论。

但非常粗略地说,PolicyCreated 事件成为工作单元的一部分;要么是因为它保存在 Policy 表示本身中,要么是因为它保存在与 Policies 存储库参与同一事务的 EventRepository 中。

捕获数据库中的信息后,重试发布相对简单 - 从数据库中读取事件,发布,可选择将数据库中的事件标记为已成功发布,以便可以清理它们.

对于可靠的系统,您需要在本地保存事件。如果您的经纪人出现故障,您必须重试并发布事件。

实现此目的的方法有很多,但最常见的是发件箱模式。就像您的邮箱一样,您的 event/message 保留在本地并且您不断重试直到它被发送并且您将消息标记为在本地数据库中发布。

您可以在此处阅读更多信息Publish Events

我想详细说明一下@Imran Arshad 和@VoiceOfUnreason 提供的答案,这些答案当然是正确的。

发布消息基本上有 3 种模式:

  • exactly once delivery(需要分布式事务)
  • 最多一次传递(没有分布式事务但可能会丢失消息 - 就像 actor 模型一样)
  • 至少传递一次(没有分布式事务但可能有重复消息)

以下都是以你的例子为例

对于 exactly once 交付,数据库和队列都需要提供在分布式事务中登记的能力。一些队列不提供开箱即用的这个功能(比如 RabbitMQ),即使你可以自己推出它也可能不是最好的选择。分布式事务通常很慢。

对于 at most once 传递,我们必须接受我们可能会错过消息,我猜在大多数用例中这会很麻烦。您可以通过跟踪进度并拾取错过的消息并在需要时重新发送它们来解决这个问题。

对于 at least once 传递,我们需要确保消息是幂等的。当我们收到重复的消息(通常是边缘情况)时,它们应该被忽略,或者它们的结果应该与处理的初始消息相同。

现在,有几种方法可以解决您的问题。您可以启动数据库事务并更改数据库。在提交之前,您执行消息发送。如果失败,那么您的交易将被回滚。这适用于发送一条消息,但在您的情况下,某些订阅者可能已经收到一条消息。这使事情变得复杂,因为您的所有订阅者都需要收到消息,或者 none 的订阅者可以收到消息。

您可以让您的订户检查状态是否确实为真以及是否应继续处理。这给订阅者带来了负担并引入了一些耦合。如果状态不允许处理,它可以推迟操作,或者忽略它。

另一种选择是不发布事件,而是向自己发送指示步骤完成的命令。命令处理程序将执行发布并重试,直到所有订阅者队列都收到消息。这将要求相关订阅者忽略那些他们已经处理过的消息(幂等性)。

outbox 是一种存储转发 方法,最终会将消息发送给所有订阅者。您可以将 outbox 包含在数据库事务中。在我的 Shuttle.Esb service bus one of the folks that used it came across a weird side-effect that I had not planned. He used a sql-based queue 作为发件箱和队列连接到同一个数据库。因此,它包含在数据库事务中,如果未提交,将与所有其他更改一起回滚。很抱歉推销我自己的产品,但我相信其他服务总线产品可能具有相同的功能。

因此,需要考虑很多事情和各种技术来降低队列中断的风险。但是,我会将队列交互移动到数据库提交之前。