Flink CEP 未检测到最后一条记录

Flink CEP not detecting last Record

我的代码有助于确定 Flink CEP 中记录的数量是否超过 25。因此,当我使用处理时间时,它匹配所有模式,但当我使用事件时间时,它不匹配最后一条记录。

{"trasanction_id":196,"customer_id":27,"datetime":"1576499008876","amount":6094,"state":"SUCCESS"}
{"trasanction_id":197,"customer_id":27,"datetime":"1576499017565","amount":547,"state":"SUCCESS"}
{"trasanction_id":198,"customer_id":27,"datetime":"1576499029116","amount":6824,"state":"SUCCESS"}
{"trasanction_id":196,"customer_id":27,"datetime":"1576499053211","amount":6094,"state":"SUCCESS"}
{"trasanction_id":197,"customer_id":28,"datetime":"1576499063867","amount":547,"state":"FAILED"}
{"trasanction_id":198,"customer_id":28,"datetime":"1576499073566","amount":6824,"state":"FAILED"}

以上是我的记录。我有兴趣匹配事件时间中数量大于 25 的每个事件。理想情况下,它应该检测所有记录(它在处理时间中执行),因为所有记录的数量都大于 25。截至目前,我正在使用 3 秒的 boundedoutofordertime 提取技术来处理乱序。

请帮助我理解这一点。提前致谢! :)

因为 CEP 匹配时间模式,所以在使用事件时间戳时,事件首先按时间戳排序。此排序涉及缓冲每个事件,直到水印赶上该事件,以便为任何较早的事件先到达留出时间。

因为您的水印被配置为落后于您的流的前缘(即,迄今为止最大的时间戳)3 秒,所以您的流的水印永远不会达到最后一个事件的时间戳。这就是最后一个事件未被处理的原因。 Flink 正在等待查看是否有任何更早的事件将要到达,并且不会放弃,直到水印表明流已通过最后一个事件的时间戳完成。