如何根据已处理消息的数量安排周期性任务?
How to schedule periodical task based on number of processed messages?
我想使用 Kafka Processor API 来处理来自 Kafka 的消息。
我想调用一些周期性的函数——比如:
context.schedule(IntervalMS,punctuationType, somePunctuator)
,其中 somePunctuator 执行一些定期作业,但我想在处理一定数量的消息后调用该任务而不是使用间隔时间作为触发器
是否可以在 Kafka 流中进行此类触发?
是的,使用 Kafka Streams State Store 是可能的。
逻辑取决于您究竟需要做什么才能达到已处理消息的数量。
如果您需要将数据传播到下一个处理器或接收器节点,您需要将聚合值存储为键值状态存储中的对象列表。在 Processor.process(..)
中,您将数据放入键值存储中,然后检查项目数量是否达到限制,并执行所需的逻辑(如 processorContext.forward(..)
)。请看一下类似的例子 .
如果你需要在达到数字后做一些逻辑并且不需要值,你可以只存储计数器,并在 Processor.process(..)
中增加这个值。
我想使用 Kafka Processor API 来处理来自 Kafka 的消息。
我想调用一些周期性的函数——比如:
context.schedule(IntervalMS,punctuationType, somePunctuator)
,其中 somePunctuator 执行一些定期作业,但我想在处理一定数量的消息后调用该任务而不是使用间隔时间作为触发器
是否可以在 Kafka 流中进行此类触发?
是的,使用 Kafka Streams State Store 是可能的。 逻辑取决于您究竟需要做什么才能达到已处理消息的数量。
如果您需要将数据传播到下一个处理器或接收器节点,您需要将聚合值存储为键值状态存储中的对象列表。在 Processor.process(..)
中,您将数据放入键值存储中,然后检查项目数量是否达到限制,并执行所需的逻辑(如 processorContext.forward(..)
)。请看一下类似的例子
如果你需要在达到数字后做一些逻辑并且不需要值,你可以只存储计数器,并在 Processor.process(..)
中增加这个值。