Kafka jdbc connect sink:是否可以对值和键中的字段使用pk.fields?
Kafka jdbc connect sink: Is it possible to use pk.fields for fields in value and key?
我遇到的问题是,当jdbc sink 连接器使用kafka 消息时,写入数据库时的关键变量为空。
但是,当我直接通过 kafka-avro-consumer 消费时 - 我可以看到键和值变量及其值,因为我使用了这个配置:--属性 print.key=true .
问:是否有办法确保 jdbc 连接器正在处理消息键变量值?
控制台 kafka-avro 配置
/opt/confluent-5.4.1/bin/kafka-avro-console-consumer \
--bootstrap-server "localhost:9092" \
--topic equipmentidentifier.persist \
--property parse.key=true \
--property key.separator=~ \
--property print.key=true \
--property schema.registry.url="http://localhost:8081" \
--property key.schema=[$KEY_SCHEMA] \
--property value.schema=[$IDENTIFIER_SCHEMA,$VALUE_SCHEMA]
错误:
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "assignment_table" ("created_date","custome
r","id_type","id_value") VALUES('1970-01-01 03:25:44.567+00'::timestamp,123,'BILL_OF_LADING','BOL-123') was aborted: ERROR: null value in column "equipment_ide
ntifier_type" violates not-null constraint
Detail: Failing row contains (null, null, null, null, 1970-01-01 03:25:44.567, 123, id, 56). Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: null value in column "equipment_identifier_type" violates not-null constraint
接收器配置:
task.max=1
topic=assignment
connect.class=io.confluet.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=test
connection.password=test
table.name.format=assignment_table
auto.create=false
insert.mode=insert
pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId
transforms=flatten
transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Key
transforms.flattenKey.delimiter=_
transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flattenKey.delimiter=_
卡夫卡密钥:
{
"assignmentKey": {
"cpId": {
"long": 1001
},
"equip": {
"Identifier": {
"type": "eq",
"value": "eq_45"
}
},
"vendorId": {
"string": "vendor"
}
}
}
卡夫卡值:
{
"assigmentValue": {
"id": {
"Identifier": {
"type": "id",
"value": "56"
}
},
"timestamp": {
"long": 1234456756
},
"customer": {
"long": 123
}
}
}
您需要告诉连接器使用键中的字段,因为默认情况下它不会。
pk.mode=record_key
但是您需要使用来自键或值的字段,而不是两者都与您当前的配置相同:
pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId
如果您设置 pk.mode=record_key
,则 pk.fields
将引用消息密钥中的字段。
我遇到的问题是,当jdbc sink 连接器使用kafka 消息时,写入数据库时的关键变量为空。
但是,当我直接通过 kafka-avro-consumer 消费时 - 我可以看到键和值变量及其值,因为我使用了这个配置:--属性 print.key=true .
问:是否有办法确保 jdbc 连接器正在处理消息键变量值?
控制台 kafka-avro 配置
/opt/confluent-5.4.1/bin/kafka-avro-console-consumer \
--bootstrap-server "localhost:9092" \
--topic equipmentidentifier.persist \
--property parse.key=true \
--property key.separator=~ \
--property print.key=true \
--property schema.registry.url="http://localhost:8081" \
--property key.schema=[$KEY_SCHEMA] \
--property value.schema=[$IDENTIFIER_SCHEMA,$VALUE_SCHEMA]
错误:
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "assignment_table" ("created_date","custome
r","id_type","id_value") VALUES('1970-01-01 03:25:44.567+00'::timestamp,123,'BILL_OF_LADING','BOL-123') was aborted: ERROR: null value in column "equipment_ide
ntifier_type" violates not-null constraint
Detail: Failing row contains (null, null, null, null, 1970-01-01 03:25:44.567, 123, id, 56). Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: null value in column "equipment_identifier_type" violates not-null constraint
接收器配置:
task.max=1
topic=assignment
connect.class=io.confluet.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=test
connection.password=test
table.name.format=assignment_table
auto.create=false
insert.mode=insert
pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId
transforms=flatten
transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Key
transforms.flattenKey.delimiter=_
transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flattenKey.delimiter=_
卡夫卡密钥:
{
"assignmentKey": {
"cpId": {
"long": 1001
},
"equip": {
"Identifier": {
"type": "eq",
"value": "eq_45"
}
},
"vendorId": {
"string": "vendor"
}
}
}
卡夫卡值:
{
"assigmentValue": {
"id": {
"Identifier": {
"type": "id",
"value": "56"
}
},
"timestamp": {
"long": 1234456756
},
"customer": {
"long": 123
}
}
}
您需要告诉连接器使用键中的字段,因为默认情况下它不会。
pk.mode=record_key
但是您需要使用来自键或值的字段,而不是两者都与您当前的配置相同:
pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId
如果您设置 pk.mode=record_key
,则 pk.fields
将引用消息密钥中的字段。