Kafka-confluent:如何在 JDBC 接收器连接器中使用 pk.mode=record_key 进行更新插入和删除模式?
Kafka-confluent: How to use pk.mode=record_key for upsert and delete mode in JDBC sink connector?
在 Kafka confluent 中,我们如何在 MySQL table 中使用 pk.mode=record_key
作为复合键的同时使用 upsert 使用源作为 CSV 文件?使用 pk.mode=record_values
时,upsert 模式正在运行。是否需要进行任何其他配置?
如果我尝试使用 pk.mode=record_key
,我会收到此错误。错误 - 由以下原因引起:org.apache.kafka.connect.errors.ConnectException
:只需要定义一个 PK 列,因为记录的键模式是原始类型。
下面是我的 JDBC 接收器连接器配置:
{
"name": "<name>",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "<topic name>",
"connection.url": "<url>",
"connection.user": "<user name>",
"connection.password": "*******",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "<table name>",
"pk.mode": "record_key",
"pk.fields": "field1,field2",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
您需要使用 pk.mode
个 record.value
。
这意味着从消息的 值 中获取字段并将它们用作目标 table 中的主键并用于 UPSERT
目的。
如果你设置 record.key
它将尝试从 Kafka 消息 key 中获取关键字段。除非您确实在消息密钥中获得了值,否则这不是您要使用的设置。
这些可能对您有进一步的帮助:
- Kafka Connect JDBC Sink deep-dive: Working with Primary Keys
- https://rmoff.dev/kafka-jdbc-video
- https://rmoff.dev/ksqldb-jdbc-sink-video
在 Kafka confluent 中,我们如何在 MySQL table 中使用 pk.mode=record_key
作为复合键的同时使用 upsert 使用源作为 CSV 文件?使用 pk.mode=record_values
时,upsert 模式正在运行。是否需要进行任何其他配置?
如果我尝试使用 pk.mode=record_key
,我会收到此错误。错误 - 由以下原因引起:org.apache.kafka.connect.errors.ConnectException
:只需要定义一个 PK 列,因为记录的键模式是原始类型。
下面是我的 JDBC 接收器连接器配置:
{
"name": "<name>",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "<topic name>",
"connection.url": "<url>",
"connection.user": "<user name>",
"connection.password": "*******",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "<table name>",
"pk.mode": "record_key",
"pk.fields": "field1,field2",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}
您需要使用 pk.mode
个 record.value
。
这意味着从消息的 值 中获取字段并将它们用作目标 table 中的主键并用于 UPSERT
目的。
如果你设置 record.key
它将尝试从 Kafka 消息 key 中获取关键字段。除非您确实在消息密钥中获得了值,否则这不是您要使用的设置。
这些可能对您有进一步的帮助:
- Kafka Connect JDBC Sink deep-dive: Working with Primary Keys
- https://rmoff.dev/kafka-jdbc-video
- https://rmoff.dev/ksqldb-jdbc-sink-video