运行 在 EMR 上使用 flink 纱线会话的光束管道

Run beam pipeline with flink yarn session on EMR

我正在尝试 运行 来自 python SDK 的基本 wordcount beam 管道和 AWS EMR 上的 flink yarn 会话。我同时使用了 flink 运行ner 和 portable 运行ner 并得到下面列出的两个不同的错误。两种类型的 运行 人员的作业都出现在 flink UI 中,并在我的笔记本电脑上通过本地 flink 会话成功 运行。

使用 FlinkRunner,作业 运行s 作为 BeamApp-hadoop-0617202523-14894e58 并给出错误:

ERROR:root:java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.core.construction.SerializablePipelineOptions

使用 PortableRunner,作业 运行s 为 BeamApp-root-0617202248-36b0d306(我相信这意味着它成功地从 beam portable 运行ner docker 提交了作业图片)并给出错误:

ERROR:root:java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype

我假设这些是依赖性错误,并尝试在 /usr/lib/flink/lib 目录中获取提到的 jars。在应用程序启动时记录类路径时,yarn 容器日志列出了正确的 jar,但错误仍然存​​在。

Apache Beam 版本 2.22.0,flink 版本 1.10.0,emr 版本 5.30.0。

我 运行 遇到了 Apache beam + AWS EMR + Flink 的类似问题,我通过从 FlinkPipelineOptions.filesToStage 中排除 jackson-core、jackson-annotation 和 jackson-databind 依赖项解决了这个问题.

options.setFilesToStage(Arrays.stream(System.getProperty("java.class.path").split(":"))
   .filter(d -> !d.contains("com.fasterxml.jackson.core"))
   .filter(d -> Files.exists(Paths.get(d)))
   .collect(Collectors.toList()));