为什么#sideInput() 方法在 Dataflow beta 中从 Context 移动到 ProcessContext

Why did #sideInput() method move from Context to ProcessContext in Dataflow beta

我想知道为什么 #sideInput() 方法移动到 ProcessContext class? 以前我可以在 #startBundle() 方法中做一些额外的处理并缓存结果。 在 #processElement() 中这样做听起来效率较低。当然我可以在将数据传递给视图之前进行预处理,但是仍然存在为每个元素调用 #sideInput() 的开销...

谢谢, G

好问题。原因是我们添加了对 windowed PCollections 作为辅助输入的支持。这会启用其他场景,包括在流模式下使用带有无界 PCollections 的侧输入。

更改之前,我们仅支持全局 windowed 的侧输入,然后在处理主输入 PCollection 的每个元素时整个侧输入 PCollection 可用。这适用于传统批处理样式处理中的有界 PCollections,但没有扩展到 windowed 或无界 PCollections。

更改后,您在 ParDo 中处理的当前元素的 window 控制侧输入的哪个子集可见。 (因此您无法访问 startBundle() 中的侧输入,其中没有当前元素,因此没有当前 window。)

例如,考虑一个示例,其中您有一个流式传输管道来处理您的网站日志并向实时使用仪表板提供实时更新。您有两个无限制的输入 PCollections:一个包含新用户注册,另一个包含用户点击。您可以通过 window 按小时计算两个 PCollections 并对用户点击执行 ParDo 将新用户注册作为辅助输入来确定哪些用户点击来自新用户。现在,当您处理给定小时内的用户点击时,您会自动看到同一小时新用户注册的子集。您可以通过更改 windowing 函数并在侧边输入上及时向前移动元素时间戳来对此进行不同的变体——比如继续 window 用户每小时点击,但使用来自的新注册最近 24 小时。

我同意此更改使得在您的辅助输入上缓存任何 post 处理变得更加困难。我们添加了 View.asMultimap 来处理将 Iterable 转换为查找 table 的常见情况。如果您的 post-processing 是按元素处理的,则可以在创建 PCollectionView 之前使用 ParDo 进行处理。对于现在的任何其他事情,我建议从 processElement 中懒惰地进行。我很想听听发生的其他模式,因此我们可以研究如何提高它们的效率。