ksql - 即使填充了 kafka 主题,CREATE TABLE 也会导致 table 具有空值
ksql - CREATE TABLE results in table with null values even though kafka topic is populated
使用 ksqlDB,我创建了一个带有自定义查询的 JDBC 连接器。然后,从生成的 kafka 主题中,我创建了一个 table。但是,仅针对 PRIMARY KEY 从 table returns 数据中进行选择,而对所有其他值返回 null。我正在连接的 postgres 数据库的销售额 table 不断更新新数据,我正在尝试使用 ksql 进行流式传输。
ksql> CREATE SOURCE CONNECTOR con WITH (
'connector.class' ='io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = '....',
'topic.prefix' = 'sales',
...
'key' = 'id',
'query' = 'SELECT id, time, price FROM sales');
Message
Created connector CON
ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":alse,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased
ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');
Message
Table created
ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
+-----+-----+-----+
|ID |TIME |PRICE|
+-----+-----+-----+
|123 |null |null |
Limit Reached
Query terminated
如您所见,kafka 主题的条目在时间和价格字段中具有正确的值。但是,当针对该主题创建 table 时,从 table 中进行选择会产生空时间和价格字段。只有 id(即 PRIMARY KEY 列)被正确打印。
知道为什么会这样吗?
您在 schemas.enable=true
的连接器中使用 org.apache.kafka.connect.json.JsonConverter
转换器,因此您的架构不是 (id VARCHAR PRIMARY KEY, time INT, price DOUBLE)
,因此您得到 NULL 值。
更好的方法是在源连接器中使用 io.confluent.connect.avro.AvroConverter
(或 Protobuf,或 JSON 架构),因为这样您甚至不必为 [=17= 输入架构], 你刚好有
CREATE TABLE sales_table WITH (kafka_topic='sales', value_format='AVRO');
您这样指定替代转换器:
CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);
但是如果您必须使用 JSON,请在您的源连接器中禁用模式:
CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'value.converter.schemas.enable'= 'false'
);
参考:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
使用 ksqlDB,我创建了一个带有自定义查询的 JDBC 连接器。然后,从生成的 kafka 主题中,我创建了一个 table。但是,仅针对 PRIMARY KEY 从 table returns 数据中进行选择,而对所有其他值返回 null。我正在连接的 postgres 数据库的销售额 table 不断更新新数据,我正在尝试使用 ksql 进行流式传输。
ksql> CREATE SOURCE CONNECTOR con WITH (
'connector.class' ='io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = '....',
'topic.prefix' = 'sales',
...
'key' = 'id',
'query' = 'SELECT id, time, price FROM sales');
Message
Created connector CON
ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":alse,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased
ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');
Message
Table created
ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
+-----+-----+-----+
|ID |TIME |PRICE|
+-----+-----+-----+
|123 |null |null |
Limit Reached
Query terminated
如您所见,kafka 主题的条目在时间和价格字段中具有正确的值。但是,当针对该主题创建 table 时,从 table 中进行选择会产生空时间和价格字段。只有 id(即 PRIMARY KEY 列)被正确打印。
知道为什么会这样吗?
您在 schemas.enable=true
的连接器中使用 org.apache.kafka.connect.json.JsonConverter
转换器,因此您的架构不是 (id VARCHAR PRIMARY KEY, time INT, price DOUBLE)
,因此您得到 NULL 值。
更好的方法是在源连接器中使用 io.confluent.connect.avro.AvroConverter
(或 Protobuf,或 JSON 架构),因为这样您甚至不必为 [=17= 输入架构], 你刚好有
CREATE TABLE sales_table WITH (kafka_topic='sales', value_format='AVRO');
您这样指定替代转换器:
CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);
但是如果您必须使用 JSON,请在您的源连接器中禁用模式:
CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'value.converter.schemas.enable'= 'false'
);
参考:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained