Unable to read messages from a Kafka topic in ksqlDB
{
"event": {
"header":{
"name":"abc",
"version":"1.0",
"producer":"123",
"channel":"lab",
"countryCode":"US"
},
"body":{
"customerIdentifiers":[
{"customerIdentifier":"1234","customerIdType":"cc"},
{"customerIdentifier":"234", "customerIdType":"id"}
],
"accountIdentifiers":[
{"accountIdentifier":"123", "accountIdType":"no"},
{"accountIdentifier":"Primary","accountIdType":"da"}
],
"eventDetails":{
"transactionDateTime":"2019-03-26 05:28:13.000",
"transactionDate":"2019-03-26",
"monthAverage":"188",
"dailyAverage":"7"
}
}
}
}
I created a stream for the JSON above:
CREATE STREAM STREAM_NAME(
event STRUCT<
header STRUCT<
name VARCHAR,
version VARCHAR,
producer VARCHAR,
channel VARCHAR,
countryCode VARCHAR,
eventTimeStamp VARCHAR
>,
body STRUCT<
customerIdentifiers STRUCT<
customerIdentifier VARCHAR,
customerIdType VARCHAR
>,
accountIdentifiers STRUCT<
accountIdentifier VARCHAR,
accountIdType VARCHAR
>,
eventDetails STRUCT<
transactionDateTime VARCHAR,
transactionDate VARCHAR,
productDescription VARCHAR,
monthAverage VARCHAR,
dailyAverage VARCHAR
>
>
>
) WITH (
KAFKA_TOPIC = 'TOPIC1',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1
);
I cannot read any messages from the stream:
select * from STREAM_NAME emit changes;
Any suggestions?
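You are probably running into deserialization errors, because the schema of the stream you created does not match the schema of the data.
Once the sample data and the SQL are reformatted, the mismatches become easier to see:
$.event.body.customerIdentifiers
is an array of structs in the data, but the DDL defines it as a struct.
$.event.body.accountIdentifiers
is an array of structs in the data, but the DDL defines it as a struct.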
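One way to confirm the mismatch, as a rough sketch: print a few raw records, then look at the ksqlDB processing log, where deserialization errors are recorded. The second query assumes the processing log stream exists under its default name KSQL_PROCESSING_LOG, which is only the case when the server is configured to create it, so treat that statement as an assumption about your setup.

```sql
-- Read from the start of the topic; by default only newly arriving records are returned.
SET 'auto.offset.reset' = 'earliest';

-- Print a few raw records to compare their shape with the declared schema.
PRINT 'TOPIC1' FROM BEGINNING LIMIT 3;

-- Assumption: the processing log stream was auto-created with its default name.
-- Records that fail to deserialize should show up here instead of in STREAM_NAME.
SELECT * FROM KSQL_PROCESSING_LOG EMIT CHANGES;
```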
A DDL that should work is:
CREATE STREAM STREAM_NAME(
event STRUCT<
header STRUCT<
name VARCHAR,
version VARCHAR,
producer VARCHAR,
channel VARCHAR,
countryCode VARCHAR,
eventTimeStamp VARCHAR
>,
body STRUCT<
customerIdentifiers ARRAY<STRUCT<
customerIdentifier VARCHAR,
customerIdType VARCHAR
>>,
accountIdentifiers ARRAY<STRUCT<
accountIdentifier VARCHAR,
accountIdType VARCHAR
>>,
eventDetails STRUCT<
transactionDateTime VARCHAR,
transactionDate VARCHAR,
productDescription VARCHAR,
monthAverage VARCHAR,
dailyAverage VARCHAR
>
>
>
) WITH (
KAFKA_TOPIC = 'TOPIC1',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1
);
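With the array types in place, individual elements can be read by index. The following is a minimal sketch, assuming the stream has been recreated with the DDL above (an existing STREAM_NAME would have to be dropped first). The column aliases are illustrative, and the index 1 assumes one-based array indexing as described in current ksqlDB documentation; older releases may behave differently.

```sql
-- Read from the start of the topic so existing records are included.
SET 'auto.offset.reset' = 'earliest';

-- Pull one header field plus the first entry of each identifier array.
SELECT
  event->header->name AS event_name,
  event->body->customerIdentifiers[1]->customerIdentifier AS first_customer_id,
  event->body->accountIdentifiers[1]->accountIdentifier AS first_account_id
FROM STREAM_NAME
EMIT CHANGES;
```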
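Also note that the data does not contain the following fields that the DDL declares (this will not cause any problems, because those fields will simply be NULL):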
$.event.header.eventTimeStamp
$.event.body.eventDetails.productDescription