翻滚后执行flink sink window

Execute flink sink after tumbling window

来源:Kinesis 数据流

接收器:弹性搜索

两者都使用 AWS 服务。

此外,运行 我在 AWS Kinesis 数据分析应用程序上的 Flink 作业

我遇到了 flink 的 windowing 功能问题。我的工作看起来像这样

DataStream<TrackingData> input = ...; // input from kinesis stream
input.keyBy(e -> e.getArea())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction(), new MyProcessWindowFunction())
                .addSink(<elasticsearch sink>);
 private static class MyReduceFunction implements ReduceFunction<TrackingData> {
        @Override
        public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
            trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
            return trackingData;
        }
    }
private static class MyProcessWindowFunction extends ProcessWindowFunction<TrackingData, TrackingData, String, TimeWindow> {
        public void process(String key,
                            Context context,
                            Iterable<TrackingData> in,
                            Collector<TrackingData> out) {

            TrackingData trackingIn = in.iterator().next();

            Long videoDuration =0l;
            for (TrackingData t: in) {
                videoDuration += t.getVideoDuration();
            }
            trackingIn.setVideoDuration(videoDuration);
            out.collect(trackingIn);
        }
    }

示例事件:

{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5} 

我在这里所做的是从运动流中获取大量这些事件我想对 window 的每 10 秒求和 videoDuration 然后我想存储这个 单个事件进入elasticsearch。

在 Kinesis 中,每秒可以有 10,000 个事件。我不想在 elasticsearch 中存储所有 10,000 个事件我只想每 10 秒存储一个事件。

问题是当我向这个作业发送一个事件时,它会快速处理这个事件并直接进入 elasticsearch 但我想实现:直到每 10 秒我希望事件 videoDuration 时间递增,之后10 秒内只有一个事件存储在 elasticearch 中。

我怎样才能做到这一点?

我认为你误诊了问题。

您编写的代码将从每个 10 秒长的 window 中为每个在 window[=26= 期间有事件的不同键生成一个事件]. MyProcessWindowFunction 没有任何效果:由于 window 结果已经预先聚合,每个 Iterable 将只包含一个事件。

我相信你想这样做:

input.keyBy(e -> e.getArea())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction())
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction())
                .addSink(<elasticsearch sink>);

你也可以这样做

input.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new MyReduceFunction())
                .addSink(<elasticsearch sink>);

但第一个版本会更快,因为它将能够在计算 windowAll.[=13= 中的全局总和之前并行计算每个键的 window 结果。 ]

FWIW,Table/SQL API 通常更适合此类应用程序,并且应该产生比其中任何一个都更优化的管道。