Beam - 分支 PCollections 时出错
Beam - Error while branching PCollections
我有一个从 kafka 读取数据的管道。它将传入数据拆分为处理和拒绝的输出。来自 Kafka 的数据被读入自定义 class MyData 并且输出生成为 KV
用 MyData 定义两个 TupleTag。
private static final TupleTag<MyData> rejectedTag = new TupleTag<DeserializationOutput>(){};
private static final TupleTag<MyData> processingTag = new TupleTag<DeserializationOutput>(){};
InvalidDataDoFn 具有将 MyData 数据拆分为处理并拒绝的应用程序逻辑
InvalidDataDoFn invalidDataDoFn = new InvalidDataDoFn(processingTag, rejectedTag);
PCollectionTuple mixedCollection = myCollection
.apply(ParDo.of(invalidDataDoFn).withOutputTags(processingTag, TupleTagList.of(rejectedTag)));
OutputDoFn outputDoFn = new outputDoFn();
PCollection<MyData> processingCollection = mixedCollection.get(processingTag);
PCollection<KV<byte[], byte[]>> outputCollection = processingCollection
.apply("ProcessElements", ParDo.of(outputDoFn));
OutputDoFn 将 MyData 转换为 KV。在 运行 OutputDoFn 时,我收到一个奇怪的错误,指出“传递给输出的标签不能为空”——这来自 https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L559
我的 OutputDoFn 具有以下逻辑。
@ProcessElement
public void processElement(@Element MyData mydata,
OutputReceiver<KV<byte[], byte[]>> output, ProcessContext c) {
c.output(KV.of(mydata.getMessageKey(), mydata.getSomething().getBytes()));
}
如果我错了请纠正我,但你想使用这个 c.output
:
public void output(OutputT output)
你很惊讶这个函数被使用了:
public <T> void output(TupleTag<T> tag, T output)
要让 Beam 使用第一个,您传递的参数必须具有在您 DoFn
创建时声明的 OutputT
类型:
private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext
我的猜测是您传递给 c.output()
的值与您在创建 DoFn 时指定的类型不完全相同。因此,选择了第二个 output
函数,但它错过了标签。
您能否提供 OutputDoFn
的完整 DoFn 声明以确认?
所有代码参考来自 here。
我有一个从 kafka 读取数据的管道。它将传入数据拆分为处理和拒绝的输出。来自 Kafka 的数据被读入自定义 class MyData 并且输出生成为 KV
用 MyData 定义两个 TupleTag。
private static final TupleTag<MyData> rejectedTag = new TupleTag<DeserializationOutput>(){};
private static final TupleTag<MyData> processingTag = new TupleTag<DeserializationOutput>(){};
InvalidDataDoFn 具有将 MyData 数据拆分为处理并拒绝的应用程序逻辑
InvalidDataDoFn invalidDataDoFn = new InvalidDataDoFn(processingTag, rejectedTag);
PCollectionTuple mixedCollection = myCollection
.apply(ParDo.of(invalidDataDoFn).withOutputTags(processingTag, TupleTagList.of(rejectedTag)));
OutputDoFn outputDoFn = new outputDoFn();
PCollection<MyData> processingCollection = mixedCollection.get(processingTag);
PCollection<KV<byte[], byte[]>> outputCollection = processingCollection
.apply("ProcessElements", ParDo.of(outputDoFn));
OutputDoFn 将 MyData 转换为 KV
我的 OutputDoFn 具有以下逻辑。
@ProcessElement
public void processElement(@Element MyData mydata,
OutputReceiver<KV<byte[], byte[]>> output, ProcessContext c) {
c.output(KV.of(mydata.getMessageKey(), mydata.getSomething().getBytes()));
}
如果我错了请纠正我,但你想使用这个 c.output
:
public void output(OutputT output)
你很惊讶这个函数被使用了:
public <T> void output(TupleTag<T> tag, T output)
要让 Beam 使用第一个,您传递的参数必须具有在您 DoFn
创建时声明的 OutputT
类型:
private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext
我的猜测是您传递给 c.output()
的值与您在创建 DoFn 时指定的类型不完全相同。因此,选择了第二个 output
函数,但它错过了标签。
您能否提供 OutputDoFn
的完整 DoFn 声明以确认?
所有代码参考来自 here。