Apache Beam:刷新我正在使用 MongoDbIO.read() 从 MongoDB 读取的 sideinput 第 2 部分

Apache Beam : Refreshing a sideinput which i am reading from the MongoDB using MongoDbIO.read() Part 2

不确定这个 GenerateSequence 如何为我工作,因为我必须每小时或每天定期从 Mongo 读取值,创建了一个读取 MongoDB 的 ParDo,还添加了 window 使用触发器进入 GlobalWindows(触发器我将根据 pr 要求更新)。但是下面的代码片段给出了 return 类型错误所以你能帮我更正下面的代码行吗?还可以找到错误的快照。另外,这个生成序列对我的情况有何帮助?

PCollectionView<List<String>> list_of_vins = pipeline
                  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
                  .apply(ParDo.of(new DoFn<Long, List<String>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      // Read entire DB, and output as a List<String>
                        final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
                        MongoClient mongoClient = MongoClients.create(uriString);
                        MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
                        MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
                        c.output((List<String>) ((View) mongoCollection).asList());
                    }
                  })
                  .apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));

您需要像这样在 Window 转换上指定类型:

.apply(Window.<List<String>>into(...));

@danielm 和所有,

我已经更新了我的代码,它似乎可以正常工作,但问题很少,需要澄清才能继续,

PCollection<String> list_of_vins_1 = pipeline
            // Generate a tick every 15 seconds
            .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
            // Just to check if individual ticks are being generated once every day
            .apply("Read Data from Mongo DB",ParDo.of(new DoFn<Long, Document>() {
                    @ProcessElement
                    public void processElement(@Element Long tick, OutputReceiver<Document> out) {
                            // reading values from Mongo DB
                            out.output(mongoDocuments);
                        }
                    }
                }
            )).apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
            .apply(ParDo.of(new ConvertDocuemntToStringFn()));

// convert to mongodata to list of string
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String> asList());

我能够根据提到的 Ticker Duration 从 mongo db 读取值,但我不确定这会增加我的 sideinput 大小。就像我将此 list_of_data_1 作为侧输入传递一样,在管道中它显示添加的元素数量在增加。

假设如果 mongo db 有 20000 个集合,并且如果此代码每 2 分钟运行一次,则添加的元素数将为 20000 乘以代码运行的次数,即 20,000 + 20,0000 + 20,000 + 。 .... 等等。

所以我的问题是每次在 Side inputs 中添加元素时,sideinput 是否刷新并且 sideinput 总是有 20,000 个值,或者 MongoDB 有什么,它是追加还是覆盖?