Apache Flink:Window 函数和时间的开始

Apache Flink: Window Functions and the beginning of time

WindowAssigner 中,一个元素被分配给一个或多个 TimeWindow 实例。在滑动事件时间 window 的情况下,这发生在 SlidingEventTimeWindows#assignWindows1.

如果 window 带有 size=5slide=1,带有 timestamp=0 的元素被分配到以下 windows:

  1. Window(开始=0,结束=5)
  2. Window(开始=-1,结束=4)
  3. Window(开始=-2,结束=3)
  4. Window(开始=-3,结束=2)
  5. Window(开始=-4,结束=1)

在一张照片中:

                            +-> Beginning of time
                            |
                            |
+----------------------------------------------+
|     size = 5              +--+ element       |
|    slide = 1              |                  |
|                           v                  |
| t=[ 0,5[ Window 1         XXXXX              |
| t=[-1,4[ Window 2        XXXXX               |
| t=[-2,3[ Window 3       XXXXX                |
| t=[-3,2[ Window 4      XXXXX                 |
| t=[-4,1[ Window 5     XXXXX                  |
|                                              |
| time(-4 to +4)        ----                   |
|                       432101234              |
+---------------------------+------------------+
                            |
                            |
                            |
                            +

有没有办法告诉Flink有时间的开始而之前没有windows?如果不是,从哪里着手改变它? 在上面的例子中,Flink 应该只有一个 window (t=[4,8[ Window 1) 作为第一个元素。像这样:

                            +-> Beginning of time
                            |
                            |
+-----------------------------------------------+
|     size = 5              +--+ element        |
|    slide = 1              |                   |
|                           v                   |
| t=[ 0,5[ Window 1         XXXXX               |
| t=[ 1,6[ Window 2          XXXXX              |
| t=[ 2,7[ Window 3           XXXXX             |
| t=[ 3,8[ Window 4            XXXXX            |
| t=[ 4,9[ Window 5             XXXXX           |
|                                               |
| time(-4 to +8)        ----                    |
|                       4321012345678           |
+---------------------------+-------------------+
                            |
                            |
                            |
                            +

一旦 windows 的数量达到并超过 window 大小,这将不再起作用。那么,在上面的例子中,所有元素都在 5 Windows.

之内

脚注:

  1. org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows

目前无法指定Flink作业的有效时间间隔。这也可能有点问题,因为您可能也想将您的工作应用于历史数据。

不过,您可以做的是手动过滤 windows 在超时开始之前开始的内容:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val startTime = 1
val windowLength = 2
val slide = 1

val input = env.fromElements((1,1), (2,2), (3,3))
               .assignAscendingTimestamps(x => x._2)

val windowed = input
      .timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
      .apply{ (window, iterable, collector: Collector[Int]) =>
         if (window.getStart >= startTime) {
           collector.collect(iterable.map(_._1).reduce(_ + _))
         } else {
           // discard early windows
         }
       }

windowed.print()

env.execute()

我可能会找到更好的解决方法来解决这个问题。 这个想法是将水印设置到未来足够远的时间点,这样您的 windows 就有足够的数据。早期的 windows 仍然存在,但它们将被丢弃。

这是 AssignerWithPeriodicWatermarks[T] 的概念证明:

  class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
    var t: Option[Long] = None
    var firstTime = true

    override def extractTimestamp(el: T, prevTs: Long): Long = {
      t = Some(prevTs)
      prevTs
    }

    override def getCurrentWatermark(): Watermark = (t, firstTime) match {
      case (None, _) => return null
      case (Some(v), false) => new Watermark(v)
      case (Some(v), true) => {
        firstTime = false
        new Watermark(v + wait)
      }
    }
  }
`wait` 是您第一个 window 的大小。 似乎工作正常,但我对 flink 的了解不够确定。

更新:不幸的是,它不起作用(现在我不知道为什么会这样),"early windows" 的密钥流中总是很少有密钥.所以最后我只是过滤了错误的 windows 类似的东西:

val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>      
  state match {
    case None    => (Seq(), Some(1))
    case Some(s) => (Seq(in), Some(s))
    case Some(v) => (Seq(), Some(v+1))
  })