Flink WaterMark 和触发器 - 迟到的元素不会在事件时间被丢弃?
Flink WaterMark And Triggers - Late elements not discarded on event time?
我对 Flink 在事件时间加水印时如何处理延迟元素感到有些困惑。
我的理解是,当 Flink 读取数据流时,如果看到任何事件时间大于当前水印的数据,水印时间就会增加。然后,任何覆盖时间严格小于水印的 windows 都会被触发驱逐(假设没有延迟津贴。
但是,举这个最小的例子:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}
object EventTimeExample {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
case class ExampleType(time: Long, value: Long)
def main(args: Array[String]) {
// Set up environment
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Example S3 path
val simple = env.fromCollection(Seq(
ExampleType(1525132800000L, 1),
ExampleType(1525132800000L, 2) ,
ExampleType(1525132920000L, 3),
ExampleType(1525132800000L, 4)
))
.assignAscendingTimestamps(_.time)
val windows = simple
.windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply{
(window, iter, collector: Collector[(Long, Long, String)]) => {
collector.collect(window.getStart, window.getEnd, iter.map(_.value).toString())
}
}
windows.print
env.execute("TimeStampExample")
}
}
运行的结果是:
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
但是,如果我的理解是正确的,4
不应该包含在第一个window这里,因为水印时间应该在值3
记录被更新时更新达到。
现在我认识到这是一个微不足道的例子,但不理解这一点会使理解更复杂的流程变得困难。
你的理解基本上是正确的,但是这里还有几点需要注意。
首先,您使用了 assignAscendingTimestamps()
,它只能在事件流完全有序(按时间戳)时使用,而这里不是这种情况。当你 运行 这个应用程序时,你应该看到这个警告:
WARN org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor - Timestamp monotony violated: 1525132800000 < 1525132920000
这里起作用的另一个因素是 AscendingTimestampExtractor
不会为每个经过的流元素更新当前 Watermark。这是一个周期性水印生成器的示例,它将每 n 毫秒向流中注入一个 Watermark
,其中 n 由 ExecutionConfig.setAutoWatermarkInterval(...)
定义,默认为到 200 毫秒。这就是事件 #4 潜入第一个 window.
的方式
要获得您期望的结果,您可以实施标点符号水印生成器,配置为为每个事件生成水印:
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {
override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long = {
element.time
}
override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp)
}
}
然后你会像这样使用它:
val simple = env.fromCollection(Seq(
ExampleType(1525132800000L, 1),
ExampleType(1525132800000L, 2) ,
ExampleType(1525132920000L, 3),
ExampleType(1525132800000L, 4)
))
.assignTimestampsAndWatermarks(new PunctuatedAssigner)
现在您的示例产生了这些结果:
(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))
活动 #4 已被取消,因为它来晚了。这可以通过放松水印生成器来调整,以适应一定程度的乱序。例如,
override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp - 200000)
}
然后产生这些结果:
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
或者您可以配置 windows 以允许延迟事件
val windows = simple
.windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
.allowedLateness(Time.seconds(200))
...
然后导致第一个 window 触发两次:
(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
请注意,由于处理水印会产生一些开销,您通常不希望以这种方式使用标点水印(每个事件都有一个水印)。对于大多数应用程序,基于 BoundedOutOfOrdernessTimestampExtractor
的周期性水印是更好的选择。
如果使用 BoundedOutOfOrdernessTimestampExtractor,则在新事件到来之前不会输出最后的计算。如果我们在水印中使用 SystemTime,它会起作用,但是当您 re-run 对于带有嵌入时间戳的消息(过去的事件)时,它不会计算那些。
我对 Flink 在事件时间加水印时如何处理延迟元素感到有些困惑。
我的理解是,当 Flink 读取数据流时,如果看到任何事件时间大于当前水印的数据,水印时间就会增加。然后,任何覆盖时间严格小于水印的 windows 都会被触发驱逐(假设没有延迟津贴。
但是,举这个最小的例子:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}
object EventTimeExample {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
case class ExampleType(time: Long, value: Long)
def main(args: Array[String]) {
// Set up environment
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Example S3 path
val simple = env.fromCollection(Seq(
ExampleType(1525132800000L, 1),
ExampleType(1525132800000L, 2) ,
ExampleType(1525132920000L, 3),
ExampleType(1525132800000L, 4)
))
.assignAscendingTimestamps(_.time)
val windows = simple
.windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply{
(window, iter, collector: Collector[(Long, Long, String)]) => {
collector.collect(window.getStart, window.getEnd, iter.map(_.value).toString())
}
}
windows.print
env.execute("TimeStampExample")
}
}
运行的结果是:
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
但是,如果我的理解是正确的,4
不应该包含在第一个window这里,因为水印时间应该在值3
记录被更新时更新达到。
现在我认识到这是一个微不足道的例子,但不理解这一点会使理解更复杂的流程变得困难。
你的理解基本上是正确的,但是这里还有几点需要注意。
首先,您使用了 assignAscendingTimestamps()
,它只能在事件流完全有序(按时间戳)时使用,而这里不是这种情况。当你 运行 这个应用程序时,你应该看到这个警告:
WARN org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor - Timestamp monotony violated: 1525132800000 < 1525132920000
这里起作用的另一个因素是 AscendingTimestampExtractor
不会为每个经过的流元素更新当前 Watermark。这是一个周期性水印生成器的示例,它将每 n 毫秒向流中注入一个 Watermark
,其中 n 由 ExecutionConfig.setAutoWatermarkInterval(...)
定义,默认为到 200 毫秒。这就是事件 #4 潜入第一个 window.
要获得您期望的结果,您可以实施标点符号水印生成器,配置为为每个事件生成水印:
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {
override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long = {
element.time
}
override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp)
}
}
然后你会像这样使用它:
val simple = env.fromCollection(Seq(
ExampleType(1525132800000L, 1),
ExampleType(1525132800000L, 2) ,
ExampleType(1525132920000L, 3),
ExampleType(1525132800000L, 4)
))
.assignTimestampsAndWatermarks(new PunctuatedAssigner)
现在您的示例产生了这些结果:
(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))
活动 #4 已被取消,因为它来晚了。这可以通过放松水印生成器来调整,以适应一定程度的乱序。例如,
override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp - 200000)
}
然后产生这些结果:
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
或者您可以配置 windows 以允许延迟事件
val windows = simple
.windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
.allowedLateness(Time.seconds(200))
...
然后导致第一个 window 触发两次:
(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
请注意,由于处理水印会产生一些开销,您通常不希望以这种方式使用标点水印(每个事件都有一个水印)。对于大多数应用程序,基于 BoundedOutOfOrdernessTimestampExtractor
的周期性水印是更好的选择。
如果使用 BoundedOutOfOrdernessTimestampExtractor,则在新事件到来之前不会输出最后的计算。如果我们在水印中使用 SystemTime,它会起作用,但是当您 re-run 对于带有嵌入时间戳的消息(过去的事件)时,它不会计算那些。