Py4JJavaError in pyflink Table API
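This code converts a pandas DataFrame to a Flink table, applies some transformations, and then converts the result back to pandas. It works perfectly when I use only filter and select, but it gives me an error when I add group_by and order_by.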
import pandas as pd
import numpy as np
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Streaming table environment using the old planner
f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)

# Load the CSV with pandas and turn it into a Flink table
df = pd.read_csv("dataBase/New/candidate.csv")
col = ['candidate_id', 'candidate_source_id', 'candidate_first_name',
       'candidate_middle_name', 'candidate_last_name', 'candidate_email',
       'created_date', 'last_modified_date', 'last_modified_by']
table = table_env.from_pandas(df, col)

# Filter, group, project, sort, then convert back to pandas
table.filter("candidate_id > 322445")\
    .filter("candidate_first_name === 'Libby'")\
    .group_by("candidate_id, candidate_source_id")\
    .select("candidate_id, candidate_source_id")\
    .order_by("candidate_id").to_pandas()
My error is:
Py4JJavaError: An error occurred while calling o3164.orderBy.
: org.apache.flink.table.api.ValidationException: A limit operation on unbounded tables is currently not supported.
at org.apache.flink.table.operations.utils.SortOperationFactory.failIfStreaming(SortOperationFactory.java:131)
at org.apache.flink.table.operations.utils.SortOperationFactory.createSort(SortOperationFactory.java:63)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.sort(OperationTreeBuilder.java:409)
at org.apache.flink.table.api.internal.TableImpl.orderBy(TableImpl.java:401)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
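The trace shows the call being rejected in SortOperationFactory.failIfStreaming, which suggests that order_by is refused because the table environment was built with in_streaming_mode(). Since the result ends up in pandas anyway, one possible workaround is to drop .order_by(...) from the Flink chain and sort the DataFrame returned by to_pandas() instead. The following is only a minimal sketch of that sorting step: the helper name sort_candidates and the sample rows are hypothetical, and the DataFrame merely stands in for the output of to_pandas().

```python
import pandas as pd


def sort_candidates(result: pd.DataFrame) -> pd.DataFrame:
    """Sort an already filtered and grouped result by candidate_id."""
    # The sort runs in pandas, so no order_by is needed on the streaming table.
    return result.sort_values("candidate_id").reset_index(drop=True)


# Hypothetical rows standing in for what
# table.filter(...).group_by(...).select(...).to_pandas() might return.
result = pd.DataFrame(
    {
        "candidate_id": [322460, 322447, 322451],
        "candidate_source_id": [3, 1, 2],
    }
)

sorted_result = sort_candidates(result)
print(sorted_result)
```

This keeps the streaming environment untouched and only moves the ordering to the pandas side, which may be acceptable when the whole result fits in memory.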