如何使用云存储的 pubsub 通知来触发数据流管道
How to use pubsub notifications for cloud storage to trigger dataflow pipeline
我正在尝试将 Google Cloud Dataflow 管道与 Google Cloud Pub/Sub Notifications for Google Cloud Storage 集成。
这个想法是在创建文件后立即开始处理文件。
正在发布消息,并且使用 PubsubIO.readMessagesWithAttributes()
源我设法提取文件 URI:
Pipeline p = Pipeline.create(options);
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
.withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
.fromSubscription(options.getPubsubSubscription()))
.apply(MapElements
.into(TypeDescriptors.strings())
.via((PubsubMessage msg) -> {
String bucket = msg.getAttribute("bucketId");
String object = msg.getAttribute("objectId");
GcsPath uri = GcsPath.fromComponents(bucket, object);
return uri.toString();
}));
哪个PTransform
可以用来启动reading/processinguris PCollection
中的每个文件?
将云存储更改通知与 Google Cloud Functions 相结合应该是一个不错的选择(尽管仍处于测试阶段)。
使用 Cloud Functions,您可以使用一些 Javascript 代码启动数据流作业。这是一个非常好的 blogpost 应该让你上路。只要新文件进入存储桶或文件发生更改,您的数据流作业就会启动,并将处理这些文件。
如果您想坚持自己的方法,您可能需要使用 Google Cloud Storage Java SDK to read the files in a custom DoFn。不过不确定这种方法是否更可取。
HEAD 的 Apache Beam 包含一个 PTransform,它完全符合您的要求:TextIO.readAll() 读取 PCollection<String>
文件模式或文件名。它将在 Beam 2.2.0 中可用,但现在您可以自己从 github 存储库构建 Beam 的快照并依赖它。
我正在尝试将 Google Cloud Dataflow 管道与 Google Cloud Pub/Sub Notifications for Google Cloud Storage 集成。
这个想法是在创建文件后立即开始处理文件。
正在发布消息,并且使用 PubsubIO.readMessagesWithAttributes()
源我设法提取文件 URI:
Pipeline p = Pipeline.create(options);
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
.withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
.fromSubscription(options.getPubsubSubscription()))
.apply(MapElements
.into(TypeDescriptors.strings())
.via((PubsubMessage msg) -> {
String bucket = msg.getAttribute("bucketId");
String object = msg.getAttribute("objectId");
GcsPath uri = GcsPath.fromComponents(bucket, object);
return uri.toString();
}));
哪个PTransform
可以用来启动reading/processinguris PCollection
中的每个文件?
将云存储更改通知与 Google Cloud Functions 相结合应该是一个不错的选择(尽管仍处于测试阶段)。
使用 Cloud Functions,您可以使用一些 Javascript 代码启动数据流作业。这是一个非常好的 blogpost 应该让你上路。只要新文件进入存储桶或文件发生更改,您的数据流作业就会启动,并将处理这些文件。
如果您想坚持自己的方法,您可能需要使用 Google Cloud Storage Java SDK to read the files in a custom DoFn。不过不确定这种方法是否更可取。
HEAD 的 Apache Beam 包含一个 PTransform,它完全符合您的要求:TextIO.readAll() 读取 PCollection<String>
文件模式或文件名。它将在 Beam 2.2.0 中可用,但现在您可以自己从 github 存储库构建 Beam 的快照并依赖它。