Apache Beam:刷新我正在使用 MongoDbIO.read() 从 MongoDB 读取的 sideinput

Apache Beam : Refreshing a sideinput which i am reading from the MongoDB using MongoDbIO.read()

我正在从 MongoDB 中读取一个 PCollection mongo 数据,并将此 PCollection 用作我的 ParDo(DoFN).withSideInputs(PCollection)

的 sideInput

从后端我的 MongoDB collection 每天或每月或每年更新 。我需要在我的管道中增加新的价值。

我们可以将此视为刷新 运行 管道中的 mongo collection 值。例如 mongo collection 总共有 20K 个文档,一天后又添加了三个记录到 mongo collection 然后我需要在我的 pileine 中再添加三个值,即 20,003总共

目前我的管道是这样的。

PCollection<String> mongodata =  pipeline.apply(MongoDbIO.read()
                .withUri(options.getMongoDBHostName())
                .withDatabase(options.getMongoDBDatabaseName())
                .withCollection(options.getMongoVinCollectionName()))
                .apply(ParDo.of(new ConvertDocuemntToStringFn()));

PCollectionView<List<String>> list_of_data = mongodata.apply(View.<String> asList());

PCollection<PubsubMessage>  pubsubMessagePCollection = controller.flattenPubSubPCollection(
                controller.fetchDataFromBucket(options),pipeline);

pubsubMessagePCollection.apply("Convert pubsub to kv,k=vin",ParDo.of(new ConvertPubsubToKVFn()))
                .apply("group by vin key",GroupByKey.<String,String>create())
                .apply("converting message to document type",ParDo.of(
                        new ConvertMessageToDocumentTypeFn(list_of_data)).withSideInputs(list_of_data))
                .apply(MongoDbIO.write()
                .withUri(options.getMongoDBHostName())
                .withDatabase(options.getMongoDBDatabaseName())
                .withCollection(CollectionA));
pipeline.run();

我希望此mongo数据(list_of_data)根据从后端更新的要求进行刷新,而无需停止管道。

我尝试寻找 GenerateSequence 或触发的方法,但无法找到测试此方法的确切代码,请帮助并提供更新的代码,如果可以的话,可以通过添加合适的代码来解决我的查询。

如果需要更多信息,请告诉我。

谢谢

您需要使用 GenerateSequence 定期创建元素,有一个读取 MongoDB 的 ParDo,然后使用适当的触发器 window 进入 GlobalWindows。我认为您不能直接使用 MongoDbIO,因为它不支持像这样的管道中间的 运行。代码将类似于:

PCollectionView<List<String>> list_of_data = pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.hours(24))) // adjust polling rate
  .apply(ParDo.of(new DoFn<Long, List<String>>() {
    @ProcessElement
    public void process(@Element long unused) {
      // Read entire DB, and output as a List<String>
    }
  })
  .apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));