当实体状态改变时向主题发送消息的模式

Pattern for sending messages to topics when entity state changes

我们使用 Azure 服务总线在应用程序中的某个实体更改为某个状态时通知订阅者。现在我们在调用 dbContext.SaveChangesAsync():

之后立即执行此操作
  1. dbContext.SaveChangesAsync()

  2. topicClient.SendAsync(someMessage)

我遇到的问题是:假设 dbContext.SaveChangesAsync() 运行良好,但无论出于何种原因,对 topicClient.SendAsync() 的调用都会引发异常。现在该主题的订阅者不会知道实体的状态更改。

我尝试使用 TransactionScope,但这不起作用,因为据我所知,Azure 不使用 DTC。

(我可以调换以上两步的顺序,但是如果消息发送正常,保存失败,则消息中包含虚假数据。)

有人对如何处理这个问题有什么建议吗?好像应该是常见的,但是我在网上找不到任何东西。如果有人能指出正确的方向,我将不胜感激。

提前致谢。

您可以实施名为 poison message queue or dead letter queue 的模式。还有其他已知的同义词,例如 retry queue.

想法是尝试一个操作,如果不成功,将一些元信息放入队列中,稍后重试该操作。在你的情况下,在调用 dbContext.SaveChangesAsync() 之后,你可以将所有必要的信息放入 durable 队列中,并有一个处理程序来处理该队列,在某种 ProcessMessage() 中你可以处理调用的 topicClient.SendAsync(someMessage).
例如,如果调用服务总线不成功,您可以 return 将项目排回队列以供稍后处理。

当然,您可以仅将提到的队列专用于 topicClient.SendAsync(someMessage) 的失败调用,这可以显着减小其大小。操作顺序无关紧要,因为您可以放置​​任何元信息,这使得可以先调用 topicClient.SendAsync(someMessage) 然后尝试更新数据库。

不是您所面临问题的直接答案,但我将分享我们如何解决我们的一个应用程序中的类似问题,在该应用程序中我们必须将数据写入 Azure table 中 Table 存储。因为我们正在将数据写入单独的 table,所以我们无法使用 Azure Table 存储中可用的实体批处理事务功能。

我们解决这个问题的方法是实现类似于 eventual consistency pattern

我们所做的不是直接在 tables 中执行保存数据(这意味着发出多个网络请求,其中任何一个都可能失败),我们将需要保存的数据发送到队列中(我们使用了存储队列)。如果我们能够将数据保存在队列中,则意味着数据最终将可用。

然后我们写了一个Azure Queue Triggered Function。在该函数中,我们将数据保存在我们需要保存的 table 中。一旦所有操作都成功,该消息将被函数运行时自动删除。如果任何操作失败,消息将再次发送到队列并再次出队。

现在要了解的一件重要事情是这些保存方法必须是幂等的。假设我们正在写入 3 tables,第一个 table 的写操作成功,但第二个 table 的写操作失败。下次调用函数时,它将尝试再次写入 1st table,代码应该能够优雅地处理它。

如您所述,操作顺序无关紧要,因为数据库和消息服务这两种服务之间没有重叠事务。如果您使用的数据存储支持事务(例如 Azure SQL 服务器),您可以在不使用两阶段提交的情况下逃脱并考虑实施 Outbox pattern.

NServiceBus 提供模式 feature. You can download the outbox sample that shows how to use it with RabbitMQ. The transport can be replaced with Azure Service Bus transport 来满足您的要求。

披露:我为 NServiceBus 做出贡献。