使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector
Getting encoded Values in JSON while using Kafka connect :JdbcSourceConnector
我在独立配置中使用 Kafka Connect 从 Oracle 读取数据。
在阅读某个主题的消息时,我收到的结果是 JSON。我不需要将我的值编码到 JSON 对象中。我该如何解决这个问题?
connect-standalone.properties
:
bootstrap.servers=********:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
jdbc-kafka-connect.properties
:
name=jdbctransconnector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@**.**.*.***:1521/****?verifyServerCertificate=false&useSSL=true&requireSSL=true
connection.user=*******
connection.password=*******
query=select * from(select cast(tab1.PARAM_INSTANCE_ID as NUMERIC(11,0)) AS PARAM_INSTANCE_ID,tab1.entity_id,tab2.param_name,tab1.value from table1 tab1,table2 tab2 where tab1.param_spec_id=tab2.param_basic_spec_id)
tasks.max= 1
auto.create= true
auto.evolve= true
mode=incrementing
incrementing.column.name=PARAM_INSTANCE_ID
topic.prefix=EGPV
来自 Kafka 消费者的示例消息:
{
"PARAM_INSTANCE_ID":"B6frdA==",
"ENTITY_ID":"AXpUSLCacxAucbLv12PipNeV7ag2XAFudUuzxrP1xDvtlnZOuxf8KxSAAAAAAAAAAAAAAAAAAAAA",
"PARAM_NAME":"Action",
"VALUE":null
}
param_instance_id
和 entity_id
是源数据库中的数字列。如何在 Kafka 中获取相同的格式?
param_instance_id and entity_id are number columns in source database
未设置默认值 numeric.mapping
属性,因此数字采用 base64 编码。
根据文档中的说明进行调整,但您可以从 best_fit
开始
https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html
我在独立配置中使用 Kafka Connect 从 Oracle 读取数据。
在阅读某个主题的消息时,我收到的结果是 JSON。我不需要将我的值编码到 JSON 对象中。我该如何解决这个问题?
connect-standalone.properties
:
bootstrap.servers=********:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
jdbc-kafka-connect.properties
:
name=jdbctransconnector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@**.**.*.***:1521/****?verifyServerCertificate=false&useSSL=true&requireSSL=true
connection.user=*******
connection.password=*******
query=select * from(select cast(tab1.PARAM_INSTANCE_ID as NUMERIC(11,0)) AS PARAM_INSTANCE_ID,tab1.entity_id,tab2.param_name,tab1.value from table1 tab1,table2 tab2 where tab1.param_spec_id=tab2.param_basic_spec_id)
tasks.max= 1
auto.create= true
auto.evolve= true
mode=incrementing
incrementing.column.name=PARAM_INSTANCE_ID
topic.prefix=EGPV
来自 Kafka 消费者的示例消息:
{
"PARAM_INSTANCE_ID":"B6frdA==",
"ENTITY_ID":"AXpUSLCacxAucbLv12PipNeV7ag2XAFudUuzxrP1xDvtlnZOuxf8KxSAAAAAAAAAAAAAAAAAAAAA",
"PARAM_NAME":"Action",
"VALUE":null
}
param_instance_id
和 entity_id
是源数据库中的数字列。如何在 Kafka 中获取相同的格式?
param_instance_id and entity_id are number columns in source database
未设置默认值 numeric.mapping
属性,因此数字采用 base64 编码。
根据文档中的说明进行调整,但您可以从 best_fit
https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html