如何从 kafka 中的主题创建具有大量 JSON 字段的 KSQL 流?
How to create KSQL Stream with large number of JSON fields from topic in kafka?
我正在将一个长 JSON 字符串传递给 kafka 主题,例如:
{
"glossary": {
"title": "example glossary",
"GlossDiv": {
"title": "S",
"GlossList": {
"GlossEntry": {
"ID": "SGML",
"SortAs": "SGML",
"GlossTerm": "Standard Generalized Markup Language",
"Acronym": "SGML",
"Abbrev": "ISO 8879:1986",
"GlossDef": {
"para": "A meta-markup language, used to create markup languages such as DocBook.",
"GlossSeeAlso": ["GML", "XML"]
},
"GlossSee": "markup"
}
}
}
}
}
并希望从具有所有字段的 kafka 主题创建流,而不指定 KSQL 中的每个字段,例如:
CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');
如果你想让KSQL自动抓取字段名,你需要使用Avro。如果你使用 Avro,数据的模式会在 Confluent Schema Registry 中注册,当你使用主题时,KSQL 会自动检索它。
如果您使用 JSON,您 必须 告诉 KSQL 列是什么。您可以在 CREATE STREAM
语句中执行此操作,对嵌套元素使用 STRUCT
数据类型。
您可以通过在 CREATE STREAM
中仅声明高级字段然后使用 EXTRACTJSONFIELD
访问您要使用的字段的嵌套元素来解决列出所有字段的问题。请注意 5.0.0 中存在一个问题,即 fixed in 5.0.1。此外,您不能将其用于您显示的示例数据中确实存在的嵌套数组等。
我正在将一个长 JSON 字符串传递给 kafka 主题,例如:
{
"glossary": {
"title": "example glossary",
"GlossDiv": {
"title": "S",
"GlossList": {
"GlossEntry": {
"ID": "SGML",
"SortAs": "SGML",
"GlossTerm": "Standard Generalized Markup Language",
"Acronym": "SGML",
"Abbrev": "ISO 8879:1986",
"GlossDef": {
"para": "A meta-markup language, used to create markup languages such as DocBook.",
"GlossSeeAlso": ["GML", "XML"]
},
"GlossSee": "markup"
}
}
}
}
}
并希望从具有所有字段的 kafka 主题创建流,而不指定 KSQL 中的每个字段,例如:
CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');
如果你想让KSQL自动抓取字段名,你需要使用Avro。如果你使用 Avro,数据的模式会在 Confluent Schema Registry 中注册,当你使用主题时,KSQL 会自动检索它。
如果您使用 JSON,您 必须 告诉 KSQL 列是什么。您可以在 CREATE STREAM
语句中执行此操作,对嵌套元素使用 STRUCT
数据类型。
您可以通过在 CREATE STREAM
中仅声明高级字段然后使用 EXTRACTJSONFIELD
访问您要使用的字段的嵌套元素来解决列出所有字段的问题。请注意 5.0.0 中存在一个问题,即 fixed in 5.0.1。此外,您不能将其用于您显示的示例数据中确实存在的嵌套数组等。