源与 PTransform

Source Vs PTransform

我是这个项目的新手,我正在尝试在 Dataflow 和数据库之间创建一个连接器。

文档明确指出我应该使用 Source 和 Sink,但我看到很多人直接使用与 PInput 或 PDone 关联的 PTransform。

source/sink API 处于实验阶段(用 PTransform 解释了所有示例),但似乎更容易将其与自定义运行器集成(例如:spark)。

如果我参考代码,这两种方法都用到了。我看不到任何使用 PTransform API.

会更有趣的用例

Source/Sink API 是否应该替换 PTranform API?

我是否遗漏了可以清楚地区分这两种方法的内容?

Source/Sink API 是否足够稳定以被认为是编码输入和输出的好方法?

多谢指教!

Dataflow的哲学是PTransform是抽象和可组合性的主要单元,即任何self-contained数据处理任务都应该封装为一个PTransform。这包括连接到 third-party 存储系统的任务:从某处摄取数据或将其导出到某处。

以 Google 云数据存储为例。在代码片段中:

    PCollection<Entity> entities =
      p.apply(DatastoreIO.readFrom(dataset, query));
    ...
    p.apply(some processing)
     .apply(DatastoreIO.writeTo(dataset));

DatastoreIO.readFrom(dataset, query)的return类型是PTransform<PBegin, PCollection<Entity>>的子class,DatastoreIO.writeTo(dataset)的类型是class的子class PTransform<PCollection<Entity>, PDone>.

的确,这些功能是使用 SourceSink classes 实现的,但对于只想读取或写入数据存储区的用户而言,这是一个通常无关紧要的实现细节 (但是,请参阅本答案末尾关于公开 SourceSink class 的注释).任何连接器,或者就此而言,任何其他数据处理任务都是 PTransform.

注意:目前从某处读取的连接器往往是 PTransform<PBegin, PCollection<T>>,而写入某处的连接器往往是 PTransform<PCollection<T>, PDone>,但我们正在考虑实现它的选项更容易以更灵活的方式使用连接器(例如,从 PCollection 个文件名中读取)。

但是,对于想要实施新连接器的人来说,这个细节当然很重要。特别是,您可能会问:

问:如果我可以将连接器实现为 PTransform,为什么我还需要 SourceSink classes?

答:如果您可以仅使用 built-in 转换(例如 ParDoGroupByKey 等)来实现您的连接器,那是完全有效的 然而,SourceSink classes 提供了一些 low-level 功能,如果您需要它们,它们会很麻烦或无法发展自己。

例如,BoundedSourceUnboundedSource 提供用于控制并行化发生方式的挂钩(初始和动态工作重新平衡 - BoundedSource.splitIntoBundlesBoundedReader.splitAtFraction),而这些挂钩当前未公开任意 DoFns.

您可以通过编写一个将文件名作为输入、读​​取文件并发出 SomeRecordDoFn<FilePath, SomeRecord> 从技术上实现文件格式的解析器,但是这个 DoFn 不会能够将文件的部分内容动态地并行化到多个工作程序上,以防文件在运行时变得非常大。另一方面,FileBasedSource 具有此功能 built-in,以及处理 glob 文件模式等。

同样,您可以尝试通过实现一个 DoFn 来实现流系统的连接器,该连接器将虚拟元素作为输入,建立连接并将所有元素流式传输到 ProcessingContext.output(),但是 DoFns 目前不支持从单个包中写入无限量的输出,也不明确支持数据流为流管道提供强一致性保证所需的检查点和重复数据删除机制。 UnboundedSource,另一方面,支持所有这些。

Sink(更准确地说,Write.to() PTransform)也很有趣:它只是一个复合转换,如果你愿意,你可以自己编写(即它没有hard-coded Dataflow runner 或后端支持),但它的开发考虑了将数据并行写入存储系统时出现的典型分布式容错问题,它提供了强制你的钩子记住这些问题:例如,因为数据包是并行写入的,并且一些包可能会被重试或复制以实现容错,所以有一个 "committing" 的钩子只是结果成功完成捆绑包 (WriteOperation.finalize)。

总结: 使用 SourceSink API 开发连接器可帮助您以一种在分布式环境中运行良好的方式构建代码处理设置和源 API 使您可以访问框架的高级功能。但是,如果您的连接器非常简单,两者都不需要,那么您可以自由地 assemble 来自其他 built-in 转换的连接器。

问:假设我决定使用SourceSink。那么我如何将我的连接器打包为一个库:我应该只提供 SourceSink class,还是应该将它包装成 PTransform?

答:您的连接器最终应打包为 PTransform,以便用户可以 p.apply() 在他们的管道中使用它。但是,在幕后,您的转换可以使用 SourceSink classes.

一个常见的模式是公开 SourceSink classes,利用 Fluent Builder 模式,让用户将它们包装成 Read.from()Write.to()自行改造,但这不是严格要求。