python Dataflow DoFn 生命周期中的 beam setup() 刷新多长时间?

How long beam setup() refresh in python Dataflow DoFn life cycle?

我有一个流式传输管道,我需要从 BigQuery 中查询作为我的管道转换的参考。由于 BigQuery 表仅在 2 周内更改,我将查询缓存放在 setup() 而不是 start_bundle() 中。通过观察日志,我看到 start_bundle() 会在 DoFn 生命周期中刷新其值,大约 50-100 个元素进程,但 setup() 永远不会刷新。有什么办法可以解决这个问题吗?

虽然你没有提供代码,但我会根据你的解释回答你的问题。

First,关于 DoFn.start_bundle(),每个包都会调用此函数,DataFlow 根据期间收集的指标来决定这些包的大小执行。

Second,DoFn.setup() is called once per worker. It will be only called again if the worker is restarted. Moreover, as a comparison DoFn.processElement() 每个元素调用一次。

由于您需要每周刷新查询两次,因此它非常适合 SideInput using "Slowly-changing lookup cache"。当您查找不时更改的 table 时,您可以使用此方法。所以你需要更新查找的结果。但是,您可以使用流模式,而不是在批处理模式下使用单个查询。它允许您根据 GlobalWindow 更新查找结果(在您的情况下是查询的结果)。之后,有了这个辅助输入,您就可以在主流 PCollection 中使用它了。

注意: 我必须指出,作为一个限制,sideInputs 无法在处理大量数据(许多 Gb 或 Tb)时正常工作。此外, 非常有用。

上面的回答很好。作为替代方案,您可以在结果的缓存版本 start_bundle() returns 中调用方法,只要它足够新鲜,否则从 BQ 进行完整读取。看,例如