Flink 中的会话 Windows 给出了意外的结果

Session Windows in Flink giving unexpected results

我有一个记录流,它是 'keyed by' 两个字段,然后分配了一个间隔为 30 秒的会话 window。我使用附加到记录的 'time stamp' 作为事件时间。我正在使用 'assignAscendingTimestamps' 水印。

例如考虑以下记录。流由 (user,place) 键控。

记录 1:用户 1,地点 1,时间戳 t1

Record2:user2,place1,t1 后 30 秒的时间戳

Record3 : user1, place1, t1 30 秒内的时间戳

Record4 : user1, place1, t1 后 30 秒的时间戳

Record2 属于 user2,因此它属于不同的桶,因为流是键控的。因此我希望 Record1、Record3 和 Record4 属于一个桶,而 Record2 属于另一个桶。

桶 1

记录 1:用户 1,地点 1,时间戳 t1

Record3 : user1, place1, t1 30 秒内的时间戳

Record4 - user1,place1,t1 后 30 秒的时间戳

Bucket2

Record2:user2,place1,t1 后 30 秒的时间戳

据我了解,包含 Record1 和 Record3 的会话 window 只有在 Record4 到达时才会被触发。但是当我 运行 代码时,当 Record2 到达时触发仅包含 Record1 的会话,因为 Record2 的时间戳在 Record1 的时间戳的时间间隔(30 秒)之后,尽管 Record2 的密钥不同。我浏览了 Flink 的文档和几个我可以在网上找到的 Session Windows 示例。但是我无法解决这个问题。我在这里缺少什么吗?这可能是因为上升时间戳水印?

问题是 assignAscendingTimestamps 要求您的时间戳在所有键上单调递增。原因是Flink无法生成per key watermarks。

更新

由于 Flink 无法为每个键生成水印,因此必须生成对所有元素都有效的水印。如果每个键的时间戳是单调的,但不是所有键的时间戳都是单调的,那么您必须定义两个键之间的最大乱序(时间戳的差异)。通过从元素的时间戳中无序地减去它,您将获得有效的水印。另见 BoundedOutOfOrdernessTimestampExtractor。但是,请注意,如果元素以更大的乱序到达,那么这也会中断。