当使用带有更改日志的 RocksDb 状态存储时,Kafka Stream 提供哪些保证?
Which guarantees does Kafka Stream provide when using a RocksDb state store with changelog?
我正在构建一个 Kafka Streams 应用程序,它通过将每个新计算的对象与最后一个已知对象进行比较来生成更改事件。
因此,对于输入主题上的每条消息,我都会更新状态存储中的一个对象,每隔一段时间(使用标点符号),我会对这个对象应用计算并将结果与之前的计算结果进行比较(来自另一家国营商店)。
为确保此操作的一致性,我在标点触发器后执行以下操作:
- 将元组写入状态存储
- 比较两个值,创建更改事件并
context.forward
它们。所以事件转到结果主题。
- 用 new_value 交换元组并将其写入状态存储
我将此元组用于应用程序崩溃或重新平衡的场景,因此我始终可以在继续之前发送正确的事件集。
现在,我注意到生成的事件并不总是一致的,尤其是在应用程序频繁重新平衡的情况下。看起来在极少数情况下,Kafka Streams 应用程序会向结果主题发出事件,但更改日志主题尚未更新。换句话说,我为结果主题制作了一些东西,但我的更改日志主题还没有处于相同状态。
所以,当我执行 stateStore.put()
并且方法调用 returns 成功时,是否可以保证它何时会出现在更新日志主题上?
我可以强制刷新更新日志吗?当我执行 context.commit()
时,flush+commit 何时会发生?
要获得完全的一致性,您需要启用 processing.guarantee="exaclty_once"
-- 否则,由于潜在的错误,您可能会得到不一致的结果。
如果您想继续使用 "at_least_once",您可能想要使用单个存储,并在 处理完成后更新存储 (即调用 forward()
).这最大限度地减少了出现不一致的时间 window。
是的,如果您调用 context.commit()
,在提交输入主题偏移量之前,所有存储都将刷新到磁盘,并且所有待处理的生产者写入也将被刷新。
我正在构建一个 Kafka Streams 应用程序,它通过将每个新计算的对象与最后一个已知对象进行比较来生成更改事件。
因此,对于输入主题上的每条消息,我都会更新状态存储中的一个对象,每隔一段时间(使用标点符号),我会对这个对象应用计算并将结果与之前的计算结果进行比较(来自另一家国营商店)。
为确保此操作的一致性,我在标点触发器后执行以下操作:
- 将元组写入状态存储
- 比较两个值,创建更改事件并
context.forward
它们。所以事件转到结果主题。 - 用 new_value 交换元组并将其写入状态存储
我将此元组用于应用程序崩溃或重新平衡的场景,因此我始终可以在继续之前发送正确的事件集。
现在,我注意到生成的事件并不总是一致的,尤其是在应用程序频繁重新平衡的情况下。看起来在极少数情况下,Kafka Streams 应用程序会向结果主题发出事件,但更改日志主题尚未更新。换句话说,我为结果主题制作了一些东西,但我的更改日志主题还没有处于相同状态。
所以,当我执行 stateStore.put()
并且方法调用 returns 成功时,是否可以保证它何时会出现在更新日志主题上?
我可以强制刷新更新日志吗?当我执行 context.commit()
时,flush+commit 何时会发生?
要获得完全的一致性,您需要启用 processing.guarantee="exaclty_once"
-- 否则,由于潜在的错误,您可能会得到不一致的结果。
如果您想继续使用 "at_least_once",您可能想要使用单个存储,并在 处理完成后更新存储 (即调用 forward()
).这最大限度地减少了出现不一致的时间 window。
是的,如果您调用 context.commit()
,在提交输入主题偏移量之前,所有存储都将刷新到磁盘,并且所有待处理的生产者写入也将被刷新。