如何读取嵌套的 avro 字段来创建流?
How to read the nested avro fields for creating streams?
我在 Kafka 主题中有以下 AVRO 消息。
{
"table": {
"string": "Schema.xDEAL"
},
"op_type": {
"string": "Insert"
},
"op_ts": {
"string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
"string": "2018-03-16 10:03:37.778000"
},
"pos": {
"string": "00000000000000010722"
},
"before": null,
"after": {
"row": {
"DEA_PID_DEAL": {
"string": "AAAAAAAA"
},
"DEA_NME_DEAL": {
"string": "MY OGG DEAL"
},
"DEA_NME_ALIAS_NAME": {
"string": "MY OGG DEAL"
},
"DEA_NUM_DEAL_CNTL": {
"string": "4swb6zs4"
}
}
}
}
当我运行下面的查询。它创建具有空值的流。
CREATE STREAM tls_deal (DEA_PID_DEAL VARCHAR, DEA_NME_DEAL varchar, DEA_NME_ALIAS_NAME VARCHAR, DEA_NUM_DEAL_CNTL VARCHAR) WITH (kafka_topic='deal-ogg-topic',value_format='AVRO', key = 'DEA_PID_DEAL');
但是当我将 AVRO 消息更改为跟随它时,它起作用了。
{
"table": {
"string": "Schema.xDEAL"
},
"op_type": {
"string": "Insert"
},
"op_ts": {
"string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
"string": "2018-03-16 10:03:37.778000"
},
"pos": {
"string": "00000000000000010722"
},
"DEA_PID_DEAL": {
"string": "AAAAAAAA"
},
"DEA_NME_DEAL": {
"string": "MY OGG DEAL"
},
"DEA_NME_ALIAS_NAME": {
"string": "MY OGG DEAL"
},
"DEA_NUM_DEAL_CNTL": {
"string": "4swb6zs4"
}
}
现在,如果我 运行 上面的查询将填充数据。
我的问题是,如果我需要从嵌套字段填充流,我该如何处理?
我无法在 KSQL 文档页面中找到解决方案。
提前致谢。感谢您的帮助。 :)
KSQL 当前(2018 年 3 月 22 日/v0.5)不支持嵌套 Avro。您可以使用 Single Message Transform 来展平来自 Kafka Connect 的数据。例如,Debezium 附带 UnwrapFromEnvelope
。
正如 Robin 所述,当前 不支持此功能(2018 年 3 月 22 日 / v0.5)。但是,这是一个跟踪的功能请求。您可能想要 up-vote 或在 KSQL 存储库中跟踪此 Github 问题:
我在 Kafka 主题中有以下 AVRO 消息。
{
"table": {
"string": "Schema.xDEAL"
},
"op_type": {
"string": "Insert"
},
"op_ts": {
"string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
"string": "2018-03-16 10:03:37.778000"
},
"pos": {
"string": "00000000000000010722"
},
"before": null,
"after": {
"row": {
"DEA_PID_DEAL": {
"string": "AAAAAAAA"
},
"DEA_NME_DEAL": {
"string": "MY OGG DEAL"
},
"DEA_NME_ALIAS_NAME": {
"string": "MY OGG DEAL"
},
"DEA_NUM_DEAL_CNTL": {
"string": "4swb6zs4"
}
}
}
}
当我运行下面的查询。它创建具有空值的流。
CREATE STREAM tls_deal (DEA_PID_DEAL VARCHAR, DEA_NME_DEAL varchar, DEA_NME_ALIAS_NAME VARCHAR, DEA_NUM_DEAL_CNTL VARCHAR) WITH (kafka_topic='deal-ogg-topic',value_format='AVRO', key = 'DEA_PID_DEAL');
但是当我将 AVRO 消息更改为跟随它时,它起作用了。
{
"table": {
"string": "Schema.xDEAL"
},
"op_type": {
"string": "Insert"
},
"op_ts": {
"string": "2018-03-16 09:03:25.000462"
},
"current_ts": {
"string": "2018-03-16 10:03:37.778000"
},
"pos": {
"string": "00000000000000010722"
},
"DEA_PID_DEAL": {
"string": "AAAAAAAA"
},
"DEA_NME_DEAL": {
"string": "MY OGG DEAL"
},
"DEA_NME_ALIAS_NAME": {
"string": "MY OGG DEAL"
},
"DEA_NUM_DEAL_CNTL": {
"string": "4swb6zs4"
}
}
现在,如果我 运行 上面的查询将填充数据。
我的问题是,如果我需要从嵌套字段填充流,我该如何处理?
我无法在 KSQL 文档页面中找到解决方案。
提前致谢。感谢您的帮助。 :)
KSQL 当前(2018 年 3 月 22 日/v0.5)不支持嵌套 Avro。您可以使用 Single Message Transform 来展平来自 Kafka Connect 的数据。例如,Debezium 附带 UnwrapFromEnvelope
。
正如 Robin 所述,当前 不支持此功能(2018 年 3 月 22 日 / v0.5)。但是,这是一个跟踪的功能请求。您可能想要 up-vote 或在 KSQL 存储库中跟踪此 Github 问题: