Querying a nested row with Python in Flink
Based on the pyflink walkthrough, I'm now trying to get a simple nested row query working using apache-flink==1.14.4. I've created my table structure based upon this solution:
A message looks like this:
{"signature": {"token": "abcd1234"}}
The relevant part of the code looks like this:
create_kafka_source_ddl = """
    CREATE TABLE nested_msg (
        `signature` ROW (
            `token` STRING
        )
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'nested_msg',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'nested-msg',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
"""

create_es_sink_ddl = """
    CREATE TABLE es_sink (
        token STRING
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://elasticsearch:9200',
        'index' = 'nested_count_1',
        'document-id.key-delimiter' = '$',
        'sink.bulk-flush.max-size' = '42mb',
        'sink.bulk-flush.max-actions' = '32',
        'sink.bulk-flush.interval' = '1000',
        'sink.bulk-flush.backoff.delay' = '1000',
        'format' = 'json'
    )
"""

t_env.execute_sql(create_kafka_source_ddl)
t_env.execute_sql(create_es_sink_ddl)

# How do I select the nested field here?
t_env.from_path("nested_msg").select(col("signature.token").alias("token")).select(
    "token"
).execute_insert("es_sink")
I've tried many variations here without success. The exception is:
py4j.protocol.Py4JJavaError: An error occurred while calling o48.select.
: org.apache.flink.table.api.ValidationException: Cannot resolve field [signature.token], input field list:[signature].
How can I select a nested field like this so that I can insert it into my sink?
You can change col("signature.token") to col("signature").get('token').
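To see why the dotted name fails, here is a toy model in plain Python. It is not Flink's actual resolver: the dict-based schema and the helper names `resolve_column` and `get_nested` are made up for illustration. It only mirrors what the error message suggests, namely that the string passed to col() is looked up as a single top-level column name, so the nested field has to be reached in a second step.

```python
# Toy model only: plain Python objects stand in for the Flink schema and row.
schema_fields = ["signature"]                    # top-level columns of nested_msg
message = {"signature": {"token": "abcd1234"}}   # sample message from the question


def resolve_column(name, fields):
    """Look a name up among the top-level columns only (hypothetical helper)."""
    if name not in fields:
        raise KeyError(
            f"Cannot resolve field [{name}], input field list:[{', '.join(fields)}]."
        )
    return name


def get_nested(row, column, field):
    """Mimic col(column).get(field): resolve the column, then step into it."""
    return row[resolve_column(column, schema_fields)][field]


dotted_error = None
try:
    # "signature.token" is treated as one column name, which does not exist.
    resolve_column("signature.token", schema_fields)
except KeyError as err:
    dotted_error = err.args[0]

# Resolving "signature" first and then asking for "token" succeeds.
token = get_nested(message, "signature", "token")
print(dotted_error)
print(token)
```

Applied to the question's code, the select would then read t_env.from_path("nested_msg").select(col("signature").get("token").alias("token")).execute_insert("es_sink"). The extra .select("token") in the original is no longer needed.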