PyFlink - Issue with UNNEST: query uses an unsupported SQL feature?
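I am trying to flatten an array using the UNNEST function in the Table API.
Am I doing something wrong, or is this an unsupported feature? This page suggests it is supported, though: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html
Thanks!
Code
Python UDF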
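from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Scalar UDF that turns one string into an array holding three copies of it
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.ARRAY(DataTypes.STRING()))
def explode(s):
    return 3*[s]
t_env.register_function("explode", explode)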
Processing
tab = t_env.from_path('mySource').select("id, explode(dummy) as dummy_list")
t_env.register_table("temp_table", tab)
t_env.sql_query("SELECT t.item as dummy_item FROM UNNEST(select dummy_list from temp_table) AS t(item)").insert_into("mySink")
Execution
t_env.execute("dummy_unnest")
Error
TableException: Cannot generate a valid execution plan for the given query:
LogicalProject(dummy_item=[$0])
  Uncollect
    LogicalProject(EXPR$0=[org$apache$flink$table$functions$python$PythonScalarFunction8596b4671476ee325743dba92ed6c7()])
      LogicalTableScan(table=[[default_catalog, default_database, mySource]])
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
I think you can change the query to:
select id, dummy_item from temp_table CROSS JOIN UNNEST(dummy_list) AS t (dummy_item)
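To make the expected result of that rewrite concrete, here is a minimal plain-Python sketch of what CROSS JOIN UNNEST does: each input row is paired with every element of its array column. This is not PyFlink code and does not run a Flink job. The helper `cross_join_unnest` and the sample rows are hypothetical, made up for illustration, and `explode` is the question's UDF body without the decorator.

```python
from typing import Iterable, Iterator, List, Tuple

# The rewritten query from the answer, kept as a plain string. In the
# question's code it would be passed to t_env.sql_query(...) in place of
# the original query.
QUERY = (
    "SELECT id, dummy_item "
    "FROM temp_table "
    "CROSS JOIN UNNEST(dummy_list) AS t (dummy_item)"
)


def explode(s: str) -> List[str]:
    """Same logic as the question's UDF, minus the PyFlink decorator."""
    return 3 * [s]


def cross_join_unnest(
    rows: Iterable[Tuple[int, List[str]]]
) -> Iterator[Tuple[int, str]]:
    """Hypothetical helper: emit one (id, item) pair per array element."""
    for row_id, items in rows:
        for item in items:
            yield row_id, item


# Hypothetical sample rows standing in for temp_table(id, dummy_list).
temp_table = [(1, explode("a")), (2, explode("b"))]
result = list(cross_join_unnest(temp_table))
```

Because the join is a cross join against the unnested array, a row whose array is empty contributes no output rows in this sketch.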