Flink timeWindow 获取开始时间
Flink timeWindow get start time
我正在计算一段时间内的计数(求和 1)window,如下所示:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.sum("count")
我实际上也想将 window 开始时间添加为关键字段。所以结果会是这样的:
key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50
所以基本上按 window 聚合计数。最终目标是绘制这些 windows.
的直方图
如何将 window 的开头添加为键中的字段?然后在这种情况下将 window 对齐到 00s 或 30s?这可能吗?
WindowFunction
的apply()
方法提供了一个Window
对象,如果使用keyBy().timeWindow()
就是一个TimeWindow
。 TimeWindow
对象有两个方法,getStart()
和 getEnd()
,其中 return 分别是 window 开始和结束的时间戳。
目前无法将 sum()
聚合与 WindowFunction
一起使用。您需要执行以下操作:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.apply(new MySumReduceFunction(), new MyWindowFunction());`
MySumReduceFunction
实现 ReduceFunction
接口并通过递增聚合到达 window 的元素来计算总和。 MyWindowFunction
实现了 WindowFunction
。它通过 Iterable
参数接收聚合值,并使用从 TimeWindow
参数获得的时间戳来丰富该值。
您可以使用方法 aggregate
而不是求和。
在 aggregate
中设置第二个参数 implements WindowFunction
或 extends ProcessWindowFunction
.
我使用的是 Flink-1.4.0 ,推荐使用 ProcessWindowFunction
,如:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.aggregate(new Count(), new MyProcessWindowFunction();
public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long, Integer>, Tuple, TimeWindow>
{
@Override
public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long, Integer>> collector) throws Exception
{
context.currentProcessingTime();
context.window().getStart();
}
}
我正在计算一段时间内的计数(求和 1)window,如下所示:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.sum("count")
我实际上也想将 window 开始时间添加为关键字段。所以结果会是这样的:
key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50
所以基本上按 window 聚合计数。最终目标是绘制这些 windows.
的直方图如何将 window 的开头添加为键中的字段?然后在这种情况下将 window 对齐到 00s 或 30s?这可能吗?
WindowFunction
的apply()
方法提供了一个Window
对象,如果使用keyBy().timeWindow()
就是一个TimeWindow
。 TimeWindow
对象有两个方法,getStart()
和 getEnd()
,其中 return 分别是 window 开始和结束的时间戳。
目前无法将 sum()
聚合与 WindowFunction
一起使用。您需要执行以下操作:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.apply(new MySumReduceFunction(), new MyWindowFunction());`
MySumReduceFunction
实现 ReduceFunction
接口并通过递增聚合到达 window 的元素来计算总和。 MyWindowFunction
实现了 WindowFunction
。它通过 Iterable
参数接收聚合值,并使用从 TimeWindow
参数获得的时间戳来丰富该值。
您可以使用方法 aggregate
而不是求和。
在 aggregate
中设置第二个参数 implements WindowFunction
或 extends ProcessWindowFunction
.
我使用的是 Flink-1.4.0 ,推荐使用 ProcessWindowFunction
,如:
mappedUserTrackingEvent
.keyBy("videoId", "userId")
.timeWindow(Time.seconds(30))
.aggregate(new Count(), new MyProcessWindowFunction();
public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long, Integer>, Tuple, TimeWindow>
{
@Override
public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long, Integer>> collector) throws Exception
{
context.currentProcessingTime();
context.window().getStart();
}
}