Windows 和 Apache Flink 中的状态

Windows and States in Apache Flink

这第一个问题;

我想学习 window 的时间行为。假设我将使用 Processing time 每 2 秒处理一次数据,而当前时间是 10:26:25.169。这时候我部署了作业。

这样的话,会不会每次window都四舍五入到0、2、4等等秒?像下面一样;

如您所见,我已经在 10:26:25.169 部署了作业,但 flink 确实在 window 上运行了 2 秒。是吗?

如果不是,windows 是否像下面这样工作?;

哪个是真的?当我使用 event time 而不是 processing time 时,这种行为会改变吗?

第二题;

我想保持每个键的状态。为此,我可以使用 richFlatMap 函数或 keyedProcessFunction。但我想知道在应用 window 后我可以使用上述功能管理状态吗?例如;

// in this case i can manage state by key
ds.keyBy(_.field).process(new MyStateFunction)

// in this case, can i manage state after window for the same key? 
ds.keyBy(keyExtractorWithTime)
  .window(new MyWindowFunction)
  .reduce(new myRedisFunction)
  .process(new MyStateFunction)

至于第一个问题,它将始终是完整的 2 秒间隔四舍五入,所以基本上正如您所描述的那样:

10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000

有一个 offset 参数允许您在某种程度上控制该行为。但基本上当 Flink 在数据到达时实际创建 window,startTimeendTime 不依赖于数据何时到达,因此数据适合 window 反之则不然。

可以找到更多信息here

因为,对于第二个问题,ProcessWindowFunction 是键控函数,因此您将能够在函数内使用键控状态,就像您在标准 ProcessFunction 中所做的那样。

问题1:如果offset参数不赋值,flink默认会使用window大小的整数倍作为startTimeendTime = startTime + windowSize)。所以你问的打击是对的

10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000

在flink中,startTime会这样计算:

/**
  * Method to get the window start for a timestamp.
  *
  * @param timestamp epoch millisecond to get the window start.
  * @param offset The offset which window start would be shifted by.
  * @param windowSize The size of the generated windows. windowSize.
  * @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}

问题2:如果你想管理Keyed后的状态Window,下面的方法可能有效。通过这种方式,您可以管理每个 window.

的状态和 reduce 函数结果
DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

更多详情here