Flink 水印

Flink Watermark

在Flink中,我找到了2种设置watermark的方式,

第一个是

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(5000)

第二个是

env.addSource(
    new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
   WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)

我想知道哪个最终会生效

这两者之间完全没有冲突——它们处理的是不同的问题。指定的所有内容都会生效。

第一个,

env.getConfig.setAutoWatermarkInterval(5000)

指定您希望生成水印的频率(每 5000 毫秒生成一个水印)。如果未指定,将使用默认值 200 毫秒。

第二个,

env.addSource(
    new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
   WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)

正在指定如何计算这些水印的详细信息。也就是说,它们应该由 FlinkKafkaConsumer 使用 BoundedOutOfOrderness 策略生成,有 10 秒的有限延迟。 WatermarkStrategy 还需要一个时间戳分配器。

没有默认值 WatermarkStrategy,因此如果您想使用事件时间,则需要类似第二个代码片段的内容。