上传 gcs 存储桶中的所有必要文件时,apache beam 触发

apache beam trigger when all necessary files in gcs bucket is uploaded

我是 beam 的新手,所以整个触发的东西真的让我很困惑。 我有定期上传到 gcs 的文件,路径看起来像这样:node-<num>/<table_name>/<timestamp>/files_parts 我需要写一些东西,当文件的所有 8 个部分都存在时会触发。

他们的名字是这样的:file_1_part_1, file_1_part_2, file_2_part_1, file_2_part_2 (同一个目录中可能有多个文件部分,但如果有问题我可以要求更改)。

有什么方法可以创建这个触发器吗?如果不是,你建议我可以做什么?

谢谢!

如果您使用的是 Java SDK,则可以使用转换 Watch 来实现此目的。不过,我在 Python SDK 中没有看到对应项。

我觉得写个程序轮询GCS目录下的文件比较好。当文件的 8 个部分可用时,publish a message containing the file name to Pub/Sub 或类似产品。

然后在您的 Beam 管道中,使用 Pub/Sub 主题作为 streaming 源来执行您的 ETL。