Apache Flink 和 Pureconfig - 在作业启动时传递 java 属性

Apache Flink and Pureconfig - passing java properties on job startup

我想将 pureconfig 与 apache Flink 一起使用。

如何在开始作业时传递额外的 java 属性?

我尝试通过:-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" 参数传递它,但不被接受:

https://github.com/geoHeil/streaming-reference/blob/5-basic-flink-setup/Makefile#L21

flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
      "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"


-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"

并且主要 class 在尝试从配置文件实例化配置时失败。

请注意,https://github.com/geoHeil/streaming-reference 提供了完整的参考资料。您可以通过以下方式重现上述错误:

git clone git@github.com:geoHeil/streaming-reference.git
cd streaming-reference
git checkout 5-basic-flink-setup
make run-local-Tweets

并且应该看到例外情况:

ConfigurationException: Failed to start. There is a problem with the configuration: ConfigReaderFailures(ConvertFailure(KeyNotFound(foo,Set()),None,),List())

在 Spark 中,这个 属性 被称为:extraJavaOptions

编辑

即我尝试使用Flink: How to pass extra JVM options to TaskManager and JobManager的方法,但目前Flink(1.10.1)版本还不行

此 属性 等同于 Apache Spark 中的 spark.driver.extraJavaOptions。我相信,它需要传递给工作经理。

如果我阅读文档 -yD,则仅适用于 YARN。但我还需要一些在本地也能工作的东西。

更多相关文章:

正在从邮件列表中复制答案。

如果您将集群重复用于多个作业,它们需要共享 JVM_ARGS,因为这是同一个进程。 [1] 在 Spark 上,afaik 的每个阶段都会产生新的进程。

但是,目前的建议是每个 job/application 只使用一个临时集群(这更接近 Spark 的工作方式)。因此,如果您使用 YARN,每个 job/application 都会生成一个大小恰到好处的新集群。然后您可以使用

为新的 YARN 提交提供新参数
flink run -m yarn-cluster -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis 
"usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"

但是,请确保路径可以从 YARN 集群中访问,因为驱动程序可能在集群上执行(不是 100% 确定)。

要将文件添加到 yarn deployment,请使用

 -yt,--yarnship <arg>                 Ship files in the specified directory
                                      (t for transfer)

如果你想在共享集群上按作业级别配置,我建议使用普通参数并手动初始化 PureConfig(没有使用过,所以不确定如何)。然后,您可能会按如下方式调用您的程序。

flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" config.file='config/jobs/twitter-analysis.conf'

对于本地执行,我在配置它时也遇到了一些问题(用你的代码试过)。问题是我们之前尝试的所有参数仅传递给新生成的进程,而您的代码直接在 CLI 中执行。

FLINK_ENV_JAVA_OPTS=-Dconfig.file="`pwd`/config/jobs/twitter-analysis.conf" flink run -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis     "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"

FLINK_ENV_JAVA_OPTS 通常使用 env.java.opts 从 flink-conf.yaml 解析,但不遵守 -Denv.java.opts。我不确定这是不是故意的。

如果你可以将 env.java.opts 放在 flink-conf.yaml 中,它很可能适用于 YARN 和本地。使用 FLINK_CONF_DIR 您可以为每个作业设置不同的 conf 目录。或者,您也可以同时指定 FLINK_ENV_JAVA_OPTS-yD 来注入 属性.