使用自动生成的 ID 从 Beam 添加文档到 Firestore

Add document to Firestore from Beam with auto generated ID

我想将 Apache Beam Java 与最近发布的 Firestore 连接器一起使用,以将新文档添加到 Firestore 集合。虽然我认为这应该是一项相对容易的任务,但创建 com.google.firestore.v1.Document 对象的需要似乎使事情变得有点困难。我使用 this blog post on Using Firestore and Apache Beam for data processing 作为起点。

我实际上只想写一个简单的转换,将 MyClass 对象映射到 Firestore 文档,然后将其添加到 Firestore 集合。

我现在得到的是光束 SimpleFunction,它将 MyClass 个对象映射到 Documents:

public static class Mapper extends SimpleFunction<MyClass, Document> {

    @Override
    public Document apply(final MyClass record) {
      final String project = "my-project";
      final String database = "(default)";
      final String collection = "my-collection";
      final String documentId = someUnecessaryIdComputation();
      return Document
          .newBuilder()
          .setName("projects/" + project + "/databases/" + database + "/documents/" + collection
              + "/" + documentId)
          .putFields("key",
              Value.newBuilder().setStringValue(record.getValue()).build())
          // ...
          .build();
    }

  }

和一个 DoFn 将这些 Document 转换为 Write 个配置了 update 的对象(可能也可以简化为 SimpleFunction 但被复制了来自博客 post):

private static final class CreateUpdateOperation extends DoFn<Document, Write> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      final Write write = Write.newBuilder()
          .setUpdate(c.element())
          .build();
      c.output(write);
    }
  }

我在我的管道中使用这两个函数如下:

pipeline.apply(MapElements.via(new Mapper()))
  .apply(ParDo.of(new CreateUpdateOperation()))
  .apply(FirestoreIO.v1().write().batchWrite().build());

这里的主要缺点是:

有没有什么方法可以使用 Firestore 连接器添加文档而无需显式设置文档 ID、项目 ID 和数据库?

我同意,这不是最方便的 API(目前我没有看到更好的)。它似乎是为修改现有文档而不是创建新文档而设计的。

我认为进行 higher-level 转换是有意义的;我提交了 https://issues.apache.org/jira/browse/BEAM-13994 。与此同时,你可以做类似

的事情
class FirestoreWrite extends PTransform<PCollection<Map<String, Object>>, PDone> {
  private String projectId;  // Auto-infer from environment
  private String database = "(defaut)";
  private String collection;
  
  public PDone expand(PCollection<Map<String, Object>> data) {
    return data
        .apply(ParDo.of(new DoFn() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            builder = Document
                .newBuilder()
                .setName("projects/" + projectId + "/databases/" + database + "/documents/" + collection + "/" + randomDocumentId());
            // [loop over data setting values from c.element()]
            c.output(builder.build());
          }
        }))
        .apply(new CreateUpdateOperation())
        .apply(FirestoreIO.v1().write().batchWrite().build());
  }
}

通常 re-usable 并且可能值得为 Beam 做出贡献。