Kafka Streams - 处理器上下文提交
Kafka Streams - Processor context commit
我们是否应该自己在 Processor
实现中调用 processorContext.commit()
?我的意思是在预定的 Punctuator
实现或 process
方法中调用 commit
方法。
我们应该在哪些用例中这样做,我们是否需要这样做?该问题与 transform()
和处理器 API.
的 Kafka DSL 有关
似乎 Kafka Streams 会自行处理它,同时调用 processorContext.commit()
并不能保证它会立即完成。
可以调用 commit()
-- 无论是从处理器还是从标点符号 -- 这就是提供此 API 的原因。
虽然 Kafka Streams 按固定(可配置)间隔提交,但您可以在使用它时请求中间提交。一个示例用例是,您通常进行廉价计算,但有时您会做一些昂贵的事情并且希望在此操作后尽快提交而不是等待下一个提交间隔(以减少在昂贵的操作和下一个提交间隔)。另一个用例是,如果您将提交间隔设置为 MAX_VALUE
有效 "disables" 定期提交并根据您的业务逻辑决定何时提交。
我想,对于大多数用例来说,调用 commit()
并不是必需的。
对于用例,我在处理器处理方法中批处理一定数量的记录,如果批处理大小达到一定数量(比如 10),则将批处理的记录从处理函数写入文件。
假设我们在提交发生之前将一批记录写入文件和系统崩溃(因为我们无法调用显式提交)。下次流开始并且处理器处理上次提交偏移量的记录。这意味着我们可以将一些重复数据写入文件。有没有办法避免写重复数据??
我们是否应该自己在 Processor
实现中调用 processorContext.commit()
?我的意思是在预定的 Punctuator
实现或 process
方法中调用 commit
方法。
我们应该在哪些用例中这样做,我们是否需要这样做?该问题与 transform()
和处理器 API.
似乎 Kafka Streams 会自行处理它,同时调用 processorContext.commit()
并不能保证它会立即完成。
可以调用 commit()
-- 无论是从处理器还是从标点符号 -- 这就是提供此 API 的原因。
虽然 Kafka Streams 按固定(可配置)间隔提交,但您可以在使用它时请求中间提交。一个示例用例是,您通常进行廉价计算,但有时您会做一些昂贵的事情并且希望在此操作后尽快提交而不是等待下一个提交间隔(以减少在昂贵的操作和下一个提交间隔)。另一个用例是,如果您将提交间隔设置为 MAX_VALUE
有效 "disables" 定期提交并根据您的业务逻辑决定何时提交。
我想,对于大多数用例来说,调用 commit()
并不是必需的。
对于用例,我在处理器处理方法中批处理一定数量的记录,如果批处理大小达到一定数量(比如 10),则将批处理的记录从处理函数写入文件。
假设我们在提交发生之前将一批记录写入文件和系统崩溃(因为我们无法调用显式提交)。下次流开始并且处理器处理上次提交偏移量的记录。这意味着我们可以将一些重复数据写入文件。有没有办法避免写重复数据??