Unknown producer for value SingletonPCollectionView
To give a minimal example of my problem, I'm trying to implement a simple Beam job that takes a String as a side input and applies it to a PCollection that is read from a csv file in Cloud Storage. The result is then written to a .txt file in Cloud Storage.

So far I have tried: using PipelineResult.waitUntilFinish (as in p.run().waitUntilFinish()), swapping the order of the two p.run() calls, and simplifying things as far as possible down to a single string as my side input. I always get the same result. Searching Stack Overflow and Google only led me to the PR on the Beam repository that introduced this error message.

SideInputTest.java:
public class SideInputTest {
    public static void main(String[] arg) throws IOException {
        // Build a pipeline to read in string
        DataflowPipelineOptions options1 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options1.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options1);

        // Build really simple side input
        PCollectionView<String> sideInputView = p.apply(Create.of("foo"))
            .apply(View.<String>asSingleton());

        // Run p
        p.run();

        // Build main pipeline to read csv data
        DataflowPipelineOptions options2 = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options2.setProject(PROJECT_NAME);
        options2.setStagingLocation(STAGING_LOCATION);
        options2.setRunner(DataflowRunner.class);
        Pipeline p2 = Pipeline.create(options2);

        p2.apply(TextIO.Read.from(INPUT_DATA))
          .apply(ParDo.withSideInputs(sideInputView).of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                  String[] rowData = c.element().split(",");
                  String sideInput = c.sideInput(sideInputView);
                  c.output(rowData[0] + sideInput);
              }
          }))
          .apply(TextIO.Write.to(OUTPUT_DATA));

        p2.run();
    }
}
Full stack trace:
Caused by: java.lang.NullPointerException: Unknown producer for value SingletonPCollectionView{tag=Tag<org.apache.beam.sdk.util.PCollectionViews$SimplePCollectionView.<init>:435#3d93cb799b3970be>} while translating step ParDo(Anonymous)
at org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1079)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.getProducer(DataflowPipelineTranslator.java:508)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateSideInputs(DataflowPipelineTranslator.java:926)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateInputs(DataflowPipelineTranslator.java:913)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access00(DataflowPipelineTranslator.java:112)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateSingleHelper(DataflowPipelineTranslator.java:863)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:856)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:853)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:415)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:486)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:231)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:365)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:154)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:514)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:151)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
at com.xpw.SideInputTest.main(SideInputTest.java:63)
Currently using the org.apache.beam packages @0.6.0.
This code takes a PCollectionView created in one pipeline (p.apply(Create.of("foo")).apply(View.<String>asSingleton())) and uses it in another pipeline (p2).

PCollections and PCollectionViews belong to a particular pipeline; reusing them across different pipelines is not supported. You can create an analogous PCollectionView in p2 instead.

I'm also confused about what your pipeline p is trying to accomplish: its only transform is creating the view, so no data is actually processed in it. I think you should get rid of p entirely and use only p2.
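A minimal sketch of the single-pipeline version, assuming the same Beam 0.6.0-era TextIO.Read/TextIO.Write API and keeping the placeholders from the original code (PROJECT_NAME, STAGING_LOCATION, INPUT_DATA, OUTPUT_DATA), might look like this:

public class SideInputTest {
    public static void main(String[] args) throws IOException {
        // One pipeline builds both the side input and the main input
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject(PROJECT_NAME);
        options.setStagingLocation(STAGING_LOCATION);
        options.setRunner(DataflowRunner.class);
        Pipeline p2 = Pipeline.create(options);

        // Build the side input view inside the same pipeline that consumes it
        final PCollectionView<String> sideInputView = p2
            .apply(Create.of("foo"))
            .apply(View.<String>asSingleton());

        p2.apply(TextIO.Read.from(INPUT_DATA))
          .apply(ParDo.withSideInputs(sideInputView).of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                  String[] rowData = c.element().split(",");
                  String sideInput = c.sideInput(sideInputView);
                  c.output(rowData[0] + sideInput);
              }
          }))
          .apply(TextIO.Write.to(OUTPUT_DATA));

        // Single run: the view and the ParDo that consumes it are translated together
        p2.run();
    }
}

Because the view's producer now lives in the same pipeline as the ParDo that reads it, the Dataflow translator can resolve the side input and the "Unknown producer" NullPointerException no longer occurs.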