Apache Flink Python Table API UDF 依赖问题
Apache Flink Python Table API UDF Dependencies Problem
通过将涉及用户定义函数 (UDF) 提交到本地集群来启动 Python Table API 作业后,它崩溃并显示
py4j.protocol.Py4JJavaError 由
引起
java.util.ServiceConfigurationError: org.apache.beam.sdk.options.PipelineOptionsRegistrar: org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar 不是子类型.
我知道这是一个关于库 path/classloading 依赖性的错误。我已经尝试按照以下 link 中的所有说明进行操作:https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html
我已经使用 classloader.parent-first-patterns-additional
配置选项尝试了多种不同的配置。 org.apache.beam.sdk.[...]
的不同条目导致了不同的额外错误消息。
以下引用 apache beam 的依赖项位于 lib 路径中:
- beam-model-fn-execution-2.20.jar
- beam-model-job-management-2.20.jar
- beam-model-pipeline-2.20.jar
- beam-运行ners-core-construction-java-2.20.jar
- beam-运行ners-java-fn-execution-2.20.jar
- beam-sdks-java-core-2.20.jar
- beam-sdks-java-fn-execution-2.20.jar
- beam-vendor-grpc-1_21_0-0.1.jar
- beam-vendor-grpc-1_26_0.0.3.jar
- beam-vendor-guava-26_0-jre-0.1.jar
- beam-vendor-sdks-java-extensions-protobuf-2.20.jar
我也可以排除是我的代码造成的,因为我已经测试了以下项目网站的示例代码:https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)
t_env.connect(FileSystem().path('/tmp/input')) \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')
t_env.execute("tutorial_job")
执行这段代码时,出现同样的错误信息。
是否有人描述了可以使用 UDF 运行 Python Table API 作业的 Flink 集群的配置?非常感谢您提前提供所有提示!
Apache Flink新版本1.10.1已解决该问题。现在可以使用命令 run -py path/to/script
通过二进制文件执行问题中显示的示例脚本,没有任何问题。
至于依赖,它们已经包含在已经交付的flink_table_x.xx-1.10.1.jar
中。因此,不需要向 lib-path 添加进一步的依赖项,这是通过 debugging/configuration 尝试在问题中完成的。
通过将涉及用户定义函数 (UDF) 提交到本地集群来启动 Python Table API 作业后,它崩溃并显示 py4j.protocol.Py4JJavaError 由
引起java.util.ServiceConfigurationError: org.apache.beam.sdk.options.PipelineOptionsRegistrar: org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar 不是子类型.
我知道这是一个关于库 path/classloading 依赖性的错误。我已经尝试按照以下 link 中的所有说明进行操作:https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html
我已经使用 classloader.parent-first-patterns-additional
配置选项尝试了多种不同的配置。 org.apache.beam.sdk.[...]
的不同条目导致了不同的额外错误消息。
以下引用 apache beam 的依赖项位于 lib 路径中:
- beam-model-fn-execution-2.20.jar
- beam-model-job-management-2.20.jar
- beam-model-pipeline-2.20.jar
- beam-运行ners-core-construction-java-2.20.jar
- beam-运行ners-java-fn-execution-2.20.jar
- beam-sdks-java-core-2.20.jar
- beam-sdks-java-fn-execution-2.20.jar
- beam-vendor-grpc-1_21_0-0.1.jar
- beam-vendor-grpc-1_26_0.0.3.jar
- beam-vendor-guava-26_0-jre-0.1.jar
- beam-vendor-sdks-java-extensions-protobuf-2.20.jar
我也可以排除是我的代码造成的,因为我已经测试了以下项目网站的示例代码:https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)
t_env.connect(FileSystem().path('/tmp/input')) \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')
t_env.execute("tutorial_job")
执行这段代码时,出现同样的错误信息。
是否有人描述了可以使用 UDF 运行 Python Table API 作业的 Flink 集群的配置?非常感谢您提前提供所有提示!
Apache Flink新版本1.10.1已解决该问题。现在可以使用命令 run -py path/to/script
通过二进制文件执行问题中显示的示例脚本,没有任何问题。
至于依赖,它们已经包含在已经交付的flink_table_x.xx-1.10.1.jar
中。因此,不需要向 lib-path 添加进一步的依赖项,这是通过 debugging/configuration 尝试在问题中完成的。