KafkaConsumer:`seekToEnd()` 不会让消费者从最新的偏移量开始消费
KafkaConsumer: `seekToEnd()` does not make consumer consume from latest offset
我有以下代码
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
consumer.seekToEnd(emptyList())
val pollDuration = 30 // seconds
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
消费者订阅的主题不断收到记录。有时,消费者会因为处理步骤而崩溃。当消费者重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为 seekToEnd()
方法可以确保这一点。但是,该方法似乎根本没有效果。消费者从它崩溃的偏移量开始消费。
seekToEnd()
的正确使用方法是什么?
编辑:使用以下配置创建消费者
fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
val props = setupConfig(valueDeserializer)
Common.setupConsumerSecurityProtocol(props)
return createConsumer(props)
}
fun setupConfig(valueDeserializer: String): Properties {
// Configuration setup
val props = Properties()
props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs
props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
return props
}
fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> {
val consumer = KafkaConsumer<String, T>(props)
consumer.subscribe(listOf(config.kafka.inputTopic))
return consumer
}
seekToEnd
方法需要您计划让您的消费者从末尾读取的实际分区信息(在 Kafka 术语中 TopicPartition
)。
我不熟悉 Kotlin API,但是查看 KafkaConsumer's method seekToEnd 上的 JavaDocs 您会发现,它要求收集 TopicPartition。
由于您目前正在使用 emptyList()
,正如您观察到的那样,它根本不会产生任何影响。
我找到了解决办法!
我需要添加一个虚拟轮询作为消费者初始化过程的一部分。由于多个 Kafka 方法是延迟评估的,因此有必要使用虚拟轮询将分区分配给消费者。如果没有虚拟轮询,消费者会尝试寻找空分区的末尾。结果,seekToEnd()
没有效果。
重要的是虚拟轮询持续时间足够长以分配分区。例如,对于 consumer.poll((Duration.ofSeconds(1))
,在程序继续进行下一个方法调用(即 seekToEnd()
)之前,分区没有得到分配时间。
工作代码可能看起来像这样
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
// Initialization
val pollDuration = 30 // seconds
consumer.poll((Duration.ofSeconds(pollDuration)) // Dummy poll to get assigned partitions
// Seek to end and commit new offset
consumer.seekToEnd(emptyList())
consumer.commitSync()
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
我有以下代码
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
consumer.seekToEnd(emptyList())
val pollDuration = 30 // seconds
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
消费者订阅的主题不断收到记录。有时,消费者会因为处理步骤而崩溃。当消费者重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为 seekToEnd()
方法可以确保这一点。但是,该方法似乎根本没有效果。消费者从它崩溃的偏移量开始消费。
seekToEnd()
的正确使用方法是什么?
编辑:使用以下配置创建消费者
fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
val props = setupConfig(valueDeserializer)
Common.setupConsumerSecurityProtocol(props)
return createConsumer(props)
}
fun setupConfig(valueDeserializer: String): Properties {
// Configuration setup
val props = Properties()
props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs
props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
return props
}
fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> {
val consumer = KafkaConsumer<String, T>(props)
consumer.subscribe(listOf(config.kafka.inputTopic))
return consumer
}
seekToEnd
方法需要您计划让您的消费者从末尾读取的实际分区信息(在 Kafka 术语中 TopicPartition
)。
我不熟悉 Kotlin API,但是查看 KafkaConsumer's method seekToEnd 上的 JavaDocs 您会发现,它要求收集 TopicPartition。
由于您目前正在使用 emptyList()
,正如您观察到的那样,它根本不会产生任何影响。
我找到了解决办法!
我需要添加一个虚拟轮询作为消费者初始化过程的一部分。由于多个 Kafka 方法是延迟评估的,因此有必要使用虚拟轮询将分区分配给消费者。如果没有虚拟轮询,消费者会尝试寻找空分区的末尾。结果,seekToEnd()
没有效果。
重要的是虚拟轮询持续时间足够长以分配分区。例如,对于 consumer.poll((Duration.ofSeconds(1))
,在程序继续进行下一个方法调用(即 seekToEnd()
)之前,分区没有得到分配时间。
工作代码可能看起来像这样
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
// Initialization
val pollDuration = 30 // seconds
consumer.poll((Duration.ofSeconds(pollDuration)) // Dummy poll to get assigned partitions
// Seek to end and commit new offset
consumer.seekToEnd(emptyList())
consumer.commitSync()
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}