Kafka 在 MSSQL 中连接多对多表
Kafka connect many to many tables in MSSQL
我目前正在研究 Kafka Connect 以将我们的一些数据库流式传输到数据湖。为了测试 Kafka Connect,我设置了一个数据库,其中包含我们的一个项目数据库。到目前为止一切顺利。
下一步我使用以下属性模式配置 Kafka Connect:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "updated_at,created_at",
"incrementing.column.name": "id",
"dialect.name": "SqlServerDatabaseDialect",
"validate.non.null": "false",
"tasks.max": "1",
"mode": "timestamp+incrementing",
"topic.prefix": "mssql-jdbc-",
"poll.interval.ms": "10000",
}
虽然这适用于我获得 ID 和 created_at / updated_at 字段的大多数 table,但它不适用于我的 tables 在这里我用一个 table 和一个复合键解决了我的多对多关系。请注意,我将通用 JDBC 配置与 Microsoft 的 JDBC 驱动程序一起使用。
有没有办法为这些特殊情况配置 Kafka Connect?
您可能需要创建多个连接器,而不是一个连接器来拉动所有 table。如果您想使用不同的方法或不同的 ID/timestamp 列来获取数据,就会出现这种情况。
正如@cricket_007 所说,您可以使用 query
选项来拉回查询的结果——它可以是 SELECT
表示您的多 table 连接。即使从单个 table 对象中提取数据,JDBC 连接器本身也只是从给定的 table 发出 SELECT *
,并使用 WHERE
谓词来限制根据递增 ID/timestamp 选择的行。
另一种方法是使用基于日志的更改数据捕获 (CDC),并将所有更改直接从数据库流式传输到 Kafka。
无论您使用 JDBC 还是基于日志的 CDC,您都可以使用流处理来解析 Kafka 本身的连接。这方面的一个例子是 Kafka Streams 或 KSQL。我已经写了关于后者的 lot here。
您可能还会发现 this article 有用,详细描述了您将数据库与 Kafka 集成的选项。
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。
我目前正在研究 Kafka Connect 以将我们的一些数据库流式传输到数据湖。为了测试 Kafka Connect,我设置了一个数据库,其中包含我们的一个项目数据库。到目前为止一切顺利。
下一步我使用以下属性模式配置 Kafka Connect:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "updated_at,created_at",
"incrementing.column.name": "id",
"dialect.name": "SqlServerDatabaseDialect",
"validate.non.null": "false",
"tasks.max": "1",
"mode": "timestamp+incrementing",
"topic.prefix": "mssql-jdbc-",
"poll.interval.ms": "10000",
}
虽然这适用于我获得 ID 和 created_at / updated_at 字段的大多数 table,但它不适用于我的 tables 在这里我用一个 table 和一个复合键解决了我的多对多关系。请注意,我将通用 JDBC 配置与 Microsoft 的 JDBC 驱动程序一起使用。
有没有办法为这些特殊情况配置 Kafka Connect?
您可能需要创建多个连接器,而不是一个连接器来拉动所有 table。如果您想使用不同的方法或不同的 ID/timestamp 列来获取数据,就会出现这种情况。
正如@cricket_007 所说,您可以使用 query
选项来拉回查询的结果——它可以是 SELECT
表示您的多 table 连接。即使从单个 table 对象中提取数据,JDBC 连接器本身也只是从给定的 table 发出 SELECT *
,并使用 WHERE
谓词来限制根据递增 ID/timestamp 选择的行。
另一种方法是使用基于日志的更改数据捕获 (CDC),并将所有更改直接从数据库流式传输到 Kafka。
无论您使用 JDBC 还是基于日志的 CDC,您都可以使用流处理来解析 Kafka 本身的连接。这方面的一个例子是 Kafka Streams 或 KSQL。我已经写了关于后者的 lot here。
您可能还会发现 this article 有用,详细描述了您将数据库与 Kafka 集成的选项。
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。