Flink 水印
Flink Watermark
在Flink中,我找到了2种设置watermark的方式,
第一个是
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(5000)
第二个是
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)
我想知道哪个最终会生效
这两者之间完全没有冲突——它们处理的是不同的问题。指定的所有内容都会生效。
第一个,
env.getConfig.setAutoWatermarkInterval(5000)
指定您希望生成水印的频率(每 5000 毫秒生成一个水印)。如果未指定,将使用默认值 200 毫秒。
第二个,
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)
正在指定如何计算这些水印的详细信息。也就是说,它们应该由 FlinkKafkaConsumer
使用 BoundedOutOfOrderness
策略生成,有 10 秒的有限延迟。 WatermarkStrategy
还需要一个时间戳分配器。
没有默认值 WatermarkStrategy
,因此如果您想使用事件时间,则需要类似第二个代码片段的内容。
在Flink中,我找到了2种设置watermark的方式,
第一个是
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(5000)
第二个是
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)
我想知道哪个最终会生效
这两者之间完全没有冲突——它们处理的是不同的问题。指定的所有内容都会生效。
第一个,
env.getConfig.setAutoWatermarkInterval(5000)
指定您希望生成水印的频率(每 5000 毫秒生成一个水印)。如果未指定,将使用默认值 200 毫秒。
第二个,
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)
正在指定如何计算这些水印的详细信息。也就是说,它们应该由 FlinkKafkaConsumer
使用 BoundedOutOfOrderness
策略生成,有 10 秒的有限延迟。 WatermarkStrategy
还需要一个时间戳分配器。
没有默认值 WatermarkStrategy
,因此如果您想使用事件时间,则需要类似第二个代码片段的内容。