Beam - 分支 PCollections 时出错

Beam - Error while branching PCollections

我有一个从 kafka 读取数据的管道。它将传入数据拆分为处理和拒绝的输出。来自 Kafka 的数据被读入自定义 class MyData 并且输出生成为 KV

用 MyData 定义两个 TupleTag。

 private static final TupleTag<MyData> rejectedTag = new TupleTag<DeserializationOutput>(){};
 private static final TupleTag<MyData> processingTag = new TupleTag<DeserializationOutput>(){};

InvalidDataDoFn 具有将 MyData 数据拆分为处理并拒绝的应用程序逻辑

InvalidDataDoFn invalidDataDoFn = new InvalidDataDoFn(processingTag, rejectedTag); 
PCollectionTuple mixedCollection = myCollection
    .apply(ParDo.of(invalidDataDoFn).withOutputTags(processingTag, TupleTagList.of(rejectedTag)));


OutputDoFn outputDoFn = new outputDoFn();

PCollection<MyData> processingCollection = mixedCollection.get(processingTag);

PCollection<KV<byte[], byte[]>> outputCollection = processingCollection
  .apply("ProcessElements", ParDo.of(outputDoFn));

OutputDoFn 将 MyData 转换为 KV。在 运行 OutputDoFn 时,我收到一个奇怪的错误,指出“传递给输出的标签不能为空”——这来自 https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L559

我的 OutputDoFn 具有以下逻辑。

@ProcessElement
public void processElement(@Element MyData mydata,
    OutputReceiver<KV<byte[], byte[]>> output, ProcessContext c) {

  c.output(KV.of(mydata.getMessageKey(), mydata.getSomething().getBytes()));
}

如果我错了请纠正我,但你想使用这个 c.output :

public void output(OutputT output)

你很惊讶这个函数被使用了:

public <T> void output(TupleTag<T> tag, T output)

要让 Beam 使用第一个,您传递的参数必须具有在您 DoFn 创建时声明的 OutputT 类型:

private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext

我的猜测是您传递给 c.output() 的值与您在创建 DoFn 时指定的类型不完全相同。因此,选择了第二个 output 函数,但它错过了标签。

您能否提供 OutputDoFn 的完整 DoFn 声明以确认?

所有代码参考来自 here