Apache Beam 中的侧输入与普通构造函数参数
Side inputs vs normal constructor parameters in Apache Beam
我有一个关于 Apache Beam
上下文中的边输入和广播的一般性问题。 processElement
期间需要计算的任何其他变量、列表、地图是否需要作为辅助输入传递?如果将它们作为 DoFn
的普通构造函数参数传递可以吗?例如,如果我想在 processElement
的每个元素计算期间使用一些固定(未计算)值变量(常量,如开始日期、结束日期)怎么办?现在,我可以分别从每个变量中创建单例 PCollectionView
s,并将它们作为辅助输入传递给 DoFn
构造函数。但是,除了这样做,我能否不将这些常量中的每一个作为普通构造函数参数传递给 DoFn
?我在这里遗漏了什么微妙的东西吗?
代码方面,我应该什么时候做:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
// these are singleton views
private final PCollectionView<LocalDateTime> dateStartView;
private final PCollectionView<LocalDateTime> dateEndView;
public MyFilter(PCollectionView<LocalDateTime> dateStartView,
PCollectionView<LocalDateTime> dateEndView){
this.dateStartView = dateStartView;
this.dateEndView = dateEndView;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception{
// extract date values from the singleton views here and use them
相对于:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
private final LocalDateTime dateStart;
private final LocalDateTime dateEnd;
public MyFilter(LocalDateTime dateStart,
LocalDateTime dateEnd){
this.dateStart = dateStart;
this.dateEnd = dateEnd;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception{
// use the passed in date values directly here
请注意,在这些示例中,startDate
和 endDate
是固定值,而不是任何先前管道计算的动态结果。
当您调用类似 pipeline.apply(ParDo.of(new MyFilter(...))
的内容时,DoFn
会在您用来启动管道的 main
程序中实例化。然后它被序列化并传递给运行器执行。然后 Runner 决定在哪里执行它,例如在一个由 100 个虚拟机组成的队列上,每个虚拟机都将收到自己的代码副本和序列化数据。如果成员变量是可序列化的,并且您在执行时不改变它们,那应该没问题(link, link), the DoFn
will get deserialized on each node with all the fields populated, and will get executed as expected. However you don't control the number of instances or basically their lifecycle (), so mutate them 风险自负。
PCollections
和辅助输入的好处是您不限于静态值,因此对于几个简单的不可变值您应该没问题。
我有一个关于 Apache Beam
上下文中的边输入和广播的一般性问题。 processElement
期间需要计算的任何其他变量、列表、地图是否需要作为辅助输入传递?如果将它们作为 DoFn
的普通构造函数参数传递可以吗?例如,如果我想在 processElement
的每个元素计算期间使用一些固定(未计算)值变量(常量,如开始日期、结束日期)怎么办?现在,我可以分别从每个变量中创建单例 PCollectionView
s,并将它们作为辅助输入传递给 DoFn
构造函数。但是,除了这样做,我能否不将这些常量中的每一个作为普通构造函数参数传递给 DoFn
?我在这里遗漏了什么微妙的东西吗?
代码方面,我应该什么时候做:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
// these are singleton views
private final PCollectionView<LocalDateTime> dateStartView;
private final PCollectionView<LocalDateTime> dateEndView;
public MyFilter(PCollectionView<LocalDateTime> dateStartView,
PCollectionView<LocalDateTime> dateEndView){
this.dateStartView = dateStartView;
this.dateEndView = dateEndView;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception{
// extract date values from the singleton views here and use them
相对于:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>> {
private final LocalDateTime dateStart;
private final LocalDateTime dateEnd;
public MyFilter(LocalDateTime dateStart,
LocalDateTime dateEnd){
this.dateStart = dateStart;
this.dateEnd = dateEnd;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception{
// use the passed in date values directly here
请注意,在这些示例中,startDate
和 endDate
是固定值,而不是任何先前管道计算的动态结果。
当您调用类似 pipeline.apply(ParDo.of(new MyFilter(...))
的内容时,DoFn
会在您用来启动管道的 main
程序中实例化。然后它被序列化并传递给运行器执行。然后 Runner 决定在哪里执行它,例如在一个由 100 个虚拟机组成的队列上,每个虚拟机都将收到自己的代码副本和序列化数据。如果成员变量是可序列化的,并且您在执行时不改变它们,那应该没问题(link, link), the DoFn
will get deserialized on each node with all the fields populated, and will get executed as expected. However you don't control the number of instances or basically their lifecycle (
PCollections
和辅助输入的好处是您不限于静态值,因此对于几个简单的不可变值您应该没问题。