使用 Apache Flink 从 Kafka 消息中获取嵌套字段 SQL
Get nested fields from Kafka message using Apache Flink SQL
我正在尝试使用 Apache Flink 1.11 创建源 table,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。
documentation 表明它应该是 MAP
类型,但是当我设置它时,出现以下错误
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
这是我的 SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
我的 JSON 看起来像这样:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
您可以使用 ROW
提取 JSON 消息中的嵌套字段。您的 DDL 语句类似于:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
你也可以试试
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP<STRING, STRING>
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
唯一的区别是:MAP<STRING, STRING>
vs MAP
[2022更新]
在 Apache Flink 1.13 版本中没有系统 built-in JSON 功能。
它们在 1.14 版本中引入。检查 this
如果您使用的版本 <1.14,请参阅以下解决方案。
如何使用嵌套的 JSON 输入创建 table?
JSON 输入示例:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
"nested_foo":{
"prop1" : "value1",
"prop2" : "value2"
}
}
创建语句
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
如何 select 嵌套列?
SELECT properties.foo, properties.nested_foo.prop1 FROM input;
注意,如果你用
输出结果
SELECT properties FROM input
您会以行格式查看结果。 properties
栏的内容将是
+I[bar, +I[prop1,prop2]]
我正在尝试使用 Apache Flink 1.11 创建源 table,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。
documentation 表明它应该是 MAP
类型,但是当我设置它时,出现以下错误
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
这是我的 SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
我的 JSON 看起来像这样:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
您可以使用 ROW
提取 JSON 消息中的嵌套字段。您的 DDL 语句类似于:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
你也可以试试
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP<STRING, STRING>
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
唯一的区别是:MAP<STRING, STRING>
vs MAP
[2022更新]
在 Apache Flink 1.13 版本中没有系统 built-in JSON 功能。 它们在 1.14 版本中引入。检查 this
如果您使用的版本 <1.14,请参阅以下解决方案。
如何使用嵌套的 JSON 输入创建 table?
JSON 输入示例:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
"nested_foo":{
"prop1" : "value1",
"prop2" : "value2"
}
}
创建语句
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
如何 select 嵌套列?
SELECT properties.foo, properties.nested_foo.prop1 FROM input;
注意,如果你用
输出结果SELECT properties FROM input
您会以行格式查看结果。 properties
栏的内容将是
+I[bar, +I[prop1,prop2]]