如何在每个 window 上 refresh/reload 侧输入
How to refresh/reload side input on every window
我正在使用 Apache Beam 加入多个流以及一些查找。我有 2 种情况,如果查找大小很大,我希望对每个记录处理进行 reload/refresh 的侧输入(即我将使用 where 子句查询数据库),如果查找大小较小,则 reload/refresh每天一次。
我想知道正确的做法是什么。我不希望巨大的数据端输入吃掉所有工人的内存。
我使用下面的代码每天刷新一次侧边输入。
PCollectionView<Map<String, String>> lkp =
p.apply(GenerateSequence.from(0)).withRate(1, Duration.standardDays(1))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(
ParDo.of(
new DoFn<Long, Map<String, String>>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void process(
@Element Long input, OutputReceiver<Map<String, String>> o) {
Map<String, String> map = HiveConnection.getHiveConnection("select * from table");
o.output(map);
}
}))
.apply(View.<Map<String, String>>asSingleton());
请指导我完成这些类型用例的最佳实践,并提供一些示例代码以便我更好地理解。
谢谢,
高瑟姆
您正在使用正确的推荐模式进行小型每日查询。
在大的情况下,DoFn 的标注通常不是使用 SideInput,而是推荐的模式。这个旧博客包含模式 "Calling external services for data enrichment".
的示例
Guide to common Cloud Dataflow use-case patterns, Part 1
我会尽量找时间将此模式添加到 Beam 模式页面:
我正在使用 Apache Beam 加入多个流以及一些查找。我有 2 种情况,如果查找大小很大,我希望对每个记录处理进行 reload/refresh 的侧输入(即我将使用 where 子句查询数据库),如果查找大小较小,则 reload/refresh每天一次。
我想知道正确的做法是什么。我不希望巨大的数据端输入吃掉所有工人的内存。
我使用下面的代码每天刷新一次侧边输入。
PCollectionView<Map<String, String>> lkp =
p.apply(GenerateSequence.from(0)).withRate(1, Duration.standardDays(1))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(
ParDo.of(
new DoFn<Long, Map<String, String>>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void process(
@Element Long input, OutputReceiver<Map<String, String>> o) {
Map<String, String> map = HiveConnection.getHiveConnection("select * from table");
o.output(map);
}
}))
.apply(View.<Map<String, String>>asSingleton());
请指导我完成这些类型用例的最佳实践,并提供一些示例代码以便我更好地理解。
谢谢, 高瑟姆
您正在使用正确的推荐模式进行小型每日查询。
在大的情况下,DoFn 的标注通常不是使用 SideInput,而是推荐的模式。这个旧博客包含模式 "Calling external services for data enrichment".
的示例Guide to common Cloud Dataflow use-case patterns, Part 1
我会尽量找时间将此模式添加到 Beam 模式页面: