Processor.init() 在 Kafka Stream 中为单个任务调用多次

Processor.init() called multiple times for a single task in Kafka Stream

我使用了一个利用 WALL_CLOCK_TIME 标点符号的处理器,我注意到在重新平衡阶段之后,init() 方法被多次调用用于同一任务。

我在 init() 中记录了这一行:

log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);

在日志中我可以看到它被调用了两次:

07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7

此外,我记录了 close() 方法中发生的事情,我看到那里取消了 Cancellable...

07:53:15 INFO - Closing cancellable org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

...从其身份哈希码 (11a53ebd) 判断,现有处理器已被重用,但也创建了一个新处理器。结果,我的周期性任务被安排了两次,而不是一次。

我以为每个任务只有一个处理器。有什么想法可能导致此类行为,我该如何防止它发生?

在重新平衡期间,所有 Processor 都关闭并在重新平衡后再次初始化。这是确保没有数据丢失所必需的。

但是,您看到的散列是指注册的标点符号,而不是 Processor 对象。因此,如果您 cancel close 中的标点符号和 schedule 中的标点符号 init() 旧时间表将被新时间表取代。