Flink windows 和延迟事件

Flink windows and late events

我有一个用例,我需要处理与正常事件不同的延迟事件:如果事件在其 window 关闭后到达,则应将其发送到另一条路径。

我认为 .sideOutputLateData(..) 会为我解决这个问题。它在正常情况下(即使用真实世界的数据)。但是如果我想测试它,用伪造的数据,它就停止工作了。

我希望是这样的:

val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setParallelism(1)

val events: DataStream[(Int, Long)] = env.fromElements(
  (1, 1),
  (1, 15),
  (1, 25),
  (1, 8) //late Event
)
val lateEvents = OutputTag[(Int, Long)]("lateEvents")

val windowedSum = events
  .assignAscendingTimestamps(e => e._2)
  .windowAll(TumblingEventTimeWindows.of(time.Time.milliseconds(10)))
  .sideOutputLateData(lateEvents)
  .sum(position=0)

val lateEventsStream = windowedSum
  .getSideOutput(lateEvents)
  // Handle differently
  .map(e => (e._1 + 100, e._2))

windowedSum.print()
lateEventsStream.print()
// execute program
env.execute("Flink Scala watermarking test")

会导致:

[info] (1,1)
[info] (1,15)
[info] (1,25)
[info] (101, 8)

相反,我得到:

[info] (2,1)
[info] (1,15)
[info] (1,25)

如果我使用 socketTextStream 作为具有相同数据的源,它会按预期工作。

这告诉我,不知何故,水印没有像它应该以非常快的数据输入那样前进。 我尝试将 setAutoWatermarkInterval 调整为一个非常小的值,但没有成功。

我错过了什么吗?我怎样才能测试我的工作?

您可以尝试使用 WatermarkStrategy 为每个元素发出 Watermark 而不是定期生成它们。您可以通过在实现 WatermarkGenerator 时在 onElement 内发出 Watermark 来做到这一点,如 here 所述。这是最好和最可靠的测试方法。

感谢@Dominik Wosinski 为我指明了正确的方向。 对于迷失在错综复杂的 Flink 文档中的其他人,我将 post 在这里给出我的解决方案:

正如所怀疑的那样,问题是快速输入数据没有推进水印。这是因为默认情况下,Flink 将每 200 毫秒检查一次水印是否应该前进。您可以使用

缩短此延迟(增加系统负载)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(10) // or even lower

显然这对于​​作为 4 元素列表的快速输入是不够的。

解决方案是在每个事件中发出水印(请注意,在生产环境中不推荐)。

要实现这样的解决方案,我们需要扩展 WatermarkGenerator class:

class MyPunctuatedWatermarkAssigner extends WatermarkGenerator[(Int, Long)] {

  override def onEvent(
      event: (Int, Long),
      eventTimestamp: Long,
      output: WatermarkOutput
  ): Unit = {
    // emit at every event
    output.emitWatermark(new Watermark(event._2))
  }

  // do nothing at AutoWatermarkInterval
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}

要将此生成器分配给流,我们首先需要创建一个 WatermarkStrategy:

class MyStrategy extends WatermarkStrategy[(Int, Long)] {
  override def createWatermarkGenerator(
      context: WatermarkGeneratorSupplier.Context
  ): WatermarkGenerator[(Int, Long)] = new MyPunctuatedWatermarkAssigner
}

(这个class也可以实现一个可选的createTimestampAssigner方法)

然后我们可以在流中使用它:

eventsStream
   .assignTimestampsAndWatermarks(new MyStrategy())