我想使用 kafka-connect 将数据从多个主题加载到多个表

I want to load data to multiple tables from multiple topics using kafka-connect

我想使用 kafka connect

将多个 tables 数据从 oracle 加载到 postgres

这是我使用的源配置:-

curl -i -X PUT http://localhost:8083/connectors/src_1/config \
 -H "Content-Type: application/json" \
 -d '{
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "connection.url":"jdbc:oracle:thin:@//localhost:1521/db",
 "connection.user":"<user>",
 "connection.password":"<psswd>",
 "mode":"bulk",
 "table.whitelist":"EMP,CUS",
 "poll.interval.ms":7200000,
 "quote.sql.identifiers":"never"
 }'

通过上述配置,我可以创建两个名为“EMP”和“CUS”的主题。现在,我想将这些数据从主题加载到已经在 postgres 中创建的相应 tables。

这是我的接收器配置:-

curl -X PUT http://localhost:8083/connectors/tgt_1/config \
-H "Content-Type: application/json" \
-d '{
 "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
 "connection.url":"jdbc:postgresql://localhost:5432/postgres",
 "connection.user":"<user>",
 "connection.password":"<passwd>",
 "tasks.max" : "1",
 "topics":"CUS,EMP",
 "insert.mode":"insert",
 "quote.sql.identifiers":"never"
}'

使用上述配置时,我遇到了一个名为不匹配 table 的 错误,或者自动创建被禁用。 但是当我尝试创建单独的接收器连接器并设置属性 table.name.format 与 table 名称小写它工作正常。

所以,我尝试在 table.whitelist 中提供一个 table name in smaller case,这样主题将以较小的 case 创建,并且可以与 tablename 匹配在目标。但是,源连接器既没有抛出错误也没有 运行 任务。

任何人都可以为我的问题提供 suitable 解决方案。 谢谢!

我猜你的 PostgreSQL 接收器 table 名称是小写的 (cus, emp) 并且它们不等于主题名称 (CUS, EMP). Kafka Connect JDBC Connector uses getTables() 方法检查 table 是否存在,其中 tableNamePattern 参数区分大小写(根据文档:must match the table name as it is stored in the database ).

您可以使用 ChangeTopicCase transformation from Kafka Connect Common Transformations。另一种选择是将接收器 tables 重命名为大写。