Why records cannot be deleted using the JDBC Sink Connector in Kafka Connect
My sink connector properties:
{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}
My connect-avro-distributed.properties:
bootstrap.servers=10.0.0.0:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
I send the data like this:
./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type": ["null","int"],"default": null}, {"name":"price","type": ["null","int"],"default": null}]}' \
--property schema.registry.url=http://10.0.0.0:8081
I can insert or update data in Oracle like this:
{"id":3}${"id": {"int":2}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":1453}}
But when I send the message below to delete the record in Oracle, it doesn't delete the row; it just updates the columns to null:
{"id":2}${"id": null, "product": null , "quantity": null , "price": null }
How can I solve this?
Thanks in advance.
You actually need to produce a tombstone record. Deletes with the JDBC Sink Connector in Kafka Connect work as follows:
The connector can delete rows in a database table when it consumes a tombstone record, which is a Kafka record that has a non-null key and a null value. This behavior is disabled by default, meaning that any tombstone records will result in a failure of the connector, making it easy to upgrade the JDBC connector and keep prior behavior.
Deletes can be enabled with delete.enabled=true, but only when the pk.mode is set to record_key. This is because deleting a row from the table requires the primary key be used as criteria. Enabling delete mode does not affect the insert.mode.
Also note that such records are only removed after delete.retention.ms milliseconds, which currently defaults to 24 hours.
Therefore, try reducing this configuration on the topic and see whether it works. To do so, you need to run the following command:
./bin/kafka-topics.sh \
--alter \
--zookeeper localhost:2181 \
--topic orders \
--config delete.retention.ms=100
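(If your Kafka version no longer supports altering topic configs through kafka-topics.sh, for instance because the --zookeeper flag has been removed, the same topic-level override can be set with kafka-configs.sh; the broker address below is assumed to be the one from your worker config:)
./bin/kafka-configs.sh \
--bootstrap-server 10.0.0.0:9092 \
--entity-type topics \
--entity-name orders \
--alter \
--add-config delete.retention.ms=100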
Now that the configuration is in place, all you need to do is produce a message with a non-null key and a null value. Note that you cannot use the Kafka console producer to produce a null record, because user input is parsed as UTF-8.
Therefore,
{"id":2}${"id": null, "product": null , "quantity": null , "price": null }
is not actually a tombstone message: its value is a non-null Avro record whose fields are all null, which is why the connector just updates the columns to null instead of deleting the row.
You could use kafkacat instead, but it only works for JSON messages:
# Produce a tombstone (a "delete" for compacted topics) for key
# "abc" by providing an empty message value which -Z interprets as NULL:
echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:
In your case, however, this won't work, because you need to send Avro messages. I would therefore suggest writing a very simple Avro producer in your favourite language so that you can actually send a tombstone message.
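For example, here is a minimal sketch of such a producer in Python, assuming the confluent-kafka package with Avro support is installed (pip install "confluent-kafka[avro]") and reusing the broker and Schema Registry addresses from your configs:
# tombstone_producer.py -- a sketch, not the only way to do this.
# Sends a record with an Avro-encoded key and a NULL value (a tombstone),
# which the JDBC sink can turn into a DELETE for the row with that key.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Same key schema you pass to kafka-avro-console-producer.
key_schema_str = """
{"type": "record", "name": "key_schema",
 "fields": [{"name": "id", "type": "int"}]}
"""

schema_registry = SchemaRegistryClient({"url": "http://10.0.0.0:8081"})
key_serializer = AvroSerializer(schema_registry, key_schema_str)

producer = SerializingProducer({
    "bootstrap.servers": "10.0.0.0:9092",
    "key.serializer": key_serializer,
    # no value.serializer: the value is always None (the tombstone)
})

# Non-null key {"id": 2}, null value -> tombstone for id=2.
producer.produce(topic="orders", key={"id": 2}, value=None)
producer.flush()
With delete.enabled=true and pk.mode=record_key in your sink config, consuming this tombstone should make the connector issue a DELETE for the row with id=2 instead of an upsert.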