Processor.init() 在 Kafka Stream 中为单个任务调用多次
Processor.init() called multiple times for a single task in Kafka Stream
我使用了一个利用 WALL_CLOCK_TIME
标点符号的处理器,我注意到在重新平衡阶段之后,init()
方法被多次调用用于同一任务。
我在 init()
中记录了这一行:
log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);
在日志中我可以看到它被调用了两次:
07:53:15 INFO - In processor
init, taskId is 1_0, cancellable is
org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
07:53:15 INFO - In processor
init, taskId is 1_0, cancellable is
org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7
此外,我记录了 close()
方法中发生的事情,我看到那里取消了 Cancellable
...
07:53:15 INFO - Closing cancellable
org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
...从其身份哈希码 (11a53ebd) 判断,现有处理器已被重用,但也创建了一个新处理器。结果,我的周期性任务被安排了两次,而不是一次。
我以为每个任务只有一个处理器。有什么想法可能导致此类行为,我该如何防止它发生?
在重新平衡期间,所有 Processor
都关闭并在重新平衡后再次初始化。这是确保没有数据丢失所必需的。
但是,您看到的散列是指注册的标点符号,而不是 Processor
对象。因此,如果您 cancel
close
中的标点符号和 schedule
中的标点符号 init()
旧时间表将被新时间表取代。
我使用了一个利用 WALL_CLOCK_TIME
标点符号的处理器,我注意到在重新平衡阶段之后,init()
方法被多次调用用于同一任务。
我在 init()
中记录了这一行:
log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);
在日志中我可以看到它被调用了两次:
07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
07:53:15 INFO - In processor init, taskId is 1_0, cancellable is org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7
此外,我记录了 close()
方法中发生的事情,我看到那里取消了 Cancellable
...
07:53:15 INFO - Closing cancellable org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
...从其身份哈希码 (11a53ebd) 判断,现有处理器已被重用,但也创建了一个新处理器。结果,我的周期性任务被安排了两次,而不是一次。
我以为每个任务只有一个处理器。有什么想法可能导致此类行为,我该如何防止它发生?
在重新平衡期间,所有 Processor
都关闭并在重新平衡后再次初始化。这是确保没有数据丢失所必需的。
但是,您看到的散列是指注册的标点符号,而不是 Processor
对象。因此,如果您 cancel
close
中的标点符号和 schedule
中的标点符号 init()
旧时间表将被新时间表取代。