JDBC Confluent connector mode
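I am using a custom query in the JDBC Kafka source connector. Can anyone tell me which mode to use with a custom query? If I use bulk mode, it re-inserts all the data into the Kafka topic on every poll.
Note: my table has no primary key or timestamp column.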
You can use incrementing or timestamp mode:
incrementing - use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.
timestamp - use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.
timestamp+incrementing - use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.
timestamp example:
name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products
mode=timestamp
timestamp.column.name=last_modified
topic.prefix=mysql-test-
incrementing example:
name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-test-
timestamp+incrementing example:
name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=last_modified
topic.prefix=mysql-test-
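The examples above all use table.whitelist, whereas the question is about a custom query. A minimal sketch of incrementing mode with the query setting might look like the following. It assumes the query result exposes a strictly incrementing id column; the column names name, title and owner_id, the connector name and the topic name are hypothetical. The query is wrapped in a subselect because, in the incremental modes, the connector appends its own WHERE clause to the query text.

```properties
name=mysql-source-query-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# A custom query is handled by a single task
tasks.max=1
connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
# Use query instead of table.whitelist; do not set both.
# Hypothetical join, wrapped so the appended WHERE clause stays valid.
query=SELECT * FROM (SELECT u.id, u.name, p.title FROM users u JOIN products p ON p.owner_id = u.id) AS joined
mode=incrementing
incrementing.column.name=id
# With a custom query, topic.prefix is the full topic name
topic.prefix=mysql-test-users-products
```

This still depends on an incrementing column being present in the query result, so it does not help when the table has no such column at all.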
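If you have neither a timestamp column nor an incrementing ID column, you cannot do query-based CDC; you can only do bulk loads.
Your alternative is log-based CDC with a tool such as Debezium.
This talk covers each option and the available tools in detail: http://rmoff.dev/ksny19-no-more-silos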
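For the bulk-load fallback, a sketch of a bulk-mode configuration with a custom query is shown here. Bulk mode republishes the full result set on every poll, so the only lever on the JDBC side is how often that happens; the one-day poll.interval.ms value, the connector name and the topic name are assumptions chosen for illustration.

```properties
name=mysql-source-bulk-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
query=SELECT * FROM users
# bulk reloads the whole result set on each poll
mode=bulk
# Hypothetical interval: poll once every 24 hours instead of the default 5 seconds
poll.interval.ms=86400000
# With a custom query, topic.prefix is the full topic name
topic.prefix=mysql-test-users-snapshot
```

Consumers of such a topic see every row again after each poll, so they would need to deduplicate or treat each load as a fresh snapshot.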