适当地 seekToBeginning() 一组中的所有消费者
Properly seekToBeginning() all consumers in a group
我的目标是将一个消费者组中的所有消费者都寻找到每个分区的开头。我用的是spring-kafka,尝试使用ConsumerSeekAware
接口的方式:
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {
logger.info { "on partitions assigned" }
assignments?.forEach { topic, _ -> callback?.seekToBeginning(topic.topic(), topic.partition()) }
}
现在,它只适用于一组中的一个消费者,但如果我想寻求开始多个消费者怎么办?我试过了,它最终导致了分区的无限重新平衡。我误解了什么?
我最终使用了 ConsumerSeekAware
的其他方法,即 registerSeekCallback()
。
调用 onPartitionsAssigned
后,我将当前分区保存为侦听器中的成员。调用后相同 registerSeekCallback
我将回调保存在实例中。
然后我可以为每个消费者进行按需调用 consumeFromBeginning
(通过 REST,或在开始后):
class EventConsumer : ConsumerSeekAware {
private lateinit var assignedTopicPartitions: List<TopicPartition>
private lateinit var consumerSeekCallback: ConsumerSeekAware.ConsumerSeekCallback
fun consumeFromBeginning() {
assignedTopicPartitions.forEach { topicPartition ->
consumerSeekCallback.seekToBeginning(topicPartition.topic(), topicPartition.partition())}
}
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {
logger.debug { "on partitions assigned" }
this.assignedTopicPartitions = assignments?.map { it.key } ?: error("no assignments found")
}
override fun registerSeekCallback(callback: ConsumerSeekAware.ConsumerSeekCallback?) {
logger.debug { "registerSeekCallback" }
this.consumerSeekCallback = callback ?: error("seek callback not found")
}
override fun onIdleContainer(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {}
}
我的目标是将一个消费者组中的所有消费者都寻找到每个分区的开头。我用的是spring-kafka,尝试使用ConsumerSeekAware
接口的方式:
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {
logger.info { "on partitions assigned" }
assignments?.forEach { topic, _ -> callback?.seekToBeginning(topic.topic(), topic.partition()) }
}
现在,它只适用于一组中的一个消费者,但如果我想寻求开始多个消费者怎么办?我试过了,它最终导致了分区的无限重新平衡。我误解了什么?
我最终使用了 ConsumerSeekAware
的其他方法,即 registerSeekCallback()
。
调用 onPartitionsAssigned
后,我将当前分区保存为侦听器中的成员。调用后相同 registerSeekCallback
我将回调保存在实例中。
然后我可以为每个消费者进行按需调用 consumeFromBeginning
(通过 REST,或在开始后):
class EventConsumer : ConsumerSeekAware {
private lateinit var assignedTopicPartitions: List<TopicPartition>
private lateinit var consumerSeekCallback: ConsumerSeekAware.ConsumerSeekCallback
fun consumeFromBeginning() {
assignedTopicPartitions.forEach { topicPartition ->
consumerSeekCallback.seekToBeginning(topicPartition.topic(), topicPartition.partition())}
}
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {
logger.debug { "on partitions assigned" }
this.assignedTopicPartitions = assignments?.map { it.key } ?: error("no assignments found")
}
override fun registerSeekCallback(callback: ConsumerSeekAware.ConsumerSeekCallback?) {
logger.debug { "registerSeekCallback" }
this.consumerSeekCallback = callback ?: error("seek callback not found")
}
override fun onIdleContainer(assignments: MutableMap<TopicPartition, Long>?, callback: ConsumerSeekAware.ConsumerSeekCallback?) {}
}