在 DataStoreIO.Write 之后链接另一个转换

Chaining another transform after DataStoreIO.Write

我正在创建一个 Google 数据流管道,使用 Apache Beam Java SDK。我在那里进行了一些转换,最后创建了一个实体集合 (PCollection< Entity >)。我需要将其写入 Google DataStore,然后在写入所有实体后执行另一次转换。 (例如通过 PubSub 消息向多个订阅者广播已保存对象的 ID)。

现在,存储 PCollection 的方法是: entities.DatastoreIO.v1().write().withProjectId("abc")

这 return 是一个 PDone 对象,我不确定如何在 Write() 完成后链接另一个转换。由于 DatastoreIO.write() 调用不会 return PCollection,因此我无法进一步推进管道。我有 2 个问题:

  1. 如何获取写入数据存储的对象的 ID?

  2. 如何附加另一个将在保存所有实体后起作用的转换?

我们没有很好的方法来做这些事情(返回写入的数据存储实体的 ID,或等待实体被写入),尽管这与第一个类似的请求相去甚远(人们已经要求例如,这适用于 BigQuery),我们正在考虑。

现在你唯一的选择是等到整个管道完成,例如通过 pipeline.run().waitUntilFinish(),然后在主程序中执行您想要的操作(例如,您可以 运行 另一个管道)。