在非匿名 DoFn 中访问端输入
Access side input in a non - anonymous DoFn
如果我有 class 扩展 DoFn,如何访问侧输入的元素?
例如:
假设我有一个 ParDo 转换,例如:
PCollection<String> data = myData.apply("Get data",
ParDo.of(new MyClass()).withSideInputs(myDataView));
我有一个 class:-
static class MyClass extends DoFn<String,String>
{
//How to access side input here
}
c.sideInput() 在这种情况下不起作用。
谢谢。
在这种情况下,问题是您的 DoFn 中的 processElement
方法无法访问您的 main 方法中的 PCollectionView 实例。
您可以在构造函数中将 PCollectionView 传递给 DoFn:
class MyClass extends DoFn<String,String>
{
private final PCollectionView<..> mySideInput;
public MyClass(PCollectionView<..> mySideInput) {
// List, or Map or anything:
this.mySideInput = mySideInput;
}
@ProcessElement
public void processElement(ProcessContext c) throws IOException
{
// List or Map or any type you need:
List<..> sideInputList = c.sideInput(mySideInput);
}
}
然后在实例化时将辅助输入传递给 class,并将其指示为辅助输入,如下所示:
p.apply(ParDo.of(new MyClass(mySideInput)).withSideInputs(mySideInput));
对此的解释是,当您使用匿名 DoFn 时,process 方法有一个闭包,可以访问封闭 DoFn 范围内的所有对象(其中包括 PCollectionView)。当您不使用匿名 DoFn 时,没有闭包,您需要另一种传递 PCollectionView 的方式。
所以上面的回答虽然是正确的,但是还是有些不完整。
因此,一旦您完成上述答案的实施,您需要像这样执行您的管道:
p.apply(ParDo.of(new MyClass(mySideInput)).withSideInputs(mySideInput));
如果我有 class 扩展 DoFn,如何访问侧输入的元素?
例如:
假设我有一个 ParDo 转换,例如:
PCollection<String> data = myData.apply("Get data",
ParDo.of(new MyClass()).withSideInputs(myDataView));
我有一个 class:-
static class MyClass extends DoFn<String,String>
{
//How to access side input here
}
c.sideInput() 在这种情况下不起作用。
谢谢。
在这种情况下,问题是您的 DoFn 中的 processElement
方法无法访问您的 main 方法中的 PCollectionView 实例。
您可以在构造函数中将 PCollectionView 传递给 DoFn:
class MyClass extends DoFn<String,String>
{
private final PCollectionView<..> mySideInput;
public MyClass(PCollectionView<..> mySideInput) {
// List, or Map or anything:
this.mySideInput = mySideInput;
}
@ProcessElement
public void processElement(ProcessContext c) throws IOException
{
// List or Map or any type you need:
List<..> sideInputList = c.sideInput(mySideInput);
}
}
然后在实例化时将辅助输入传递给 class,并将其指示为辅助输入,如下所示:
p.apply(ParDo.of(new MyClass(mySideInput)).withSideInputs(mySideInput));
对此的解释是,当您使用匿名 DoFn 时,process 方法有一个闭包,可以访问封闭 DoFn 范围内的所有对象(其中包括 PCollectionView)。当您不使用匿名 DoFn 时,没有闭包,您需要另一种传递 PCollectionView 的方式。
所以上面的回答虽然是正确的,但是还是有些不完整。
因此,一旦您完成上述答案的实施,您需要像这样执行您的管道:
p.apply(ParDo.of(new MyClass(mySideInput)).withSideInputs(mySideInput));