在数据库和 Kafka 生产者之间同步事务

Synchronising transactions between database and Kafka producer

我们有一个微服务架构,使用Kafka作为服务之间的通信机制。一些服务有自己的数据库。假设用户调用服务 A,这将导致在该服务的数据库中创建一条记录(或一组记录)。此外,此事件应该作为 Kafka 主题上的一个项目报告给其他服务。确保仅在成功更新 Kafka 主题时才写入数据库记录的最佳方法是什么(实质上是围绕数据库更新和 Kafka 更新创建分布式事务)?

我们正在考虑使用 spring-kafka (in a Spring Boot WebFlux service), and I can see that it has a KafkaTransactionManager, but from what I understand this is more about Kafka transactions themselves (ensuring consistency across the Kafka producers and consumers), rather than synchronising transactions across two systems (see : “Kafka doesn't support XA and you have to deal with the possibility that the DB tx might commit while the Kafka tx rolls back.”). Additionally, I think this class relies on Spring’s transaction framework which, at least as far as I currently understand, is thread-bound, and won’t work if using a reactive approach (e.g. WebFlux) where different parts of an operation may execute on different threads. (We are using reactive-pg-client,因此正在手动处理交易,而不是使用 Spring 的框架。)

我能想到的一些选项:

  1. 数据不写入数据库:只写入Kafka。然后使用消费者(在服务 A 中)更新数据库。这看起来可能不是最有效的,并且会出现问题,因为用户调用的服务无法立即看到它应该刚刚创建的数据库更改。
  2. 不要直接写入 Kafka:只写入数据库,并使用类似 Debezium 的方式将更改报告给 Kafka。这里的问题是更改是基于单个数据库记录的,而要存储在 Kafka 中的业务重大事件可能涉及来自多个表的数据组合。
  3. 先写入数据库(如果失败,什么也不做,抛出异常)。然后,在写入 Kafka 时,假设写入可能会失败。使用内置的自动重试功能让它继续尝试一段时间。如果最终完全失败,请尝试写入死信队列并为管理员创建某种手动机制来解决它。如果写入 DLQ 失败(即 Kafka 完全关闭),只需以其他方式记录(例如到数据库),然后再次创建某种手动机制让管理员解决问题。

有人对以上有任何想法或建议,或者能够纠正我以上假设中的任何错误吗?

提前致谢!

我建议使用方法 2 的稍作改动的变体。

仅写入您的数据库,但除了实际的 table 写入之外,还将 "events" 写入同一数据库中的特殊 table;这些事件记录将包含您需要的聚合。以最简单的方式,您只需插入另一个实体,例如由 JPA 映射,其中包含 JSON 属性 和聚合负载。当然,这可以通过事务侦听器/框架组件的某种方式实现自动化。

然后使用 Debezium 从 table 中捕获更改并将它们流式传输到 Kafka。这样你就拥有了两者:Kafka 中的最终一致状态(Kafka 中的事件可能会落后,或者你可能会在重启后第二次看到一些事件,但最终它们会反映数据库状态)而不需要分布式事务,以及您所追求的业务级事件语义。

(免责声明:我是 Debezium 的负责人;有趣的是,我正在写博客 post 更详细地讨论这种方法)

这是post

https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/

https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/

首先,我不得不说我不是 Kafka,也不是 Spring 专家,但我认为在写入独立资源时,这更多是一个概念上的挑战,解决方案应该是适应性的table 到你的技术堆栈。此外,我应该说这个解决方案试图在没有像 Debezium 这样的外部组件的情况下解决问题,因为在我看来,每个额外的组件都会在测试、维护和 运行 应用程序方面带来挑战,而在选择这样的选项时经常被低估.也不是每个数据库都可以用作 Debezium 源。

为确保我们谈论的是相同的目标,让我们以一个简化的航空公司示例来说明情况,客户可以在其中购买机票。成功订购后,客户将收到一条由外部消息系统(我们必须与之交谈的系统)发送的消息(邮件、推送通知等)。

在我们的数据库(我们存储订单的地方)和 JMS 提供程序之间具有 XA 事务的传统 JMS 世界中,它看起来如下所示:客户端将订单设置到我们开始事务的应用程序。该应用程序将订单存储在其数据库中。然后消息被发送到 JMS,您可以提交事务。即使在与自己的资源对话时,这两个操作也会参与事务。由于 XA 事务保证 ACID 我们很好。

让我们将 Kafka(或任何其他无法参与 XA 事务的资源)带入游戏。由于不再有同步两个事务的协调器,因此以下的主要思想是将处理分成具有持久状态的两部分。

当您将订单存储在数据库中时,您还可以将消息(带有聚合数据)存储在同一数据库中(例如,作为 CLOB 列中的 JSON),之后您希望将其发送到 Kafka .相同的资源——有 ACID 保证,到目前为止一切正常。现在你需要一种机制来轮询你的“KafkaTasks”-Table 以获取应该发送到 Kafka-Topic 的新任务(例如,使用定时器服务,也许可以在 Spring 中使用 @Scheduled 注释) .消息成功发送到 Kafka 后,您可以删除任务条目。这确保了只有当订单也成功存储在应用程序数据库中时,才会向 Kafka 发送消息。我们是否获得了与使用 XA 事务时相同的保证?不幸的是,不,因为仍有可能写入 Kafka 有效但删除任务失败。在这种情况下,重试机制(您需要一个问题中提到的机制)将重新处理任务并发送消息两次。如果您的业务案例对这种“至少一次”保证感到满意,那么您已经完成了一个恕我直言的半复杂解决方案,该解决方案可以很容易地实现为框架功能,因此每个人都不必为细节烦恼。

如果你需要“exactly-once”那么你不能将你的状态存储在应用程序数据库中(在这种情况下“任务的删除”是“状态”)但是你必须将它存储在Kafka中(假设你在两个 Kafka 主题之间有 ACID 保证)。举个例子:假设您在 table(ID 1 到 100)中有 100 个任务,任务作业处理前 10 个。您将 Kafka 消息写入它们的主题,并将 ID 为 10 的另一条消息写入“您的主题” ”。全部在同一个 Kafka 事务中。在下一个周期中,你消费了你的主题(值为 10),并取这个值来获取接下来的 10 个任务(并删除已经处理的任务)。

如果有更简单的(应用程序中的)解决方案具有相同的保证,我期待收到您的来信!

抱歉,回答很长,但希望对您有所帮助。

上述所有方法都是解决问题的最佳方法,并且是定义明确的模式。您可以在下面提供的链接中探索这些内容。

模式:交易发件箱

通过将事件或消息保存在数据库的发件箱中,将其发布为数据库事务的一部分。 http://microservices.io/patterns/data/transactional-outbox.html

模式:轮询发布者

通过轮询数据库中的发件箱来发布消息。 http://microservices.io/patterns/data/polling-publisher.html

模式:事务日志拖尾

通过跟踪事务日志发布对数据库所做的更改。 http://microservices.io/patterns/data/transaction-log-tailing.html

Debezium 是一个有效的答案,但(正如我所经历的)它可能需要一些额外的开销 运行 一个额外的 pod 并确保 pod 不会倒下。这可能只是我抱怨一些背靠背的实例,其中 pods OOM 出错并且没有恢复,网络规则推出丢弃了一些消息,WAL 对 aws aurora 数据库的访问开始表现得很奇怪......它似乎所有可能出错的事情都发生了。并不是说 Debezium 不好,它非常稳定,但对于开发人员来说 运行 它通常成为一种网络技能而不是编码技能。

作为使用正常编码解决方案的 KISS 解决方案,99.99% 的时间(并通知您 .01%)将是:

  • 开始交易
  • 同步保存到数据库
  • -> 如果失败,则退出。
  • 向kafka异步发送消息。
  • 阻塞直到主题报告它已经收到 留言。
  • -> 如果超时或失败则中止交易。
  • -> 如果成功提交交易。

我建议使用一种新方法 2 阶段消息。在这种新方法中,需要的代码少得多,而且您不再需要 Debeziums。

https://betterprogramming.pub/an-alternative-to-outbox-pattern-7564562843ae

对于这种新方法,您需要做的是:

  1. 写入数据库时​​,将事件记录写入辅助table。
  2. DTM
  3. 提交 2 阶段消息
  4. 写一个服务查询辅助中是否保存了一个事件table.

借助DTM SDK,您可以在Go中用8行代码完成上述3个步骤,比其他解决方案代码更少。

msg := dtmcli.NewMsg(DtmServer, gid).
  Add(busi.Busi+"/TransIn", &TransReq{Amount: 30})
err := msg.DoAndSubmitDB(busi.Busi+"/QueryPrepared", db, func(tx *sql.Tx) error {
    return AdjustBalance(tx, busi.TransOutUID, -req.Amount)
})
app.GET(BusiAPI+"/QueryPrepared", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
    return MustBarrierFromGin(c).QueryPrepared(db)
}))

您的每个来源选项都有其缺点:

  1. 用户不能立即看到它刚刚创建的数据库更改。
  2. Debezium 将捕获数据库的日志,这可能比您想要的事件大得多。 Debezium 的部署和维护也不是一件容易的事。
  3. built-in auto-retry 功能”并不便宜,它可能需要很多代码或维护工作。