Apache Flink 和 Pureconfig - 在作业启动时传递 java 属性
Apache Flink and Pureconfig - passing java properties on job startup
我想将 pureconfig 与 apache Flink 一起使用。
如何在开始作业时传递额外的 java 属性?
我尝试通过:-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
参数传递它,但不被接受:
https://github.com/geoHeil/streaming-reference/blob/5-basic-flink-setup/Makefile#L21
flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
"usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
并且主要 class 在尝试从配置文件实例化配置时失败。
请注意,https://github.com/geoHeil/streaming-reference 提供了完整的参考资料。您可以通过以下方式重现上述错误:
git clone git@github.com:geoHeil/streaming-reference.git
cd streaming-reference
git checkout 5-basic-flink-setup
make run-local-Tweets
并且应该看到例外情况:
ConfigurationException: Failed to start. There is a problem with the configuration: ConfigReaderFailures(ConvertFailure(KeyNotFound(foo,Set()),None,),List())
在 Spark 中,这个 属性 被称为:extraJavaOptions
。
编辑
即我尝试使用Flink: How to pass extra JVM options to TaskManager and JobManager的方法,但目前Flink(1.10.1)版本还不行
此 属性 等同于 Apache Spark 中的 spark.driver.extraJavaOptions
。我相信,它需要传递给工作经理。
如果我阅读文档 -yD
,则仅适用于 YARN。但我还需要一些在本地也能工作的东西。
更多相关文章:
正在从邮件列表中复制答案。
如果您将集群重复用于多个作业,它们需要共享 JVM_ARGS
,因为这是同一个进程。 [1] 在 Spark 上,afaik 的每个阶段都会产生新的进程。
但是,目前的建议是每个 job/application 只使用一个临时集群(这更接近 Spark 的工作方式)。因此,如果您使用 YARN,每个 job/application 都会生成一个大小恰到好处的新集群。然后您可以使用
为新的 YARN 提交提供新参数
flink run -m yarn-cluster -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis
"usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
但是,请确保路径可以从 YARN 集群中访问,因为驱动程序可能在集群上执行(不是 100% 确定)。
要将文件添加到 yarn deployment,请使用
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
如果你想在共享集群上按作业级别配置,我建议使用普通参数并手动初始化 PureConfig(没有使用过,所以不确定如何)。然后,您可能会按如下方式调用您的程序。
flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" config.file='config/jobs/twitter-analysis.conf'
对于本地执行,我在配置它时也遇到了一些问题(用你的代码试过)。问题是我们之前尝试的所有参数仅传递给新生成的进程,而您的代码直接在 CLI 中执行。
FLINK_ENV_JAVA_OPTS=-Dconfig.file="`pwd`/config/jobs/twitter-analysis.conf" flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
FLINK_ENV_JAVA_OPTS
通常使用 env.java.opts 从 flink-conf.yaml 解析,但不遵守 -Denv.java.opts
。我不确定这是不是故意的。
如果你可以将 env.java.opts
放在 flink-conf.yaml 中,它很可能适用于 YARN 和本地。使用 FLINK_CONF_DIR
您可以为每个作业设置不同的 conf 目录。或者,您也可以同时指定 FLINK_ENV_JAVA_OPTS
和 -yD
来注入 属性.
我想将 pureconfig 与 apache Flink 一起使用。
如何在开始作业时传递额外的 java 属性?
我尝试通过:-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
参数传递它,但不被接受:
https://github.com/geoHeil/streaming-reference/blob/5-basic-flink-setup/Makefile#L21
flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \ "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
并且主要 class 在尝试从配置文件实例化配置时失败。
请注意,https://github.com/geoHeil/streaming-reference 提供了完整的参考资料。您可以通过以下方式重现上述错误:
git clone git@github.com:geoHeil/streaming-reference.git
cd streaming-reference
git checkout 5-basic-flink-setup
make run-local-Tweets
并且应该看到例外情况:
ConfigurationException: Failed to start. There is a problem with the configuration: ConfigReaderFailures(ConvertFailure(KeyNotFound(foo,Set()),None,),List())
在 Spark 中,这个 属性 被称为:extraJavaOptions
。
编辑
即我尝试使用Flink: How to pass extra JVM options to TaskManager and JobManager的方法,但目前Flink(1.10.1)版本还不行
此 属性 等同于 Apache Spark 中的 spark.driver.extraJavaOptions
。我相信,它需要传递给工作经理。
如果我阅读文档 -yD
,则仅适用于 YARN。但我还需要一些在本地也能工作的东西。
更多相关文章:
正在从邮件列表中复制答案。
如果您将集群重复用于多个作业,它们需要共享 JVM_ARGS
,因为这是同一个进程。 [1] 在 Spark 上,afaik 的每个阶段都会产生新的进程。
但是,目前的建议是每个 job/application 只使用一个临时集群(这更接近 Spark 的工作方式)。因此,如果您使用 YARN,每个 job/application 都会生成一个大小恰到好处的新集群。然后您可以使用
为新的 YARN 提交提供新参数flink run -m yarn-cluster -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis
"usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
但是,请确保路径可以从 YARN 集群中访问,因为驱动程序可能在集群上执行(不是 100% 确定)。
要将文件添加到 yarn deployment,请使用
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
如果你想在共享集群上按作业级别配置,我建议使用普通参数并手动初始化 PureConfig(没有使用过,所以不确定如何)。然后,您可能会按如下方式调用您的程序。
flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" config.file='config/jobs/twitter-analysis.conf'
对于本地执行,我在配置它时也遇到了一些问题(用你的代码试过)。问题是我们之前尝试的所有参数仅传递给新生成的进程,而您的代码直接在 CLI 中执行。
FLINK_ENV_JAVA_OPTS=-Dconfig.file="`pwd`/config/jobs/twitter-analysis.conf" flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
FLINK_ENV_JAVA_OPTS
通常使用 env.java.opts 从 flink-conf.yaml 解析,但不遵守 -Denv.java.opts
。我不确定这是不是故意的。
如果你可以将 env.java.opts
放在 flink-conf.yaml 中,它很可能适用于 YARN 和本地。使用 FLINK_CONF_DIR
您可以为每个作业设置不同的 conf 目录。或者,您也可以同时指定 FLINK_ENV_JAVA_OPTS
和 -yD
来注入 属性.