Watermark 在 Flink CEP 中远远落后

Watermark fell far behind in Flink CEP

我正在使用 Flink CEP 来检测针对来自 Kafka 的事件的模式。为简单起见,事件只有一种类型。我正在尝试检测连续事件流中字段值的变化。代码如下所示

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.addSource(new FlinkKafkaConsumer[..]())
          .filter(...)
          .map(...)
          .assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps[Event]().withTimestampAssigner(..)
          )
          .keyBy(...)(TypeInformation.of(classOf[...]))
    
val pattern: Pattern[Event, _] = 
          Pattern.begin[Event]("start", AfterMatchSkipStrategy.skipPastLastEvent()).times(1)
          .next("middle")
          .oneOrMore()
          .optional()
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                 val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                 startTrafficEvent.getFieldValue().equals(event.getFieldValue())
             }
          })
          .next("end").times(1)
          .where(new IterativeCondition[Event] {
             override def filter(event: Event, ctx:...): Boolean = {
                  val startTrafficEvent = ctx.getEventsForPattern("start").iterator().next()
                  !startTrafficEvent.getFieldValue().equals(event.getFieldValue())
            }
          })
          .within(Time.seconds(30))

Kafka 主题有 104 个分区,事件均匀分布在各个分区中。当我提交作业时,parallelism 设置为 104。

来自网络 UI,有 2 个任务:第一个是 Source->filter->map->timestamp/watermark;第二个是 CepOperator->sink。每个任务有 104 个并行度。

子任务的工作量不均匀,应该来自keyBy。子任务之间的水印是不一样的,但是开始卡在一个值上,很久没有变化。从日志中,我可以看到 CEP 不断评估事件,并将匹配结果推送到下游接收器。

事件速率为10k/s,第一个任务的背压保持high,第二个任务的背压保持ok

请帮助解释 CEP 中发生的情况以及如何解决问题

谢谢

在更仔细地考虑你的问题后,我正在修改我的答案。

听起来 CEP 正在继续生成匹配项并将它们推送到接收器,但 CEP+接收器任务正在产生高背压。找出背压的原因会有帮助。

如果可以从所有分区读取事件,但水印只是勉强前进,这听起来背压严重到足以阻止事件被摄取。

我怀疑

  1. CEP 引擎中的组合爆炸,and/or
  2. 足够多的匹配以至于接收器跟不上

可能的原因。

获得更多洞察力的一些想法:

(1) 尝试使用探查器来确定 CepOperator 是否是瓶颈,并可能确定它在做什么。

(2) 禁用 CepOperator 和接收器之间的运算符链接以隔离 CEP——仅作为调试步骤。这将使您更好地了解 CEP 和接收器各自在做什么(通过指标和背压监控)。

(3) 在较小的设置中对此进行测试,并扩展 CEP 日志记录。