将 TupleTag 传递给 DoFn 方法
Pass TupleTag to DoFn method
我正在尝试从 DoFn 方法获得两个输出,下面是 Apache Beam programming guide
的示例
基本上在示例中你传递了一个 TupleTag,然后指定输出到哪里,这对我有用问题是我在 ParDo 中调用了一个外部方法,不知道如何传递这个 TupleTag,这是我的代码:
PCollectionTuple processedData = pubEv
.apply("Processing", ParDo.of(new HandleEv())
.withOutputTags(mainData, TupleTagList.of(failedData)));
HandleEv 方法:
static class HandleEv extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output("test")
c.output(failedData,"failed")
}
}
我得到的错误是 cannot find symbol
因为无法从 HandleEv 访问 failedData,我尝试在 class 的开头声明 failedData 但两者都不起作用。
非常感谢
您可以像将值传递给任何其他对象一样执行此操作 --
将其作为参数传递给 HandleEv
的构造函数并将其存储在字段中:
static class HandleEv extends DoFn<String, String> {
private final TupleTag<String> failedData;
public HandleEv(TupleTag<String> failedData) {
this.failedData = failedData;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output("test")
c.output(failedData,"failed")
}
}
然后像这样使用它:
PCollectionTuple processedData = pubEv
.apply("Processing", ParDo.of(new HandleEv(failedData))
.withOutputTags(mainData, TupleTagList.of(failedData)));
我正在尝试从 DoFn 方法获得两个输出,下面是 Apache Beam programming guide
的示例基本上在示例中你传递了一个 TupleTag,然后指定输出到哪里,这对我有用问题是我在 ParDo 中调用了一个外部方法,不知道如何传递这个 TupleTag,这是我的代码:
PCollectionTuple processedData = pubEv
.apply("Processing", ParDo.of(new HandleEv())
.withOutputTags(mainData, TupleTagList.of(failedData)));
HandleEv 方法:
static class HandleEv extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output("test")
c.output(failedData,"failed")
}
}
我得到的错误是 cannot find symbol
因为无法从 HandleEv 访问 failedData,我尝试在 class 的开头声明 failedData 但两者都不起作用。
非常感谢
您可以像将值传递给任何其他对象一样执行此操作 --
将其作为参数传递给 HandleEv
的构造函数并将其存储在字段中:
static class HandleEv extends DoFn<String, String> {
private final TupleTag<String> failedData;
public HandleEv(TupleTag<String> failedData) {
this.failedData = failedData;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output("test")
c.output(failedData,"failed")
}
}
然后像这样使用它:
PCollectionTuple processedData = pubEv
.apply("Processing", ParDo.of(new HandleEv(failedData))
.withOutputTags(mainData, TupleTagList.of(failedData)));