Flink TumblingEventTimeWindows 如何实现不重叠?
Flink TumblingEventTimeWindows how achievement without overlap?
Stream Processing with Apache Flink 第 211 页有这段文字
“The WindowAssigner determines for each arriving element to which windows it is assigned.”
那我研究一下TumblingEventTimeWindows
的源码
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
...............................
@Override
public Collection<TimeWindow> assignWindows(
Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
if (staggerOffset == null) {
staggerOffset =
windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start =
TimeWindow.getWindowStartWithOffset(
timestamp, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException(
"Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
+ "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
+ "'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
...............................
从我能找到的源代码中,元素确实被分配给了window,new TimeWindow(start, start + size)表示每个元素被分配一个新的 TimeWindow。
但是我很困惑,TumblingEventTimeWindows如何实现不重叠?
如果每个元素都分配一个新的TimeWindow,结果如下
不能保证每个 window 不会重叠,谁能指出 TumblingEventTimeWindows 的方向如何实现不重叠?
TimeWindow
对象不是很重要。这是一个简单的结构,用于保存 window 的开始和结束时间戳,仅此而已。它的名字听起来很重要,但它只是用于对描述传入事件分配给的时间间隔的信息副本进行编码。
实际上 WindowOperator
有重要的 window 数据。从逻辑上讲,它保留了类似于地图的内容,其中键是由 TimeWindow
对象描述的间隔,值是分配给这些间隔的事件列表。
Stream Processing with Apache Flink 第 211 页有这段文字
“The WindowAssigner determines for each arriving element to which windows it is assigned.”
那我研究一下TumblingEventTimeWindows
的源码public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
...............................
@Override
public Collection<TimeWindow> assignWindows(
Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
if (staggerOffset == null) {
staggerOffset =
windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start =
TimeWindow.getWindowStartWithOffset(
timestamp, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException(
"Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
+ "Is the time characteristic set to 'ProcessingTime', or did you forget to call "
+ "'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
...............................
从我能找到的源代码中,元素确实被分配给了window,new TimeWindow(start, start + size)表示每个元素被分配一个新的 TimeWindow。
但是我很困惑,TumblingEventTimeWindows如何实现不重叠?
如果每个元素都分配一个新的TimeWindow,结果如下
不能保证每个 window 不会重叠,谁能指出 TumblingEventTimeWindows 的方向如何实现不重叠?
TimeWindow
对象不是很重要。这是一个简单的结构,用于保存 window 的开始和结束时间戳,仅此而已。它的名字听起来很重要,但它只是用于对描述传入事件分配给的时间间隔的信息副本进行编码。
实际上 WindowOperator
有重要的 window 数据。从逻辑上讲,它保留了类似于地图的内容,其中键是由 TimeWindow
对象描述的间隔,值是分配给这些间隔的事件列表。