使用 kafka-connect 从多个主题更新到多个表

Upserting into multiple tables from multiples topics using kafka-connect

我正在尝试使用 JDBC 接收器连接器读取 2 个 kafka 主题,并将其插入到我手动创建的 2 个 Oracle table 中。每个 table 都有 1 个主键我想在 upsert 模式下使用它。如果我仅将 pk.fields 中的 1 个主题和 1 个字段用于连接器,则连接器工作正常,但如果我在 pk.fields 中输入多个列,每个 table 中的一个,它无法识别架构。我有没有遗漏任何东西,请提出建议。

name=oracle_sink_prod
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=KAFKA1011,JAFKA1011
connection.url=URL
connection.user=UID
connection.password=PASSWD
auto.create=false
table.name.format=KAFKA1011,JAFKA1011
pk.mode=record_value
pk.fields= ID,COMPANY 
auto.evolve=true
insert.mode=upsert

//ID is pk of kafka1011 table and COMPANY is of other

如果PK不同,就创建两个不同的sink connector。他们可以 运行 在同一个 Kafka Connect worker 上。

您还可以选择使用 Kafka 消息本身的密钥。有关详细信息,请参阅 doc。这是更具可扩展性的选项,然后您只需要确保您的消息被正确键入,以便它向下流到 JDBC 接收器。