How to wait for full Kafka-message batch with Spring Boot?
When consuming Kafka messages in batches, max.poll.records can be used to limit the batch size.
If the consumer is fast and its committed offset has no significant lag, this means most batches will be much smaller. I would like to receive only "full" batches, i.e. have my consumer function invoked only when the batch size is reached. So I'm looking for something like min.poll.records, which does not exist in that form.
Here's a minimal example of what I'm doing:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        // Caps the batch size; there is no corresponding lower bound.
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        // Just report how many records arrived in this batch.
        println(values.count())
    }
}
When started with a bit of consumer lag, it outputs something like:
[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]
Is there any way (without manually sleep-ing in the consumer handler for "incomplete" batches) to have the function invoked only when one of these two conditions is met:
- there are at least n messages, or
- at least m milliseconds have been spent waiting?
Kafka has no min.poll.records; if your records have a similar length, you can approximate it with fetch.min.bytes. Also see fetch.max.wait.ms.
Since it's currently not possible to get Kafka to do what I'm looking for, here is my manual buffering solution, which implements the desired behavior:
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.text.SimpleDateFormat
import java.util.*
import javax.annotation.PreDestroy

@SpringBootApplication
@EnableScheduling // required for @Scheduled to fire
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @KafkaListener(topics = ["myTopic"])
    fun listen(value: String) {
        addToBuffer(value)
    }

    private val buffer = mutableSetOf<String>()

    @Synchronized
    fun addToBuffer(message: String) {
        buffer.add(message)
        // Flush as soon as the target batch size is reached...
        if (buffer.size >= 300) {
            flushBuffer()
        }
    }

    @Synchronized
    @Scheduled(fixedDelay = 700) // ...or at the latest after 700 ms
    @PreDestroy
    fun flushBuffer() {
        if (buffer.isEmpty()) {
            return
        }
        val timestamp = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(Date())
        println("$timestamp: ${buffer.count()}")
        buffer.clear()
    }
}
Sample output:
[...]
2020-01-03T07:01:13.032: 300
2020-01-03T07:01:13.041: 300
2020-01-03T07:01:13.078: 300
2020-01-03T07:01:13.133: 300
2020-01-03T07:01:13.143: 300
2020-01-03T07:01:13.188: 300
2020-01-03T07:01:13.197: 300
2020-01-03T07:01:13.321: 300
2020-01-03T07:01:13.352: 300
2020-01-03T07:01:13.359: 300
2020-01-03T07:01:13.399: 300
2020-01-03T07:01:13.407: 300
2020-01-03T07:01:13.533: 300
2020-01-03T07:01:13.571: 300
2020-01-03T07:01:13.580: 300
2020-01-03T07:01:13.607: 300
2020-01-03T07:01:13.611: 300
2020-01-03T07:01:13.632: 300
2020-01-03T07:01:13.682: 300
2020-01-03T07:01:13.687: 300
2020-01-03T07:01:13.708: 300
2020-01-03T07:01:13.712: 300
2020-01-03T07:01:13.738: 300
2020-01-03T07:01:13.880: 300
2020-01-03T07:01:13.884: 300
2020-01-03T07:01:13.911: 300
2020-01-03T07:01:14.301: 300
2020-01-03T07:01:14.714: 300
2020-01-03T07:01:15.029: 300
2020-01-03T07:01:15.459: 300
2020-01-03T07:01:15.888: 300
2020-01-03T07:01:16.359: 300
[...]
So we can see that after catching up on the consumer lag, it delivers batches of 300 that match the topic's throughput.
Yes, @Synchronized does kill concurrent processing, but in my use case this part is far from being the bottleneck.
While you're waiting for the batch to fill up (accumulating to 300), your offsets are committed every time the listener returns. Each time the listener returns, the previous batch is committed, even though you may not have processed those records yet, because you're holding them in the buffer.
If there is a failure (e.g., the listener crashes), you will lose the messages in the buffer. That may not be a problem for your use case, but I just wanted to highlight the possibility.
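One possible mitigation, sketched below under stated assumptions rather than taken from the answer above, is to switch the listener container to manual acknowledgment (factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL) and only acknowledge records once the buffer has actually been processed. The class and method names here are hypothetical:

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class AckingBufferedConsumer {
    // Assumes the listener container is configured with
    // ContainerProperties.AckMode.MANUAL on the container factory.
    private val buffer = mutableSetOf<String>()
    private val pendingAcks = mutableListOf<Acknowledgment>()

    @KafkaListener(topics = ["myTopic"])
    fun listen(value: String, ack: Acknowledgment) = addToBuffer(value, ack)

    @Synchronized
    fun addToBuffer(message: String, ack: Acknowledgment) {
        buffer.add(message)
        pendingAcks.add(ack)
        if (buffer.size >= 300) {
            flushBuffer()
        }
    }

    @Synchronized
    fun flushBuffer() {
        if (buffer.isEmpty()) return
        // ... process the buffered messages here, then commit their offsets.
        pendingAcks.forEach { it.acknowledge() }
        pendingAcks.clear()
        buffer.clear()
    }
}

Note that this sketch only covers the size-triggered flush: in MANUAL ack mode the acknowledgments are expected to come from the consumer thread, so a time-based flush from a @Scheduled thread would need additional care.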