Kafka 在 MSSQL 中连接多对多表

Kafka connect many to many tables in MSSQL

我目前正在研究 Kafka Connect 以将我们的一些数据库流式传输到数据湖。为了测试 Kafka Connect,我设置了一个数据库,其中包含我们的一个项目数据库。到目前为止一切顺利。

下一步我使用以下属性模式配置 Kafka Connect:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "timestamp.column.name": "updated_at,created_at",
  "incrementing.column.name": "id",
  "dialect.name": "SqlServerDatabaseDialect",
  "validate.non.null": "false",
  "tasks.max": "1",
  "mode": "timestamp+incrementing",
  "topic.prefix": "mssql-jdbc-",
  "poll.interval.ms": "10000",
}

虽然这适用于我获得 ID 和 created_at / updated_at 字段的大多数 table,但它不适用于我的 tables 在这里我用一个 table 和一个复合键解决了我的多对多关系。请注意,我将通用 JDBC 配置与 Microsoft 的 JDBC 驱动程序一起使用。

有没有办法为这些特殊情况配置 Kafka Connect?

您可能需要创建多个连接器,而不是一个连接器来拉动所有 table。如果您想使用不同的方法或不同的 ID/timestamp 列来获取数据,就会出现这种情况。 正如@cricket_007 所说,您可以使用 query 选项来拉回查询的结果——它可以是 SELECT 表示您的多 table 连接。即使从单个 table 对象中提取数据,JDBC 连接器本身也只是从给定的 table 发出 SELECT *,并使用 WHERE 谓词来限制根据递增 ID/timestamp 选择的行。

另一种方法是使用基于日志的更改数据捕获 (CDC),并将所有更改直接从数据库流式传输到 Kafka。

无论您使用 JDBC 还是基于日志的 CDC,您都可以使用流处理来解析 Kafka 本身的连接。这方面的一个例子是 Kafka Streams 或 KSQL。我已经写了关于后者的 lot here

您可能还会发现 this article 有用,详细描述了您将数据库与 Kafka 集成的选项。

免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。