确保事件最终发布到消息队列系统的最佳方式

Best way to ensure an event is eventually published to a message queuing sytem

请假设您有如下方法:

public void PlaceOrder(Order order)
{
     this.SaveOrderToDataBase(order);
     this.bus.Publish(new OrderPlaced(Order));    
}

订单保存到数据库后,一个事件被发布到消息队列系统,所以同一台或另一台机器上的其他子系统可以处理它。

但是,如果 this.bus.Publish(new OrderPlaced(Order)) 调用失败会怎样?或者机器在将订单保存到数据库后就崩溃了?该事件未发布,其他子系统无法处理。这是unacceptable。如果发生这种情况,我需要确保事件最终发布。

我可以使用哪些 acceptable 策略?哪个最好看?

注意:我不想使用分布式事务。

编辑:

Paul Sasik 非常接近,我认为我可以达到100%。这是我的想法:

首先在数据库中创建一个 table 事件,如下所示:

CREATE TABLE Events (EventId int PRIMARY KEY)

您可能希望使用 guid 而不是 int,或者您可以使用序列或标识。

然后执行以下伪代码:

open transaction
save order and event via A SINGLE transaction
in case of failure, report error and return
place order in message queue
in case of failure, report error, roll back transaction and return
commit transaction

所有事件都必须包含 EventId。当事件订阅者收到事件时,他们首先检查数据库中是否存在 EventId。

这样你就可以获得 100% 的可靠性,而不仅仅是 99.999%

您可以使 this.bus.Publish 调用成为 this.SaveOrderToDataBase 的数据库事务的一部分。这意味着 this.SaveOrderToDataBase 在事务范围内执行,如果 db 调用失败,则永远不会调用 mq,如果 mq 调用失败,则回滚 db 事务,使两个系统处于一致状态。如果两个调用都成功,则提交数据库事务。

伪代码:

open transaction
save order via transaction
in case of failure, report error and return
place order in message queue
in case of failure, report error, roll back transaction and return
commit transaction

您没有提到任何特定的数据库技术,所以这里是 link 到 a wiki article on transactions。即使您不熟悉交易,它也是一个很好的起点。还有一点好消息:它们并不难实现。

在此 video and on this blog post

中解释了确保事件最终发布到消息队列系统的正确方法

基本上您需要在执行业务逻辑操作的同一个事务中将要发送的消息存储到数据库中,然后将消息异步发送到总线并在另一个事务中从数据库中删除消息:

public void PlaceOrder(Order order)
{
     BeginTransaction();
     Try 
     {
         SaveOrderToDataBase(order);
         ev = new OrderPlaced(Order);
         SaveEventToDataBase(ev);
         CommitTransaction();
     }
     Catch 
     {
          RollbackTransaction();
          return;
     }

     PublishEventAsync(ev);    
}

async Task PublishEventAsync(BussinesEvent ev) 
{
    BegintTransaction();
    try 
    {
         await DeleteEventAsync(ev);
         await bus.PublishAsync(ev);
         CommitTransaction();
    }
    catch 
    {
         RollbackTransaction();
    }

}

因为 PublishEventAsync 可能会失败,您必须稍后重试,所以您需要一个后台进程来重试失败的发送,如下所示:

foreach (ev in eventsThatNeedsToBeSent) {
    await PublishEventAsync(ev);
}