kafka 流会话 windows
kafka Streams session windows
您好,我正在处理 kafka 会话 window,非活动时间为 5 分钟。当达到非活动时间并且会话因密钥而下降时,我需要某种反馈。
假设我有
(A,1)
记录,其中 'A' 是关键。现在,如果我在 5 分钟内没有得到任何 'A' 密钥记录,会话将被删除。
我想在会话结束时做一些操作让我们说 (value)*2 该会话。有什么办法可以使用 Kafka Stream API
Kafka Streams 在 gap-time 通过后不会删除会话。相反,如果在 gap-time 通过后另一个具有相同键的记录到达,则将创建一个新会话并并行维护两个会话。这允许处理 out-of-order 数据。甚至可能发生,如果 out-of-order 数据落入间隙并且 "connects" 两个会话彼此合并。
会话默认保持 1 天。您可以通过 SessionWindows#until()
方法更改此设置。如果会话过期,它将被静默删除。没有通知。您还需要考虑配置参数 window.store.change.log.additional.retention.ms
:
The default retention setting is Windows#maintainMs() + 1 day. You can override this setting by specifying StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG in the StreamsConfig.
因此,如果你想在时间过去时做出反应,你应该研究允许你根据 "even time progress" 或 wall-clock 时间注册定期回调(某种计时器)的标点符号。这允许您在某个时间段内未更新会话并且您认为它是 "completed".
时做出反应
您好,我正在处理 kafka 会话 window,非活动时间为 5 分钟。当达到非活动时间并且会话因密钥而下降时,我需要某种反馈。 假设我有
(A,1)
记录,其中 'A' 是关键。现在,如果我在 5 分钟内没有得到任何 'A' 密钥记录,会话将被删除。
我想在会话结束时做一些操作让我们说 (value)*2 该会话。有什么办法可以使用 Kafka Stream API
Kafka Streams 在 gap-time 通过后不会删除会话。相反,如果在 gap-time 通过后另一个具有相同键的记录到达,则将创建一个新会话并并行维护两个会话。这允许处理 out-of-order 数据。甚至可能发生,如果 out-of-order 数据落入间隙并且 "connects" 两个会话彼此合并。
会话默认保持 1 天。您可以通过 SessionWindows#until()
方法更改此设置。如果会话过期,它将被静默删除。没有通知。您还需要考虑配置参数 window.store.change.log.additional.retention.ms
:
The default retention setting is Windows#maintainMs() + 1 day. You can override this setting by specifying StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG in the StreamsConfig.
因此,如果你想在时间过去时做出反应,你应该研究允许你根据 "even time progress" 或 wall-clock 时间注册定期回调(某种计时器)的标点符号。这允许您在某个时间段内未更新会话并且您认为它是 "completed".
时做出反应