Apache Beam:刷新我正在使用 MongoDbIO.read() 从 MongoDB 读取的 sideinput 第 2 部分
Apache Beam : Refreshing a sideinput which i am reading from the MongoDB using MongoDbIO.read() Part 2
不确定这个 GenerateSequence 如何为我工作,因为我必须每小时或每天定期从 Mongo 读取值,创建了一个读取 MongoDB 的 ParDo,还添加了 window 使用触发器进入 GlobalWindows(触发器我将根据 pr 要求更新)。但是下面的代码片段给出了 return 类型错误所以你能帮我更正下面的代码行吗?还可以找到错误的快照。另外,这个生成序列对我的情况有何帮助?
PCollectionView<List<String>> list_of_vins = pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
.apply(ParDo.of(new DoFn<Long, List<String>>() {
@ProcessElement
public void process(ProcessContext c) {
// Read entire DB, and output as a List<String>
final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
MongoClient mongoClient = MongoClients.create(uriString);
MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
c.output((List<String>) ((View) mongoCollection).asList());
}
})
.apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));
您需要像这样在 Window 转换上指定类型:
.apply(Window.<List<String>>into(...));
@danielm 和所有,
我已经更新了我的代码,它似乎可以正常工作,但问题很少,需要澄清才能继续,
PCollection<String> list_of_vins_1 = pipeline
// Generate a tick every 15 seconds
.apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
// Just to check if individual ticks are being generated once every day
.apply("Read Data from Mongo DB",ParDo.of(new DoFn<Long, Document>() {
@ProcessElement
public void processElement(@Element Long tick, OutputReceiver<Document> out) {
// reading values from Mongo DB
out.output(mongoDocuments);
}
}
}
)).apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
.apply(ParDo.of(new ConvertDocuemntToStringFn()));
// convert to mongodata to list of string
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String> asList());
我能够根据提到的 Ticker Duration 从 mongo db 读取值,但我不确定这会增加我的 sideinput 大小。就像我将此 list_of_data_1 作为侧输入传递一样,在管道中它显示添加的元素数量在增加。
假设如果 mongo db 有 20000 个集合,并且如果此代码每 2 分钟运行一次,则添加的元素数将为 20000 乘以代码运行的次数,即 20,000 + 20,0000 + 20,000 + 。 .... 等等。
所以我的问题是每次在 Side inputs 中添加元素时,sideinput 是否刷新并且 sideinput 总是有 20,000 个值,或者 MongoDB 有什么,它是追加还是覆盖?
不确定这个 GenerateSequence 如何为我工作,因为我必须每小时或每天定期从 Mongo 读取值,创建了一个读取 MongoDB 的 ParDo,还添加了 window 使用触发器进入 GlobalWindows(触发器我将根据 pr 要求更新)。但是下面的代码片段给出了 return 类型错误所以你能帮我更正下面的代码行吗?还可以找到错误的快照。另外,这个生成序列对我的情况有何帮助?
PCollectionView<List<String>> list_of_vins = pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
.apply(ParDo.of(new DoFn<Long, List<String>>() {
@ProcessElement
public void process(ProcessContext c) {
// Read entire DB, and output as a List<String>
final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
MongoClient mongoClient = MongoClients.create(uriString);
MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
c.output((List<String>) ((View) mongoCollection).asList());
}
})
.apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));
您需要像这样在 Window 转换上指定类型:
.apply(Window.<List<String>>into(...));
@danielm 和所有,
我已经更新了我的代码,它似乎可以正常工作,但问题很少,需要澄清才能继续,
PCollection<String> list_of_vins_1 = pipeline
// Generate a tick every 15 seconds
.apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
// Just to check if individual ticks are being generated once every day
.apply("Read Data from Mongo DB",ParDo.of(new DoFn<Long, Document>() {
@ProcessElement
public void processElement(@Element Long tick, OutputReceiver<Document> out) {
// reading values from Mongo DB
out.output(mongoDocuments);
}
}
}
)).apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
.apply(ParDo.of(new ConvertDocuemntToStringFn()));
// convert to mongodata to list of string
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String> asList());
我能够根据提到的 Ticker Duration 从 mongo db 读取值,但我不确定这会增加我的 sideinput 大小。就像我将此 list_of_data_1 作为侧输入传递一样,在管道中它显示添加的元素数量在增加。
假设如果 mongo db 有 20000 个集合,并且如果此代码每 2 分钟运行一次,则添加的元素数将为 20000 乘以代码运行的次数,即 20,000 + 20,0000 + 20,000 + 。 .... 等等。
所以我的问题是每次在 Side inputs 中添加元素时,sideinput 是否刷新并且 sideinput 总是有 20,000 个值,或者 MongoDB 有什么,它是追加还是覆盖?