Flink 如何计算键控 window 的过度聚合输出

Flink how to compute over aggregated output of a keyed window

Flink 是否可以计算键控 window 的聚合输出?

我们有一个数据流,我们调用 byKey() 指定一个由字符和数字组成的字段(例如 A01、A02...A10、B01、B02...B10 等) ), 就像棋盘的方格。 在 byKey() 之后我们调用 window(TumblingEventTimeWindow.of(Time.days(7)),所以我们创建一个每周 window。 在此之后,我们调用 reduce() 结果我们得到 SingleOutputStreamOperator<Result>.

现在,我们要根据每个 Result 对象的字段对 SingleOutputStreamOperator<Result> 进行分组,并迭代每个组以根据 Result 对象中的字段提取前 3 名在该组中,是否可以在不创建另一个每周 window 并且不必对其执行聚合函数的情况下执行此操作? 显然这是可行的,但是我不喜欢在另一个每周 window 之后有第二周 window 的想法。我希望能够合并第一个 window 的所有 SingleOutputStreamOperator<Result> 并对它们执行一个函数,而不必使用一个新的 window 来接收所有元素。

这是我的代码,如您所见:

  1. 我们使用keyBy()基于一个Tuple2<String, Integer>基于对象Query2IntermediateOutcome的字段。元组中的字符串是我之前提到的代码A01,...,A10。

  2. 代码 window(timeIntervalConstructor.newInstance()) 基本上创建每周 window.

  3. 我们调用 reduce() 所以对于每个键我们都有一个聚合值。

  4. 现在我们使用另一个 keyBy(),这次密钥基本上是通过查看代码 A01,...,A10 的数字来计算的:如果它大于 5,我们有一个海型,如果小于或等于我们有另一个。

  5. 同样,window(timeIntervalConstructor.newInstance()) 第二周 window。

  6. 最后,在 aggregate() 中,我们计算每个组的前 3 名。

            .keyBy(new KeySelector<Query2IntermediateOutcome, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> getKey(Query2IntermediateOutcome intermediateOutcome) throws Exception {
                    return new Tuple2<String, Integer>(intermediateOutcome.getCellId(), intermediateOutcome.getHourInDate());
                }
            })
            .window(timeIntervalConstructor.newInstance())
            .reduce(new ReduceFunction<Query2IntermediateOutcome>() {
                @Override
                public Query2IntermediateOutcome reduce(Query2IntermediateOutcome t1, Query2IntermediateOutcome t2) throws Exception {
                    t1.setAttendance(t1.getAttendance()+t2.getAttendance());
                    return t1;
                }
            })
            .keyBy(new KeySelector<Query2IntermediateOutcome, String>() {
                @Override
                public String getKey(Query2IntermediateOutcome query2IntermediateOutcome) throws Exception {
                    return query2IntermediateOutcome.getSeaType().toString();
                }
            })
            .window(timeIntervalConstructor.newInstance())
            .aggregate(new Query2FinalAggregator(), new Query2Window())
    

这个解决方案有效,但我不太喜欢它,因为第二个 window 在前一个触发时接收所有数据,但它每周发生一次,所以第二个 window 接收所有数据在一起,必须立即 运行 aggregate().

我认为将所有这些业务逻辑折叠到一个 KeyedProcessFunction 中会相当简单。这样就可以避免周末activity的爆发了。

查看 this tutorial in the Flink docs 示例,了解如何将键控 window 替换为 KeyedProcessFunction。