是否可以加入 2 个 Kafka KStreams,其中 JoinWindows 持续时间存储在 1 个流的对象中?
Is it possible to join 2 Kafka KStreams where the JoinWindows duration is stored in the object of 1 of the streams?
假设我有 2 个流:
- TimeWindow(开始时间、结束时间)
- 数字(带时间戳)
是否可以使用 DSL API 或进程 API 加入流,以便输出包含 TimeWindow 对象,该对象包含指定时间范围内的数字总和在 TimeWindow 中?
具体来说,你如何设置 XXX,它是 win.getDuration() 中的持续时间存储,其中 win 是在ValueJoiner.
timeWindow.join(
numbers,
(ValueJoiner<TimeWindow, Number, TimeWindow>) (win, num) -> win.addToTotal(num),
new JoinWindows(XXX, 0)
).to("output_Topic");
后面的JoinWindows为0,因为TimeWindow的时间戳是endtime。 XXX 持续时间应计算为 TimeWindows 结束时间 - 开始时间(以毫秒为单位)。
非常感谢您的帮助!
感谢 Matthias 的煽动,我最终回滚到使用处理器 API 实现 TimestampExtractors 并使用内存状态存储(默认使用 RockDB)来实现此功能。
假设我有 2 个流:
- TimeWindow(开始时间、结束时间)
- 数字(带时间戳)
是否可以使用 DSL API 或进程 API 加入流,以便输出包含 TimeWindow 对象,该对象包含指定时间范围内的数字总和在 TimeWindow 中?
具体来说,你如何设置 XXX,它是 win.getDuration() 中的持续时间存储,其中 win 是在ValueJoiner.
timeWindow.join(
numbers,
(ValueJoiner<TimeWindow, Number, TimeWindow>) (win, num) -> win.addToTotal(num),
new JoinWindows(XXX, 0)
).to("output_Topic");
后面的JoinWindows为0,因为TimeWindow的时间戳是endtime。 XXX 持续时间应计算为 TimeWindows 结束时间 - 开始时间(以毫秒为单位)。
非常感谢您的帮助!
感谢 Matthias 的煽动,我最终回滚到使用处理器 API 实现 TimestampExtractors 并使用内存状态存储(默认使用 RockDB)来实现此功能。