Flink 如何计算键控 window 的过度聚合输出
Flink how to compute over aggregated output of a keyed window
Flink 是否可以计算键控 window 的聚合输出?
我们有一个数据流,我们调用 byKey()
指定一个由字符和数字组成的字段(例如 A01、A02...A10、B01、B02...B10 等) ), 就像棋盘的方格。
在 byKey()
之后我们调用 window(TumblingEventTimeWindow.of(Time.days(7))
,所以我们创建一个每周 window。
在此之后,我们调用 reduce()
结果我们得到 SingleOutputStreamOperator<Result>
.
现在,我们要根据每个 Result
对象的字段对 SingleOutputStreamOperator<Result>
进行分组,并迭代每个组以根据 Result
对象中的字段提取前 3 名在该组中,是否可以在不创建另一个每周 window 并且不必对其执行聚合函数的情况下执行此操作?
显然这是可行的,但是我不喜欢在另一个每周 window 之后有第二周 window 的想法。我希望能够合并第一个 window 的所有 SingleOutputStreamOperator<Result>
并对它们执行一个函数,而不必使用一个新的 window 来接收所有元素。
这是我的代码,如您所见:
我们使用keyBy()
基于一个Tuple2<String, Integer>
基于对象Query2IntermediateOutcome
的字段。元组中的字符串是我之前提到的代码A01,...,A10。
代码 window(timeIntervalConstructor.newInstance())
基本上创建每周 window.
我们调用 reduce()
所以对于每个键我们都有一个聚合值。
现在我们使用另一个 keyBy()
,这次密钥基本上是通过查看代码 A01,...,A10 的数字来计算的:如果它大于 5,我们有一个海型,如果小于或等于我们有另一个。
同样,window(timeIntervalConstructor.newInstance())
第二周 window。
最后,在 aggregate()
中,我们计算每个组的前 3 名。
.keyBy(new KeySelector<Query2IntermediateOutcome, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(Query2IntermediateOutcome intermediateOutcome) throws Exception {
return new Tuple2<String, Integer>(intermediateOutcome.getCellId(), intermediateOutcome.getHourInDate());
}
})
.window(timeIntervalConstructor.newInstance())
.reduce(new ReduceFunction<Query2IntermediateOutcome>() {
@Override
public Query2IntermediateOutcome reduce(Query2IntermediateOutcome t1, Query2IntermediateOutcome t2) throws Exception {
t1.setAttendance(t1.getAttendance()+t2.getAttendance());
return t1;
}
})
.keyBy(new KeySelector<Query2IntermediateOutcome, String>() {
@Override
public String getKey(Query2IntermediateOutcome query2IntermediateOutcome) throws Exception {
return query2IntermediateOutcome.getSeaType().toString();
}
})
.window(timeIntervalConstructor.newInstance())
.aggregate(new Query2FinalAggregator(), new Query2Window())
这个解决方案有效,但我不太喜欢它,因为第二个 window 在前一个触发时接收所有数据,但它每周发生一次,所以第二个 window 接收所有数据在一起,必须立即 运行 aggregate()
.
我认为将所有这些业务逻辑折叠到一个 KeyedProcessFunction 中会相当简单。这样就可以避免周末activity的爆发了。
查看 this tutorial in the Flink docs 示例,了解如何将键控 window 替换为 KeyedProcessFunction。
Flink 是否可以计算键控 window 的聚合输出?
我们有一个数据流,我们调用 byKey()
指定一个由字符和数字组成的字段(例如 A01、A02...A10、B01、B02...B10 等) ), 就像棋盘的方格。
在 byKey()
之后我们调用 window(TumblingEventTimeWindow.of(Time.days(7))
,所以我们创建一个每周 window。
在此之后,我们调用 reduce()
结果我们得到 SingleOutputStreamOperator<Result>
.
现在,我们要根据每个 Result
对象的字段对 SingleOutputStreamOperator<Result>
进行分组,并迭代每个组以根据 Result
对象中的字段提取前 3 名在该组中,是否可以在不创建另一个每周 window 并且不必对其执行聚合函数的情况下执行此操作?
显然这是可行的,但是我不喜欢在另一个每周 window 之后有第二周 window 的想法。我希望能够合并第一个 window 的所有 SingleOutputStreamOperator<Result>
并对它们执行一个函数,而不必使用一个新的 window 来接收所有元素。
这是我的代码,如您所见:
我们使用
keyBy()
基于一个Tuple2<String, Integer>
基于对象Query2IntermediateOutcome
的字段。元组中的字符串是我之前提到的代码A01,...,A10。代码
window(timeIntervalConstructor.newInstance())
基本上创建每周 window.我们调用
reduce()
所以对于每个键我们都有一个聚合值。现在我们使用另一个
keyBy()
,这次密钥基本上是通过查看代码 A01,...,A10 的数字来计算的:如果它大于 5,我们有一个海型,如果小于或等于我们有另一个。同样,
window(timeIntervalConstructor.newInstance())
第二周 window。最后,在
aggregate()
中,我们计算每个组的前 3 名。.keyBy(new KeySelector<Query2IntermediateOutcome, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> getKey(Query2IntermediateOutcome intermediateOutcome) throws Exception { return new Tuple2<String, Integer>(intermediateOutcome.getCellId(), intermediateOutcome.getHourInDate()); } }) .window(timeIntervalConstructor.newInstance()) .reduce(new ReduceFunction<Query2IntermediateOutcome>() { @Override public Query2IntermediateOutcome reduce(Query2IntermediateOutcome t1, Query2IntermediateOutcome t2) throws Exception { t1.setAttendance(t1.getAttendance()+t2.getAttendance()); return t1; } }) .keyBy(new KeySelector<Query2IntermediateOutcome, String>() { @Override public String getKey(Query2IntermediateOutcome query2IntermediateOutcome) throws Exception { return query2IntermediateOutcome.getSeaType().toString(); } }) .window(timeIntervalConstructor.newInstance()) .aggregate(new Query2FinalAggregator(), new Query2Window())
这个解决方案有效,但我不太喜欢它,因为第二个 window 在前一个触发时接收所有数据,但它每周发生一次,所以第二个 window 接收所有数据在一起,必须立即 运行 aggregate()
.
我认为将所有这些业务逻辑折叠到一个 KeyedProcessFunction 中会相当简单。这样就可以避免周末activity的爆发了。
查看 this tutorial in the Flink docs 示例,了解如何将键控 window 替换为 KeyedProcessFunction。