Watermark 在 Flink CEP 中远远落后
Watermark fell far behind in Flink CEP
我正在使用 Flink CEP 来检测针对来自 Kafka 的事件的模式。为简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
.filter(...)
.map(...)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
)
.keyBy(...)(TypeInformation.of(classOf[...]))
val pattern: Pattern[Event, _] =
Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
.next("middle")
.oneOrMore()
.optional()
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.next("end").times(1)
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
!startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.within(Time.seconds(30))
Kafka 主题有 104 个分区,事件均匀分布在各个分区中。当我提交作业时,parallelism
设置为 104。
来自网络 UI,有 2 个任务:第一个是 Source->filter->map->timestamp/watermark
;第二个是 CepOperator->sink
。每个任务有 104 个并行度。
子任务的工作量不均匀,应该来自keyBy
。子任务之间的水印是不一样的,但是开始卡在一个值上,很久没有变化。从日志中,我可以看到 CEP 不断评估事件,并将匹配结果推送到下游接收器。
事件速率为10k/s,第一个任务的背压保持high
,第二个任务的背压保持ok
。
请帮助解释 CEP 中发生的情况以及如何解决问题
谢谢
在更仔细地考虑你的问题后,我正在修改我的答案。
听起来 CEP 正在继续生成匹配项并将它们推送到接收器,但 CEP+接收器任务正在产生高背压。找出背压的原因会有帮助。
如果可以从所有分区读取事件,但水印只是勉强前进,这听起来背压严重到足以阻止事件被摄取。
我怀疑
- CEP 引擎中的组合爆炸,and/or
- 足够多的匹配以至于接收器跟不上
可能的原因。
获得更多洞察力的一些想法:
(1) 尝试使用探查器来确定 CepOperator 是否是瓶颈,并可能确定它在做什么。
(2) 禁用 CepOperator 和接收器之间的运算符链接以隔离 CEP——仅作为调试步骤。这将使您更好地了解 CEP 和接收器各自在做什么(通过指标和背压监控)。
(3) 在较小的设置中对此进行测试,并扩展 CEP 日志记录。
我正在使用 Flink CEP 来检测针对来自 Kafka 的事件的模式。为简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
.filter(...)
.map(...)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
)
.keyBy(...)(TypeInformation.of(classOf[...]))
val pattern: Pattern[Event, _] =
Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
.next("middle")
.oneOrMore()
.optional()
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.next("end").times(1)
.where(new IterativeCondition[Event] {
override def filter(event: Event, ctx:...): Boolean = {
val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
!startTrafficEvent.getFieldValue().equals(event.getFieldValue())
}
})
.within(Time.seconds(30))
Kafka 主题有 104 个分区,事件均匀分布在各个分区中。当我提交作业时,parallelism
设置为 104。
来自网络 UI,有 2 个任务:第一个是 Source->filter->map->timestamp/watermark
;第二个是 CepOperator->sink
。每个任务有 104 个并行度。
子任务的工作量不均匀,应该来自keyBy
。子任务之间的水印是不一样的,但是开始卡在一个值上,很久没有变化。从日志中,我可以看到 CEP 不断评估事件,并将匹配结果推送到下游接收器。
事件速率为10k/s,第一个任务的背压保持high
,第二个任务的背压保持ok
。
请帮助解释 CEP 中发生的情况以及如何解决问题
谢谢
在更仔细地考虑你的问题后,我正在修改我的答案。
听起来 CEP 正在继续生成匹配项并将它们推送到接收器,但 CEP+接收器任务正在产生高背压。找出背压的原因会有帮助。
如果可以从所有分区读取事件,但水印只是勉强前进,这听起来背压严重到足以阻止事件被摄取。
我怀疑
- CEP 引擎中的组合爆炸,and/or
- 足够多的匹配以至于接收器跟不上
可能的原因。
获得更多洞察力的一些想法:
(1) 尝试使用探查器来确定 CepOperator 是否是瓶颈,并可能确定它在做什么。
(2) 禁用 CepOperator 和接收器之间的运算符链接以隔离 CEP——仅作为调试步骤。这将使您更好地了解 CEP 和接收器各自在做什么(通过指标和背压监控)。
(3) 在较小的设置中对此进行测试,并扩展 CEP 日志记录。