为什么由 Kafka JDBC Source Connector 生成的消息键(key)会以 L 为前缀?
Why is the key generated from Kafka JDBC Source Connector getting prefixed with an L?
我正在尝试配置 Kafka Connect,为从 Oracle 19c 表读取并生成的消息设置消息键(key)。在按照 Confluent 文档中展示的设置进行操作时,我遇到了一些意外行为:当在 VARCHAR 字段上将 ValueToKey SMT 与 ExtractField SMT 链式使用时,我的键会以字母 L 和一些空的 Unicode 字符为前缀。
这是我的 Kafka Connect 生成的消息:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":9,
"timestamp":1617723230767,
"timestampType":"CREATE_TIME",
"headers":[],
"key":"\u0000\u0000\u0000\u0000\u0001\u0002L{B4832FC8-BBCF-488C-9720-97C4D3283FEF}",
"value":{
"AUD_ID":{
"long":11042260
},
"REV":{
"long":80325258
},
"ID":{
"long":31549560804
},
"BUSINESSID":{
"string":"{B4832FC8-BBCF-488C-9720-97C4D3283FEF}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":24
},
"VALUE":{
"string":"business value"
},
"DTYPE":"VERSION"
}
}]
这是我的源模式:
-- Audit table polled by the Kafka Connect JDBC source connector.
-- AUD_ID is the column referenced by "incrementing.column.name".
create table AUDITDB.BUSINESS_AUD
(
-- Surrogate primary key; monotonically increasing audit id.
AUD_ID NUMBER(38) not null
constraint PKBUSINESS_AUD
primary key,
-- Audit revision number; foreign key into the REVISION audit table.
REV NUMBER(38) not null
constraint FKBUSINESS_AUD
references AUDITDB.REVISION,
-- Small numeric revision-type code; exact semantics not visible from this schema.
REVTYPE NUMBER(3),
-- Id of the business row being audited.
ID NUMBER(38),
-- GUID-style string such as '{B4832FC8-...}' (38 chars incl. braces);
-- this is the field promoted to the Kafka message key via ValueToKey/ExtractField.
BUSINESSID VARCHAR2(38),
BUSINESS_PROPERTY_LU_ID NUMBER(38),
VALUE VARCHAR2(1200) default NULL,
DTYPE VARCHAR2(15) not null
)
这就是我配置 jdbc 连接器的方式:
# Register the JDBC source connector via the Kafka Connect REST API.
# The connector polls BUSINESS_AUD in "incrementing" mode on AUD_ID and
# chains two SMTs: createKey (ValueToKey on BUSINESSID) then
# ExtractField$Key (pull BUSINESSID out of the key struct).
# NOTE: the JSON payload is a single-quoted shell string, so no comments
# are possible inside it; the key converter in use (not set here, so the
# worker default — Avro in this setup) determines the key's wire format.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_BUSINESS_AUD",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"errors.log.enable": true,
"errors.log.include.messages": true,
"connection.url": "jdbc:oracle:thin:@10.0.0.8:7511:t1fnet",
"connection.user": "oracleUser",
"connection.password": "oracleUserPassword",
"mode": "incrementing",
"incrementing.column.name" : "AUD_ID",
"numeric.mapping" : "best_fit",
"poll.interval.ms": "5000",
"transforms":"createKey,ExtractField",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"BUSINESSID",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractField.field":"BUSINESSID",
"query" : "SELECT CAST(DMDA.AUD_ID AS NUMBER(18)) AS AUD_ID, CAST(DMDA.REV AS NUMBER(18)) AS REV, CAST(DMDA.ID AS NUMBER(18)) AS ID, DMDA.BUSINESSID, CAST(DMDA.BUSINESS_PROPERTY_LU_ID AS NUMBER(18)) AS BUSINESS_PROPERTY_LU_ID, DMDA.VALUE, DMDA.DTYPE FROM AUDITDB.BUSINESS_AUD DMDA",
"topic.prefix": "BUSINESS_AUD"
}
}'
我不确定 L 是从哪里来的。当我尝试使用 NUMBER 字段(如 ID)进行相同的设置时,我只是得到了 Unicode 垃圾:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":24149,
"timestamp":1617732719435,
"timestampType":"CREATE_TIME",
"headers":[
],
"key":"\u0000\u0000\u0000\u0000\u0001\u0002�����\u0001",
"value":{
"AUD_ID":{
"long":11205147
},
"REV":{
"long":81016468
},
"ID":{
"long":31549704671
},
"BUSINESSID":{
"string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":17
},
"VALUE":{
"string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
},
"DTYPE":"VERSION"
}
}]
我相当确定问题在于我如何使用 ExtractField SMT,因为当我从链中删除该 SMT 时,它会产生我对 ValueToKey SMT 的期望:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":27311,
"timestamp":1617733541872,
"timestampType":"CREATE_TIME",
"headers":[
],
"key":{
"BUSINESSID":{
"string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
}
},
"value":{
"AUD_ID":{
"long":11213627
},
"REV":{
"long":81114719
},
"ID":{
"long":31549717943
},
"BUSINESSID":{
"string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":24
},
"VALUE":{
"string":"businessValue"
},
"DTYPE":"VERSION"
}
}]
这让我相信 ExtractField 在处理从 ValueToKey 输出的对象时遇到困难,我只是不确定要更改什么才能获得预期的交互。
我真的被困在这里了,任何帮助都将不胜感激。
所以,我要说的是:所显示的输出是把 Avro 用作键所导致的问题(或者说,至少对于字符串或整数这类原始类型的字段而言是如此)。
我不确定 L,但它是二进制数据的 UTF8 解码的结果
前缀 \u0000\u0000\u0000\u0000\u0001 是魔术字节 0x0 加上整数 1,而这个整数 1 正是 Schema Registry 中 BUSINESS_AUD-key 主题所对应的架构(schema)ID。
一般来说,由于键很少是结构化类型,您应该针对所提取字段的类型使用相应的 key.converter 类,例如对字符串使用 StringConverter。
我正在尝试配置 Kafka Connect,为从 Oracle 19c 表读取并生成的消息设置消息键(key)。在按照 Confluent 文档中展示的设置进行操作时,我遇到了一些意外行为:当在 VARCHAR 字段上将 ValueToKey SMT 与 ExtractField SMT 链式使用时,我的键会以字母 L 和一些空的 Unicode 字符为前缀。 这是我的 Kafka Connect 生成的消息:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":9,
"timestamp":1617723230767,
"timestampType":"CREATE_TIME",
"headers":[],
"key":"\u0000\u0000\u0000\u0000\u0001\u0002L{B4832FC8-BBCF-488C-9720-97C4D3283FEF}",
"value":{
"AUD_ID":{
"long":11042260
},
"REV":{
"long":80325258
},
"ID":{
"long":31549560804
},
"BUSINESSID":{
"string":"{B4832FC8-BBCF-488C-9720-97C4D3283FEF}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":24
},
"VALUE":{
"string":"business value"
},
"DTYPE":"VERSION"
}
}]
这是我的源模式:
-- Audit table polled by the Kafka Connect JDBC source connector.
-- AUD_ID is the column referenced by "incrementing.column.name".
create table AUDITDB.BUSINESS_AUD
(
-- Surrogate primary key; monotonically increasing audit id.
AUD_ID NUMBER(38) not null
constraint PKBUSINESS_AUD
primary key,
-- Audit revision number; foreign key into the REVISION audit table.
REV NUMBER(38) not null
constraint FKBUSINESS_AUD
references AUDITDB.REVISION,
-- Small numeric revision-type code; exact semantics not visible from this schema.
REVTYPE NUMBER(3),
-- Id of the business row being audited.
ID NUMBER(38),
-- GUID-style string such as '{B4832FC8-...}' (38 chars incl. braces);
-- this is the field promoted to the Kafka message key via ValueToKey/ExtractField.
BUSINESSID VARCHAR2(38),
BUSINESS_PROPERTY_LU_ID NUMBER(38),
VALUE VARCHAR2(1200) default NULL,
DTYPE VARCHAR2(15) not null
)
这就是我配置 jdbc 连接器的方式:
# Register the JDBC source connector via the Kafka Connect REST API.
# The connector polls BUSINESS_AUD in "incrementing" mode on AUD_ID and
# chains two SMTs: createKey (ValueToKey on BUSINESSID) then
# ExtractField$Key (pull BUSINESSID out of the key struct).
# NOTE: the JSON payload is a single-quoted shell string, so no comments
# are possible inside it; the key converter in use (not set here, so the
# worker default — Avro in this setup) determines the key's wire format.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_BUSINESS_AUD",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"errors.log.enable": true,
"errors.log.include.messages": true,
"connection.url": "jdbc:oracle:thin:@10.0.0.8:7511:t1fnet",
"connection.user": "oracleUser",
"connection.password": "oracleUserPassword",
"mode": "incrementing",
"incrementing.column.name" : "AUD_ID",
"numeric.mapping" : "best_fit",
"poll.interval.ms": "5000",
"transforms":"createKey,ExtractField",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"BUSINESSID",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractField.field":"BUSINESSID",
"query" : "SELECT CAST(DMDA.AUD_ID AS NUMBER(18)) AS AUD_ID, CAST(DMDA.REV AS NUMBER(18)) AS REV, CAST(DMDA.ID AS NUMBER(18)) AS ID, DMDA.BUSINESSID, CAST(DMDA.BUSINESS_PROPERTY_LU_ID AS NUMBER(18)) AS BUSINESS_PROPERTY_LU_ID, DMDA.VALUE, DMDA.DTYPE FROM AUDITDB.BUSINESS_AUD DMDA",
"topic.prefix": "BUSINESS_AUD"
}
}'
我不确定 L 是从哪里来的。当我尝试使用 NUMBER 字段(如 ID)进行相同的设置时,我只是得到了 Unicode 垃圾:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":24149,
"timestamp":1617732719435,
"timestampType":"CREATE_TIME",
"headers":[
],
"key":"\u0000\u0000\u0000\u0000\u0001\u0002�����\u0001",
"value":{
"AUD_ID":{
"long":11205147
},
"REV":{
"long":81016468
},
"ID":{
"long":31549704671
},
"BUSINESSID":{
"string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":17
},
"VALUE":{
"string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
},
"DTYPE":"VERSION"
}
}]
我相当确定问题在于我如何使用 ExtractField SMT,因为当我从链中删除该 SMT 时,它会产生我对 ValueToKey SMT 的期望:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":27311,
"timestamp":1617733541872,
"timestampType":"CREATE_TIME",
"headers":[
],
"key":{
"BUSINESSID":{
"string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
}
},
"value":{
"AUD_ID":{
"long":11213627
},
"REV":{
"long":81114719
},
"ID":{
"long":31549717943
},
"BUSINESSID":{
"string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":24
},
"VALUE":{
"string":"businessValue"
},
"DTYPE":"VERSION"
}
}]
这让我相信 ExtractField 在处理从 ValueToKey 输出的对象时遇到困难,我只是不确定要更改什么才能获得预期的交互。
我真的被困在这里了,任何帮助都将不胜感激。
所以,我要说的是:所显示的输出是把 Avro 用作键所导致的问题(或者说,至少对于字符串或整数这类原始类型的字段而言是如此)。
我不确定 L,但它是二进制数据的 UTF8 解码的结果
前缀 \u0000\u0000\u0000\u0000\u0001 是魔术字节 0x0 加上整数 1,而这个整数 1 正是 Schema Registry 中 BUSINESS_AUD-key 主题所对应的架构(schema)ID。
一般来说,由于键很少是结构化类型,您应该针对所提取字段的类型使用相应的 key.converter 类,例如对字符串使用 StringConverter。