Flink timeWindow 获取开始时间

Flink timeWindow get start time

我正在计算一段时间内的计数(求和 1)window,如下所示:

mappedUserTrackingEvent
            .keyBy("videoId", "userId")
            .timeWindow(Time.seconds(30))
            .sum("count")

我实际上也想将 window 开始时间添加为关键字段。所以结果会是这样的:

key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50

所以基本上按 window 聚合计数。最终目标是绘制这些 windows.

的直方图

如何将 window 的开头添加为键中的字段?然后在这种情况下将 window 对齐到 00s 或 30s?这可能吗?

WindowFunctionapply()方法提供了一个Window对象,如果使用keyBy().timeWindow()就是一个TimeWindowTimeWindow 对象有两个方法,getStart()getEnd(),其中 return 分别是 window 开始和结束的时间戳。

目前无法将 sum() 聚合与 WindowFunction 一起使用。您需要执行以下操作:

 mappedUserTrackingEvent
        .keyBy("videoId", "userId")
        .timeWindow(Time.seconds(30))
        .apply(new MySumReduceFunction(), new MyWindowFunction());`

MySumReduceFunction 实现 ReduceFunction 接口并通过递增聚合到达 window 的元素来计算总和。 MyWindowFunction 实现了 WindowFunction。它通过 Iterable 参数接收聚合值,并使用从 TimeWindow 参数获得的时间戳来丰富该值。

您可以使用方法 aggregate 而不是求和。
aggregate 中设置第二个参数 implements WindowFunction 或 extends ProcessWindowFunction.
我使用的是 Flink-1.4.0 ,推荐使用 ProcessWindowFunction,如:

mappedUserTrackingEvent
    .keyBy("videoId", "userId")
    .timeWindow(Time.seconds(30))
    .aggregate(new Count(), new MyProcessWindowFunction();

public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long,  Integer>, Tuple, TimeWindow>
{
    @Override
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long,  Integer>> collector) throws Exception
    {
        context.currentProcessingTime();
        context.window().getStart();
    }
}