访问 ParDo 中的 sideinput
Access sideinput inside ParDo
我是 apache beam 的新手,我正在对我们的一个用例使用 sideinput 进行一些调查。下面是代码。
PipelineOptions options =
PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
final List<String> sideInput = Arrays.asList("1", "2", "3", "4");
final List<String> input = Arrays.asList("a", "b", "c", "d");
PCollectionView<List<String>> sideinput =
pipeline.apply("readInput", Create.of(sideInput)).apply(View.asList());
pipeline.apply("read", Create.of(input))
.apply("process", ParDo.of(new DoFn<String, String>() {
@ProcessElement public void process(ProcessContext pc) {
System.out.println("processing element:" + pc.element());
List<String> list = pc.sideInput(sideinput);
for (String element : list) {
System.out.print(element);
}
System.out.println("");
}
}).withSideInputs(sideinput));
pipeline.run();
我期待它在每个元素之后打印出所有的 sideinput 元素,例如
processing element:d
1234
processing element:c
1234
processing element:a
1234
processing element:b
1234
但是每次的结果都不一样:
processing element:d
processing element:a
processing element:c
processing element:b
44441113312
2
32
32
或者
processing element:c
processing element:d
processing element:b
processing element:a
444422233211
31
31
这是意料之中的事情,因为在分布式环境中,无法保证输入元素的处理顺序和聚合系统输出的顺序。您可能想要连接主要元素和辅助输入元素,并一次性将其写出以获得您期望的输出。
我是 apache beam 的新手,我正在对我们的一个用例使用 sideinput 进行一些调查。下面是代码。
PipelineOptions options =
PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
final List<String> sideInput = Arrays.asList("1", "2", "3", "4");
final List<String> input = Arrays.asList("a", "b", "c", "d");
PCollectionView<List<String>> sideinput =
pipeline.apply("readInput", Create.of(sideInput)).apply(View.asList());
pipeline.apply("read", Create.of(input))
.apply("process", ParDo.of(new DoFn<String, String>() {
@ProcessElement public void process(ProcessContext pc) {
System.out.println("processing element:" + pc.element());
List<String> list = pc.sideInput(sideinput);
for (String element : list) {
System.out.print(element);
}
System.out.println("");
}
}).withSideInputs(sideinput));
pipeline.run();
我期待它在每个元素之后打印出所有的 sideinput 元素,例如
processing element:d
1234
processing element:c
1234
processing element:a
1234
processing element:b
1234
但是每次的结果都不一样:
processing element:d
processing element:a
processing element:c
processing element:b
44441113312
2
32
32
或者
processing element:c
processing element:d
processing element:b
processing element:a
444422233211
31
31
这是意料之中的事情,因为在分布式环境中,无法保证输入元素的处理顺序和聚合系统输出的顺序。您可能想要连接主要元素和辅助输入元素,并一次性将其写出以获得您期望的输出。