对 intervalJoin 感到困惑
Confused about intervalJoin
我正在尝试提出一种解决方案,该解决方案涉及在 加入操作之后应用一些逻辑 从多个 EventB
中的 streamB
选择一个事件]s。它就像一个 reduce 函数,但它只有 returns 1 个元素,而不是增量执行。因此最终结果将是单个 (EventA
, EventB
) 对,而不是 1 EventA
和多个 EventB
.
的叉积
streamA
.keyBy((a: EventA) => a.common_key)
.intervalJoin(
streamB
.keyBy((b: EventB) => b.common_key)
)
.between(Time.days(-30), Time.days(0))
.process(new MyJoinFunction)
数据将被摄取(假设它们具有相同的密钥):
EventB ts: 1616686386000
EventB ts: 1616686387000
EventB ts: 1616686388000
EventB ts: 1616686389000
EventA ts: 1616686390000
每个 EventA
密钥保证只到达一次。
假设像上面这样的连接操作,它生成了 1 EventA
和 4 EventB
,成功连接并收集在 MyJoinFunction
中。现在我想要做的是,立即访问这些值并执行一些逻辑以正确地将 EventA
匹配到 恰好一个 EventB
。
例如,对于上面的数据集,我需要 (EventA
1616686390000
, EventB
1616686387000
).
MyJoinFunction
将为每个 (EventA
, EventB
) 对调用,但我希望在此之后执行一个操作,它允许我访问一个迭代器,以便我可以查看每个 EventA
.
的所有 EventB
个事件
我知道我可以在加入后应用另一个 windowing 操作来对所有对进行分组,但我希望这在加入成功后立即发生。因此,如果可能的话,我想避免添加另一个 window,因为我的 window 已经很大(30 天)。
Flink 是这个用例的正确选择还是我完全错了?
这可以实现为 KeyedCoProcessFunction
。您可以通过公共密钥为两个流设置密钥,将它们连接起来,然后一起处理两个流。
您可以使用 ListState
存储来自 B 的事件(对于给定的键),并使用 ValueState
存储 A 的事件(同样,对于给定的键)。您可以使用事件时间计时器来了解查看 ListState 中的 B 事件并生成结果的时间。完成后不要忘记清除状态。
如果您不熟悉 Flink 的这一部分 API,tutorial on Process Functions 应该会有帮助。
我正在尝试提出一种解决方案,该解决方案涉及在 加入操作之后应用一些逻辑 从多个 EventB
中的 streamB
选择一个事件]s。它就像一个 reduce 函数,但它只有 returns 1 个元素,而不是增量执行。因此最终结果将是单个 (EventA
, EventB
) 对,而不是 1 EventA
和多个 EventB
.
streamA
.keyBy((a: EventA) => a.common_key)
.intervalJoin(
streamB
.keyBy((b: EventB) => b.common_key)
)
.between(Time.days(-30), Time.days(0))
.process(new MyJoinFunction)
数据将被摄取(假设它们具有相同的密钥):
EventB ts: 1616686386000
EventB ts: 1616686387000
EventB ts: 1616686388000
EventB ts: 1616686389000
EventA ts: 1616686390000
每个 EventA
密钥保证只到达一次。
假设像上面这样的连接操作,它生成了 1 EventA
和 4 EventB
,成功连接并收集在 MyJoinFunction
中。现在我想要做的是,立即访问这些值并执行一些逻辑以正确地将 EventA
匹配到 恰好一个 EventB
。
例如,对于上面的数据集,我需要 (EventA
1616686390000
, EventB
1616686387000
).
MyJoinFunction
将为每个 (EventA
, EventB
) 对调用,但我希望在此之后执行一个操作,它允许我访问一个迭代器,以便我可以查看每个 EventA
.
EventB
个事件
我知道我可以在加入后应用另一个 windowing 操作来对所有对进行分组,但我希望这在加入成功后立即发生。因此,如果可能的话,我想避免添加另一个 window,因为我的 window 已经很大(30 天)。
Flink 是这个用例的正确选择还是我完全错了?
这可以实现为 KeyedCoProcessFunction
。您可以通过公共密钥为两个流设置密钥,将它们连接起来,然后一起处理两个流。
您可以使用 ListState
存储来自 B 的事件(对于给定的键),并使用 ValueState
存储 A 的事件(同样,对于给定的键)。您可以使用事件时间计时器来了解查看 ListState 中的 B 事件并生成结果的时间。完成后不要忘记清除状态。
如果您不熟悉 Flink 的这一部分 API,tutorial on Process Functions 应该会有帮助。