访问 ParDo 中的 sideinput

Access sideinput inside ParDo

我是 apache beam 的新手,我正在对我们的一个用例使用 sideinput 进行一些调查。下面是代码。

PipelineOptions options =
            PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    final List<String> sideInput = Arrays.asList("1", "2", "3", "4");
    final List<String> input = Arrays.asList("a", "b", "c", "d");
    PCollectionView<List<String>> sideinput =
            pipeline.apply("readInput", Create.of(sideInput)).apply(View.asList());
    pipeline.apply("read", Create.of(input))
            .apply("process", ParDo.of(new DoFn<String, String>() {
                @ProcessElement public void process(ProcessContext pc) {
                    System.out.println("processing element:" + pc.element());
                    List<String> list = pc.sideInput(sideinput);
                    for (String element : list) {
                        System.out.print(element);
                    }
                    System.out.println("");
                }
            }).withSideInputs(sideinput));
    pipeline.run();

我期待它在每个元素之后打印出所有的 sideinput 元素,例如

processing element:d
1234
processing element:c
1234
processing element:a
1234
processing element:b
1234

但是每次的结果都不一样:

processing element:d
processing element:a
processing element:c
processing element:b
44441113312
2
32
32

或者

processing element:c
processing element:d
processing element:b
processing element:a
444422233211
31

31

这是意料之中的事情,因为在分布式环境中,无法保证输入元素的处理顺序和聚合系统输出的顺序。您可能想要连接主要元素和辅助输入元素,并一次性将其写出以获得您期望的输出。