Unknown producer for value SingletonPCollectionView

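To provide a minimal example of my problem, I'm trying to implement a simple Beam job that takes a String as a side input and applies it to a PCollection read from a CSV file in Cloud Storage. The result is then written to a .txt file in Cloud Storage.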

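So far I have tried using PipelineResult.waitUntilFinish (as in p.run().waitUntilFinish()), swapping the positions of the two p.run() calls, and simplifying as far as possible by using just a String as my side input. I always get the same result. Searching Stack Overflow and Google only led me to the PR in the Beam repository that introduced this error message.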

SideInputTest.java:

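import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputTest {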

    public static void main(String[] arg) throws IOException {

        // Build a pipeline to read in string
        DataflowPipelineOptions options1 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options1.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options1);

        // Build really simple side input
        PCollectionView<String> sideInputView = p.apply(Create.of("foo"))
            .apply(View.<String>asSingleton());

        // Run p
        p.run();

        // Build main pipeline to read csv data
        DataflowPipelineOptions options2 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options2.setProject(PROJECT_NAME);
        options2.setStagingLocation(STAGING_LOCATION);
        options2.setRunner(DataflowRunner.class);
        Pipeline p2 = Pipeline.create(options2);

        p2.apply(TextIO.Read.from(INPUT_DATA))
            .apply(ParDo.withSideInputs(sideInputView).of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String[] rowData = c.element().split(",");
                    String sideInput = c.sideInput(sideInputView);

                    c.output(rowData[0] + sideInput);
                }
            }))
            .apply(TextIO.Write
                .to(OUTPUT_DATA));

        p2.run();

    }
}

Full stack trace:

Caused by: java.lang.NullPointerException: Unknown producer for value SingletonPCollectionView{tag=Tag<org.apache.beam.sdk.util.PCollectionViews$SimplePCollectionView.<init>:435#3d93cb799b3970be>} while translating step ParDo(Anonymous)
    at org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1079)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.getProducer(DataflowPipelineTranslator.java:508)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateSideInputs(DataflowPipelineTranslator.java:926)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateInputs(DataflowPipelineTranslator.java:913)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access00(DataflowPipelineTranslator.java:112)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateSingleHelper(DataflowPipelineTranslator.java:863)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:856)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:853)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:415)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:231)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:365)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:154)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:514)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:151)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
    at com.xpw.SideInputTest.main(SideInputTest.java:63)

I'm currently using version 0.6.0 of the org.apache.beam packages.

This code takes a PCollectionView created in one pipeline (p.apply(Create.of("foo")).apply(View.<String>asSingleton());) and uses it in another pipeline (p2).

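A PCollection or PCollectionView belongs to the specific pipeline that created it, and reusing one in a different pipeline is not supported. That is why translating p2 fails: when the Dataflow translator reaches the ParDo, it looks up the step that produced the side input view, finds no such step in p2, and throws the "Unknown producer" NullPointerException.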

You can create an equivalent PCollectionView in p2 instead.

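I'm also confused about what your pipeline p is trying to accomplish: its only transform creates the view, so no data is processed in it. I think you should get rid of p entirely and use only p2.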
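A minimal sketch of that suggestion, building the singleton view and the main input in the same pipeline. Assumptions: the class name SingletonSideInputFix and the sample rows "a,1" and "b,2" are hypothetical stand-ins for TextIO.Read.from(INPUT_DATA), and with no --runner argument it uses the default DirectRunner, which needs beam-runners-direct-java on the classpath. It attaches the side input with ParDo.of(fn).withSideInputs(view) rather than the static ParDo.withSideInputs(...) used in the question, since the former form is also available in later Beam releases.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Hypothetical class name; a sketch, not the question author's code.
public class SingletonSideInputFix {

    // Builds the side input AND the main input from the same Pipeline p,
    // so the view's producer is a step of the pipeline that consumes it.
    public static PCollection<String> buildPipeline(Pipeline p) {
        // Singleton side input created in p itself, not in a second pipeline.
        final PCollectionView<String> sideInputView =
            p.apply("CreateSideInput", Create.of("foo"))
             .apply(View.<String>asSingleton());

        // Hypothetical CSV rows standing in for TextIO.Read.from(INPUT_DATA).
        PCollection<String> rows = p.apply("CreateRows", Create.of("a,1", "b,2"));

        // Appends the side input value to the first column of each row.
        return rows.apply("AppendSideInput", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String[] rowData = c.element().split(",");
                String sideInput = c.sideInput(sideInputView);
                c.output(rowData[0] + sideInput);
            }
        }).withSideInputs(sideInputView));
    }

    public static void main(String[] args) {
        // Without --runner, the default (DirectRunner) is used.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        buildPipeline(p);
        p.run().waitUntilFinish();
    }
}
```

For the sample rows this should produce "afoo" and "bfoo". To run on Dataflow, the settings the question applies through setRunner, setProject and setStagingLocation can instead be passed as --runner, --project and --stagingLocation arguments, with the Create rows replaced by TextIO.Read.from(INPUT_DATA) and a TextIO.Write.to(OUTPUT_DATA) step appended.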