对 intervalJoin 感到困惑

Confused about intervalJoin

我正在尝试提出一种解决方案,该解决方案涉及在 加入操作之后应用一些逻辑 从多个 EventB 中的 streamB 选择一个事件]s。它就像一个 reduce 函数,但它只有 returns 1 个元素,而不是增量执行。因此最终结果将是单个 (EventA, EventB) 对,而不是 1 EventA 和多个 EventB.

的叉积
streamA
      .keyBy((a: EventA) => a.common_key)
      .intervalJoin(
          streamB
            .keyBy((b: EventB) => b.common_key)
        )
      .between(Time.days(-30), Time.days(0))
      .process(new MyJoinFunction)

数据将被摄取(假设它们具有相同的密钥):

EventB ts: 1616686386000

EventB ts: 1616686387000

EventB ts: 1616686388000

EventB ts: 1616686389000

EventA ts: 1616686390000

每个 EventA 密钥保证只到达一次。

假设像上面这样的连接操作,它生成了 1 EventA 和 4 EventB,成功连接并收集在 MyJoinFunction 中。现在我想要做的是,立即访问这些值并执行一些逻辑以正确地将 EventA 匹配到 恰好一个 EventB。 例如,对于上面的数据集,我需要 (EventA 1616686390000, EventB 1616686387000).

MyJoinFunction 将为每个 (EventA, EventB) 对调用,但我希望在此之后执行一个操作,它允许我访问一个迭代器,以便我可以查看每个 EventA.

的所有 EventB 个事件

我知道我可以在加入后应用另一个 windowing 操作来对所有对进行分组,但我希望这在加入成功后立即发生。因此,如果可能的话,我想避免添加另一个 window,因为我的 window 已经很大(30 天)。

Flink 是这个用例的正确选择还是我完全错了?

这可以实现为 KeyedCoProcessFunction。您可以通过公共密钥为两个流设置密钥,将它们连接起来,然后一起处理两个流。

您可以使用 ListState 存储来自 B 的事件(对于给定的键),并使用 ValueState 存储 A 的事件(同样,对于给定的键)。您可以使用事件时间计时器来了解查看 ListState 中的 B 事件并生成结果的时间。完成后不要忘记清除状态。

如果您不熟悉 Flink 的这一部分 API,tutorial on Process Functions 应该会有帮助。