访问 PCollectionView<List<Foo>> 的元素:Google Cloud Dataflow/Apache Beam
Access elements of PCollectionView<List<Foo>> : Google Cloud Dataflow/Apache Beam
我有一个 PCollection,我想将其作为辅助输入传递并在 ParDo 中访问它的元素。
所以我创建了一个 PCollectionView 作为:
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
但是如何在传递侧输入时访问 ParDo 中的元素?
一个例子真的很有帮助。
谢谢
这段代码主要来自Beam programming guide.
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
PCollection<String> resultingPCollection =
someOtherPCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
List<Foo> mySideInput = c.sideInput(view);
// Do something with side input
}
}).withSideInputs(view)
);
如果您不想使用匿名 DoFn,您也可以将 PCollectionView 作为其构造函数的一部分传递,并在 processElement 函数中访问它。像这样:
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
PCollection<String> resultingPCollection =
someOtherPCollection.apply(ParDo
.of(new MyDoFn(view)).withSideInputs(view));
class MyDoFn extends DoFn<String, String> {
final PCollectionView<List<Foo>> view;
MyDoFn(PCollectionView<List<Foo>> view) {
this.view = view;
}
@ProcessElement
public void processElement(ProcessContext c) {
List<Foo> mySideInput = c.sideInput(this.view);
// Do something with side input
}
}
我有一个 PCollection,我想将其作为辅助输入传递并在 ParDo 中访问它的元素。
所以我创建了一个 PCollectionView 作为:
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
但是如何在传递侧输入时访问 ParDo 中的元素?
一个例子真的很有帮助。
谢谢
这段代码主要来自Beam programming guide.
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
PCollection<String> resultingPCollection =
someOtherPCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
List<Foo> mySideInput = c.sideInput(view);
// Do something with side input
}
}).withSideInputs(view)
);
如果您不想使用匿名 DoFn,您也可以将 PCollectionView 作为其构造函数的一部分传递,并在 processElement 函数中访问它。像这样:
final PCollectionView<List<Foo>> view =
myPCollection.apply(View.asList());
PCollection<String> resultingPCollection =
someOtherPCollection.apply(ParDo
.of(new MyDoFn(view)).withSideInputs(view));
class MyDoFn extends DoFn<String, String> {
final PCollectionView<List<Foo>> view;
MyDoFn(PCollectionView<List<Foo>> view) {
this.view = view;
}
@ProcessElement
public void processElement(ProcessContext c) {
List<Foo> mySideInput = c.sideInput(this.view);
// Do something with side input
}
}