Windows 和 Apache Flink 中的状态
Windows and States in Apache Flink
这第一个问题;
我想学习 window 的时间行为。假设我将使用 Processing time
每 2 秒处理一次数据,而当前时间是 10:26:25.169
。这时候我部署了作业。
这样的话,会不会每次window都四舍五入到0、2、4等等秒?像下面一样;
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
如您所见,我已经在 10:26:25.169
部署了作业,但 flink 确实在 window 上运行了 2 秒。是吗?
如果不是,windows 是否像下面这样工作?;
10:26:25.169 - 10:26:27.169
10:26:27.169 - 10:26:29.169
哪个是真的?当我使用 event time
而不是 processing time
时,这种行为会改变吗?
第二题;
我想保持每个键的状态。为此,我可以使用 richFlatMap 函数或 keyedProcessFunction。但我想知道在应用 window 后我可以使用上述功能管理状态吗?例如;
// in this case i can manage state by key
ds.keyBy(_.field).process(new MyStateFunction)
// in this case, can i manage state after window for the same key?
ds.keyBy(keyExtractorWithTime)
.window(new MyWindowFunction)
.reduce(new myRedisFunction)
.process(new MyStateFunction)
至于第一个问题,它将始终是完整的 2 秒间隔四舍五入,所以基本上正如您所描述的那样:
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
有一个 offset
参数允许您在某种程度上控制该行为。但基本上当 Flink 在数据到达时实际创建 window,startTime
和 endTime
不依赖于数据何时到达,因此数据适合 window 反之则不然。
可以找到更多信息here
因为,对于第二个问题,ProcessWindowFunction
是键控函数,因此您将能够在函数内使用键控状态,就像您在标准 ProcessFunction
中所做的那样。
问题1:如果offset
参数不赋值,flink默认会使用window大小的整数倍作为startTime
(endTime
= startTime
+ windowSize
)。所以你问的打击是对的
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
在flink中,startTime
会这样计算:
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows. windowSize.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
问题2:如果你想管理Keyed后的状态Window,下面的方法可能有效。通过这种方式,您可以管理每个 window.
的状态和 reduce
函数结果
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
更多详情here。
这第一个问题;
我想学习 window 的时间行为。假设我将使用 Processing time
每 2 秒处理一次数据,而当前时间是 10:26:25.169
。这时候我部署了作业。
这样的话,会不会每次window都四舍五入到0、2、4等等秒?像下面一样;
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
如您所见,我已经在 10:26:25.169
部署了作业,但 flink 确实在 window 上运行了 2 秒。是吗?
如果不是,windows 是否像下面这样工作?;
10:26:25.169 - 10:26:27.169
10:26:27.169 - 10:26:29.169
哪个是真的?当我使用 event time
而不是 processing time
时,这种行为会改变吗?
第二题;
我想保持每个键的状态。为此,我可以使用 richFlatMap 函数或 keyedProcessFunction。但我想知道在应用 window 后我可以使用上述功能管理状态吗?例如;
// in this case i can manage state by key
ds.keyBy(_.field).process(new MyStateFunction)
// in this case, can i manage state after window for the same key?
ds.keyBy(keyExtractorWithTime)
.window(new MyWindowFunction)
.reduce(new myRedisFunction)
.process(new MyStateFunction)
至于第一个问题,它将始终是完整的 2 秒间隔四舍五入,所以基本上正如您所描述的那样:
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
有一个 offset
参数允许您在某种程度上控制该行为。但基本上当 Flink 在数据到达时实际创建 window,startTime
和 endTime
不依赖于数据何时到达,因此数据适合 window 反之则不然。
可以找到更多信息here
因为,对于第二个问题,ProcessWindowFunction
是键控函数,因此您将能够在函数内使用键控状态,就像您在标准 ProcessFunction
中所做的那样。
问题1:如果offset
参数不赋值,flink默认会使用window大小的整数倍作为startTime
(endTime
= startTime
+ windowSize
)。所以你问的打击是对的
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
在flink中,startTime
会这样计算:
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows. windowSize.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
问题2:如果你想管理Keyed后的状态Window,下面的方法可能有效。通过这种方式,您可以管理每个 window.
的状态和reduce
函数结果
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
更多详情here。