flink 是否保存带有水印的关闭事件时间 windows 的历史记录?

Does flink hold history of closed event time windows with watermark?

我有一个 flink 作业,它使用带有事件时间和水印的键控滚动 windows 聚合数据。

我的问题是flink是否持有他已经关闭的状态windows? 否则,我没有其他解释为什么属于以前从未打开过的 window 的事件会打开 window 而不是立即将其删除。

假设我们的 windows 持续 1 小时,forBoundedOutOfOrderness 持续 10 分钟

让我们举个例子:

event1 = ("2022-01-01T08:25:00Z") => window 解雇

event2 = ("2022-01-01T09:25:00Z") => window 已创建但未按预期触发

event3 = ("2022-01-01T05:25:00Z") => 将与事件 4 聚合而不是丢弃(为什么?)

event4 = ("2022-01-01T05:40:00Z") => 将与事件 3 聚合而不是丢弃(为什么?)

    val stream = env
      .fromSource(
        kafkaSource,
        WatermarkStrategy
          .forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](Duration.ofMinutes(outOfOrdernessMinutes)) //Watermark max time for late events
          .withIdleness(Duration.ofSeconds(idleness))
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
            override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long)
                : Long = {
              logger.info(
                LogMessage(
                  element._3.orgId,
                  s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
                  element._3.flowId
                )
              )
              element._3.updateTime.asEpoch
            }
          }),
        s"Source - $kConsumeTopic"
      )

    stream
      .keyBy(element => (element._2.orgId -> element._2.procUid))                                                                     
      .window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
      .reduce(new ReduceFunc)                                                                                         
      .name("Aggregated EnrichedProcess")
      .sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
      .name(s"Sink -> $kProduceTopic")

编辑: 我对此进行测试的方法是使用 docker 撰写的集成测试。我正在向 Kafka 生成一个事件 => 由 Flink 作业消耗并发送到 Kafka => 检查 kafka 的内容。

当我在发送 event3 和 event4 之间放置 30 秒的睡眠时,它们会被删除。这是我所期待的行为。

    val producer = new Producer(producerTopic)

    val consumer = new Consumer(consumerTopic, groupId)
    producer.send(event1)
    producer.send(event2)
    Thread.sleep(30000)
    producer.send(event3)
    Thread.sleep(30000)
    producer.send(event4)

    val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()

但现在更奇怪的是,为什么当我将睡眠时间设置为 10 秒而不是 30 秒时,我只收到第一种情况(水印应该已经更新(周期性水印生成器的默认值为 200 毫秒)

执行摘要:

Flink 的

Non-determinism event-time-based 逻辑来自混合处理时间和事件时间——就像周期性水印生成器和空闲检测一样。只有当您永远不会有延迟事件或闲置资源时,您才能确定确定性结果。

更多详情:

如你所料

event3 = ("2022-01-01T05:25:00Z")

要迟到,只有足够大的水印成功先到,才是真正的迟到。使用 forBoundedOutOfOrderness 策略无法保证这一点——这是一个 periodic 水印生成器,每 200 毫秒生成一次水印。所以可能是在event3和event4之间产生了一个基于event2时间戳的水印。

这是一种可能的解释;根据具体情况,可能还有其他人。例如,随着所有这些睡眠的进行,水印生成器的一个并行实例空闲至少一分钟,这可能与产生所观察到的行为有关(取决于 idleness[ 的值=43=], 等等).

更多背景:

在并行度 > 1 的情况下,有多个独立的水印策略实例,每个实例都根据它们处理的事件做自己的事情。

具有多个输入通道的运算符,例如键控 window,将通过将输入水印(来自所有 non-idle 个通道)中的最小值作为它们自己的水印来组合这些水印。

回答原问题:

Flink 是否保留已经关闭的 windows 的状态? 否。一旦允许的延迟(如果有)过期,事件的状态时间 window 被清除。