使用 Apache Flink 从 Kafka 消息中获取嵌套字段 SQL

Get nested fields from Kafka message using Apache Flink SQL

我正在尝试使用 Apache Flink 1.11 创建源 table,我可以在其中访问 JSON 消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。

documentation 表明它应该是 MAP 类型,但是当我设置它时,出现以下错误

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

这是我的 SQL

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

我的 JSON 看起来像这样:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}

您可以使用 ROW 提取 JSON 消息中的嵌套字段。您的 DDL 语句类似于:

CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );

你也可以试试

CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP<STRING, STRING>
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

唯一的区别是:MAP<STRING, STRING> vs MAP

[2022更新]

在 Apache Flink 1.13 版本中没有系统 built-in JSON 功能。 它们在 1.14 版本中引入。检查 this

如果您使用的版本 <1.14,请参阅以下解决方案。

如何使用嵌套的 JSON 输入创建 table?

JSON 输入示例:

{
    "id": "message-1",
    "title": "Some Title",
    "properties": {
    "foo": "bar"
    "nested_foo":{
        "prop1" : "value1",
        "prop2" : "value2"
    }
}

创建语句

CREATE TABLE input(
                id VARCHAR,
                title VARCHAR,
                properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );

如何 select 嵌套列?

SELECT properties.foo, properties.nested_foo.prop1 FROM input;

注意,如果你用

输出结果
SELECT properties FROM input

您会以行格式查看结果。 properties 栏的内容将是

+I[bar, +I[prop1,prop2]]