Kafka JDBC 连接器加载所有数据,然后增量
Kafka JDBC connector load all data, then incremental
我想弄清楚如何最初从查询中获取所有数据,然后仅使用 kafka 连接器进行增量更改。这样做的原因是我想将所有数据加载到弹性搜索中,然后让 es 与我的 kafka 流保持同步。
目前,我首先使用模式 = bulk 的连接器,然后将其更改为时间戳。这很好用。
但是,如果我们想将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本以某种方式清理或删除 kafka 流和 es 索引数据,修改连接 ini 以将模式设置为批量,重新启动一切,给它时间加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动一切(需要这样一个脚本的原因是偶尔,批量更新碰巧通过我们做的 etl 过程来纠正历史数据还没有控制权,而且这个过程不更新时间戳)
有没有人做过类似的事情并找到了更优雅的解决方案?
how to fetch all data from a query initially, then incrementally only changes using kafka connector.
也许这对你有帮助。例如,我有一个 table:
╔════╦═════════════╦═══════════╗
║ Id ║ Name ║ Surname ║
╠════╬═════════════╬═══════════╣
║ 1 ║ Martin ║ Scorsese ║
║ 2 ║ Steven ║ Spielberg ║
║ 3 ║ Christopher ║ Nolan ║
╚════╩═════════════╩═══════════╝
在这种情况下,我将创建一个视图:
CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;
在 kafka jdbc 连接器的属性文件中,您可以使用:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS
因此 kafka jdbc 连接器将采取以下步骤:
- 首先EXID = 0的所有数据;
- 它将在connector.offsets文件中存储偏移值= 0;
- 新行将插入 DIRECTORS table。
- Kafka JDBC 连接器将
执行:
Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS
并将
请注意 EXID 已递增。
- 数据将在 Kafka Streams 中更新。
好久不见再回到这里。这种方法能够解决这个问题,永远不必使用批量模式
- 停止连接器
- 擦除每个连接器 jvm 的偏移文件
- (可选)如果你想做一个完整的擦除和加载,你可能还想删除你的主题使用 kafka/connect utils/rest api (不要忘记状态话题)
- 重启连接。
我想弄清楚如何最初从查询中获取所有数据,然后仅使用 kafka 连接器进行增量更改。这样做的原因是我想将所有数据加载到弹性搜索中,然后让 es 与我的 kafka 流保持同步。 目前,我首先使用模式 = bulk 的连接器,然后将其更改为时间戳。这很好用。
但是,如果我们想将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本以某种方式清理或删除 kafka 流和 es 索引数据,修改连接 ini 以将模式设置为批量,重新启动一切,给它时间加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动一切(需要这样一个脚本的原因是偶尔,批量更新碰巧通过我们做的 etl 过程来纠正历史数据还没有控制权,而且这个过程不更新时间戳)
有没有人做过类似的事情并找到了更优雅的解决方案?
how to fetch all data from a query initially, then incrementally only changes using kafka connector.
也许这对你有帮助。例如,我有一个 table:
╔════╦═════════════╦═══════════╗
║ Id ║ Name ║ Surname ║
╠════╬═════════════╬═══════════╣
║ 1 ║ Martin ║ Scorsese ║
║ 2 ║ Steven ║ Spielberg ║
║ 3 ║ Christopher ║ Nolan ║
╚════╩═════════════╩═══════════╝
在这种情况下,我将创建一个视图:
CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;
在 kafka jdbc 连接器的属性文件中,您可以使用:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS
因此 kafka jdbc 连接器将采取以下步骤:
- 首先EXID = 0的所有数据;
- 它将在connector.offsets文件中存储偏移值= 0;
- 新行将插入 DIRECTORS table。
- Kafka JDBC 连接器将
执行:
Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS
并将 请注意 EXID 已递增。 - 数据将在 Kafka Streams 中更新。
好久不见再回到这里。这种方法能够解决这个问题,永远不必使用批量模式
- 停止连接器
- 擦除每个连接器 jvm 的偏移文件
- (可选)如果你想做一个完整的擦除和加载,你可能还想删除你的主题使用 kafka/connect utils/rest api (不要忘记状态话题)
- 重启连接。