Dataflow/Beam 模板、生产化、初始化和 ValueProvider

Dataflow/Beam Templates, Productionization, Initialization, and ValueProviders

我在 Google Cloud Dataflow 上有一个 Apache Beam 作业 运行ning,作为其初始化的一部分,它需要 运行 一些基本的 sanity/availability 服务检查、pub/sub 订阅、GCS blob 等。它是一个旨在 运行 无限处理数十万条 pub/sub 消息的流媒体管道。

目前它需要一整堆必需的可变参数:它需要 运行 哪个 Google 云项目,它将在哪个存储桶和目录前缀中存储文件,哪个 pub/sub 它需要读取的订阅,等等。在调用 pipeline.run 之前,它会使用这些参数进行一些工作 - 验证、字符串拆分等。在目前的形式中,为了开始一项工作,我们一直将这些参数传递给 PipelineOptionsFactory 并每次都发出一个新的编译,但似乎应该有更好的方法。我已经将参数设置为 ValueProvider 对象,但是因为它们是在 pipeline.run 之外调用的,Maven 在编译时抱怨 ValueProvider.get() 在 [=46= 之外被调用]时间背景(是的,它是。)

我已经尝试使用 Google“Creating Templates”文档中的 NestedValueProviders,但是如果我尝试使用 NestedValueProvider.of 到 [=45],我的 IDE 会抱怨=] 文档中显示的字符串。我能够让 NestedValueProviders 编译的唯一方法如下:

NestedValueProvider<String, String> pid = NestedValueProvider.of(
        pipelineOptions.getDataflowProjectId(),
        (SerializableFunction<String, String>) s -> s
);

(String pid = NestedValueProvider.of(...) 导致以下错误:"incompatible types: no instance(s) of type variable(s) T,X exist so that org.apache.beam.sdk.options.ValueProvider.NestedValueProvider conforms to java.lang.String")

我的管道选项中有以下内容:

ValueProvider<String> getDataflowProjectId();
void setDataflowProjectId(ValueProvider<String> value);

由于我们要处理的消息量很大,在管道前端为每条通过的消息添加这些检查并不实际;我们很快就会达到其中一些调用的每日帐户管理限制。

模板是我想做的事情的正确方法吗?我如何着手实际生产这个?应该(可以吗?)我用 Maven 编译成一个 jar,然后只 运行 本地 dev/qa/prod 盒子上的 jar 和我的参数,根本不用 ValueProviders 吗?或者是否可以为 ValueProvider 提供默认值并将其作为传递给模板的选项的一部分进行覆盖?

如有任何关于如何进行的建议,我们将不胜感激。谢谢!

当前实现模板的方式没有意义执行 "post-template creation" 但 "pre-pipeline start" initialization/validation.

所有现有验证都在模板创建期间执行。如果验证检测到值不可用(由于是 ValueProvider),则跳过验证。

在某些情况下,可以通过添加 运行 时间检查来近似验证,作为自定义源的初始拆分的一部分或 DoFn@Setup 方法的一部分.在后一种情况下,@Setup 方法将为创建的每个 DoFn 实例 运行 一次。如果管道是 Batch,在特定实例发生 4 次失败后,管道将失败。

生产管道的另一个选择是构建 运行 管道的 JAR,并有一个 运行 启动管道的 JAR 的生产过程。

关于您收到的编译错误 -- NestedValueProvider returns 和 ValueProvider -- 无法从中得到 String。但是,您可以将验证码放入 SerializableFunction 中,即 NestedValueProvider 中的 运行。 虽然我相信这将在每次访问值时重新运行验证,但让NestedValueProvider缓存翻译后的值并不是不合理的。