如何使用 debezium 更改数据捕获并使用 jdbc 接收器在 kafka connect 中捕获 mysql 中的数据?

How to capture data in mysql with debezium change data capture and consume with jdbc sink in kafka connect?

我在使用 debezium 更改数据捕获 mysql 中捕获数据并将其使用到另一个 mysql 使用 kafka connect jdbc 接收器时遇到问题。

因为 debezium 生成到 kafka 主题的模式和有效负载与 kafka connect jdbc sink 期望的模式不兼容。

当 jdbc 接收器想要使用数据并在另一个 mysql 中创建记录时出现异常。

我该如何解决这个问题?

Debezium is indeed different than that expected by the JDBC sink. The JDBC sink expects each field in the message to correspond to a field in the row, and therefore the message corresponds to the "after" state of the row. OTOH, the Debezium MySQL connector 生成的消息结构执行更改数据捕获,这意味着它不仅仅包含行的最新状态。具体来说,连接器输出的消息具有包含行的主键或唯一键列的键,以及包含信封结构的消息值:

  • 操作,例如是插入、更新还是删除
  • 行的状态发生更改之前(插入时为空)
  • 行的状态发生更改后(删除时为空)
  • 特定于源的信息,包括服务器元数据、事务 ID、数据库和 table 名称、事件发生时的服务器时间戳以及有关事件发现位置的详细信息等。
  • 连接器生成事件的时间戳

解决这个差异的最简单方法是使用 Kafka 0.10.2.x(目前最新版本是 0.10.2.1)和 Kafka Connect 的新 Single Message Transforms (SMTs). Each Kafka Connect connector can be configured with chains of zero or more SMTs that can transform the output of source connectors before the messages are written to Kafka, or transform the messages read from Kafka before they are passed as input to sink connectors. SMTs are intentionally very simple, deal with a single message, and definitely should not access external resources or maintain any state, and therefore not a replacement for Kafka Streams 或其他流处理系统更强大,可以加入多个输入流,可以执行非常复杂的操作并跨多个消息维护状态。

如果您使用 Kafka Streams 进行任何类型的处理,那么您应该考虑在 Kafka Streams 应用程序中操作消息结构。如果没有,那么 SMT 是解决您问题的好方法。实际上,有两种方法可以使用 SMT 来调整消息结构。

第一个选项是使用带有 Debezium 连接器的 SMT 来 extract/retain 行的 "after" 状态,并在将其写入 Kafka 之前丢弃所有其他信息。当然,您会在 Kafka 主题中存储更少的信息,并丢弃一些将来可能有价值的 CDC 信息。

第二个也是 IMO 首选的选项是保持源连接器不变并将所有 CDC 消息保留在 Kafka 主题中,但是然后使用带有接收器连接器的 SMT extract/retain "after" 行的状态,并在将消息传递到 JDBC 接收器连接器之前丢弃所有其他信息。您可以使用 Kafka Connect 中包含的现有 SMT 之一,但您可以考虑编写自己的 SMT 来完全按照您的意愿执行操作。