如何使用云存储的 pubsub 通知来触发数据流管道

How to use pubsub notifications for cloud storage to trigger dataflow pipeline

我正在尝试将 Google Cloud Dataflow 管道与 Google Cloud Pub/Sub Notifications for Google Cloud Storage 集成。 这个想法是在创建文件后立即开始处理文件。 正在发布消息,并且使用 PubsubIO.readMessagesWithAttributes() 源我设法提取文件 URI:

Pipeline p = Pipeline.create(options);
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
            .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
            .fromSubscription(options.getPubsubSubscription()))
            .apply(MapElements
                    .into(TypeDescriptors.strings())
                    .via((PubsubMessage msg) -> {
                        String bucket = msg.getAttribute("bucketId");
                        String object = msg.getAttribute("objectId");
                        GcsPath uri = GcsPath.fromComponents(bucket, object);
                        return uri.toString();
                    }));

哪个PTransform可以用来启动reading/processinguris PCollection中的每个文件?

将云存储更改通知与 Google Cloud Functions 相结合应该是一个不错的选择(尽管仍处于测试阶段)。

使用 Cloud Functions,您可以使用一些 Javascript 代码启动数据流作业。这是一个非常好的 blogpost 应该让你上路。只要新文件进入存储桶或文件发生更改,您的数据流作业就会启动,并将处理这些文件。

如果您想坚持自己的方法,您可能需要使用 Google Cloud Storage Java SDK to read the files in a custom DoFn。不过不确定这种方法是否更可取。

HEAD 的 Apache Beam 包含一个 PTransform,它完全符合您的要求:TextIO.readAll() 读取 PCollection<String> 文件模式或文件名。它将在 Beam 2.2.0 中可用,但现在您可以自己从 github 存储库构建 Beam 的快照并依赖它。