Querying nested rows with Python in Flink

Based on the pyflink walkthrough, I'm now trying to get a simple nested row query working using apache-flink==1.14.4. I've created my table structure based on this solution.

A message looks like this:

{"signature": {"token": "abcd1234"}}

The relevant part of the code is:

from pyflink.table.expressions import col

create_kafka_source_ddl = """
    CREATE TABLE nested_msg (
        `signature` ROW (
            `token` STRING
        )
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'nested_msg',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'nested-msg',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
"""

create_es_sink_ddl = """
    CREATE TABLE es_sink (
        token STRING
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://elasticsearch:9200',
        'index' = 'nested_count_1',
        'document-id.key-delimiter' = '$',
        'sink.bulk-flush.max-size' = '42mb',
        'sink.bulk-flush.max-actions' = '32',
        'sink.bulk-flush.interval' = '1000',
        'sink.bulk-flush.backoff.delay' = '1000',
        'format' = 'json'
    )
"""

t_env.execute_sql(create_kafka_source_ddl)
t_env.execute_sql(create_es_sink_ddl)
# How do I select the nested field here?
t_env.from_path("nested_msg").select(col("signature.token").alias("token")).select(
    "token"
).execute_insert("es_sink")

I've tried many variations of this without success. The exception is:

py4j.protocol.Py4JJavaError: An error occurred while calling o48.select.
: org.apache.flink.table.api.ValidationException: Cannot resolve field [signature.token], input field list:[signature].

How can I select a nested field like this so that I can insert it into my sink?

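You can change col("signature.token") to col("signature").get('token'). col() treats the whole string "signature.token" as the name of a single top-level column, which is why validation fails with the input field list [signature]; get() instead accesses a field of the composite ROW column by name.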