Flink DataSet API: GroupBy 是不是工作不正常?
Flink DataSet API: Is GroupBy is not working correctly?
在我的 Flink Java 程序中,我使用 GroupBy-Operator 如下:
dataSet.groupBy(new KeySelector<myObject, Tuple2<Tuple2<Integer, Integer>, Integer>>() {
private static final long serialVersionUID = 5L;
Tuple2<Tuple2<Integer, Integer>, Integer> groupingKey = new Tuple2<Tuple2<Integer, Integer>, Integer>();
public Tuple2<Tuple2<Integer, Integer>, Integer> getKey(myObject s) {
groupingKey.setField(s.getPosition(), 0);
groupingKey.setField(s.getBand(), 1);
return groupingKey;
}
})
.reduceGroup(reduceFunction);
getPosition()
returns一个Tuple2<Integer, Integer>
和getBand()
returns一个int
。
我想根据这两个值对我的数据集进行分组。如果我有 6 个位置和 4 个波段,我想得到 24 个不同的组,并为每个组独立使用 groupReduce
-函数。但目前我的结果组似乎包含乐队和位置的各种值。我在 groupReduce
函数中检查了这个:
if (this.band == null) {
this.band = myObject.getBand();
}
if (this.band != myObject.getBand()) {
System.out.println("The band should be " + this.band + " but is: " + myObject.getBand());
此外,我生成的文件中还有一些值表明分组存在问题。分组是否有可能在我的情况下不起作用?或者这可能只是我的代码中另一个潜在错误的结果?
我认为您在 GroupReduceFunction
中的检查工作不正常。
GroupReduceFunction.reduce()
可以针对不同的组调用多次。 this.band
是您的 GroupReduceFunction
的成员变量,我假设您没有在 reduce()
方法结束时重置 this.band
。
因此,this.band
只会在 reduce()
的第一次调用中 null
。在第二次调用开始时 this.band
将被初始化并且不会被设置为当前组的波段。因此,下面的检查将失败。
在我的 Flink Java 程序中,我使用 GroupBy-Operator 如下:
dataSet.groupBy(new KeySelector<myObject, Tuple2<Tuple2<Integer, Integer>, Integer>>() {
private static final long serialVersionUID = 5L;
Tuple2<Tuple2<Integer, Integer>, Integer> groupingKey = new Tuple2<Tuple2<Integer, Integer>, Integer>();
public Tuple2<Tuple2<Integer, Integer>, Integer> getKey(myObject s) {
groupingKey.setField(s.getPosition(), 0);
groupingKey.setField(s.getBand(), 1);
return groupingKey;
}
})
.reduceGroup(reduceFunction);
getPosition()
returns一个Tuple2<Integer, Integer>
和getBand()
returns一个int
。
我想根据这两个值对我的数据集进行分组。如果我有 6 个位置和 4 个波段,我想得到 24 个不同的组,并为每个组独立使用 groupReduce
-函数。但目前我的结果组似乎包含乐队和位置的各种值。我在 groupReduce
函数中检查了这个:
if (this.band == null) {
this.band = myObject.getBand();
}
if (this.band != myObject.getBand()) {
System.out.println("The band should be " + this.band + " but is: " + myObject.getBand());
此外,我生成的文件中还有一些值表明分组存在问题。分组是否有可能在我的情况下不起作用?或者这可能只是我的代码中另一个潜在错误的结果?
我认为您在 GroupReduceFunction
中的检查工作不正常。
GroupReduceFunction.reduce()
可以针对不同的组调用多次。 this.band
是您的 GroupReduceFunction
的成员变量,我假设您没有在 reduce()
方法结束时重置 this.band
。
因此,this.band
只会在 reduce()
的第一次调用中 null
。在第二次调用开始时 this.band
将被初始化并且不会被设置为当前组的波段。因此,下面的检查将失败。