如何使用 Spring Boot 等待完整的 Kafka 消息批处理?

How to wait for full Kafka-message batch with Spring Boot?

批量消费Kafka消息时,可以使用max.poll.records限制批量大小。

如果消费者速度非常快并且其提交偏移量没有明显滞后,这意味着大多数批次将小得多。我只想接收 "full" 个批次,即,只有在达到批次大小时才调用我的消费者函数。所以我正在寻找类似 min.poll.records 的东西,它在那种形式下不存在。

这是我正在做的一个最小例子:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        println(values.count())
    }
}

当开始时有一点消费者滞后,它输出如下:

[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]

是否有任何方法(在 "incomplete" 批处理的情况下无需在消费者处理程序中手动 sleeping)在满足以下两个条件之一时调用该函数? - 仅当至少有 n 条消息时 - 或者至少 m 毫秒用于等待

Kafka没有min.poll.records;如果您的记录长度相似,您可以使用 fetch.min.bytes 对其进行近似。另见 fetch.max.wait.ms.

由于 ,目前无法让 Kafka 做我正在寻找的事情,这是我的手动缓冲解决方案,它实现了所需的行为:

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.text.SimpleDateFormat
import java.util.*
import javax.annotation.PreDestroy

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @KafkaListener(topics = ["myTopic"])
    fun listen(value: String) {
        addToBuffer(value)
    }

    private val buffer = mutableSetOf<String>()

    @Synchronized
    fun addToBuffer(message: String) {
        buffer.add(message)
        if (buffer.size >= 300) {
            flushBuffer()
        }
    }

    @Synchronized
    @Scheduled(fixedDelay = 700)
    @PreDestroy
    fun flushBuffer() {
        if (buffer.isEmpty()) {
            return
        }
        val timestamp = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(Date())
        println("$timestamp: ${buffer.count()}")
        buffer.clear()
    }
}

示例输出:

[...]
2020-01-03T07:01:13.032: 300
2020-01-03T07:01:13.041: 300
2020-01-03T07:01:13.078: 300
2020-01-03T07:01:13.133: 300
2020-01-03T07:01:13.143: 300
2020-01-03T07:01:13.188: 300
2020-01-03T07:01:13.197: 300
2020-01-03T07:01:13.321: 300
2020-01-03T07:01:13.352: 300
2020-01-03T07:01:13.359: 300
2020-01-03T07:01:13.399: 300
2020-01-03T07:01:13.407: 300
2020-01-03T07:01:13.533: 300
2020-01-03T07:01:13.571: 300
2020-01-03T07:01:13.580: 300
2020-01-03T07:01:13.607: 300
2020-01-03T07:01:13.611: 300
2020-01-03T07:01:13.632: 300
2020-01-03T07:01:13.682: 300
2020-01-03T07:01:13.687: 300
2020-01-03T07:01:13.708: 300
2020-01-03T07:01:13.712: 300
2020-01-03T07:01:13.738: 300
2020-01-03T07:01:13.880: 300
2020-01-03T07:01:13.884: 300
2020-01-03T07:01:13.911: 300
2020-01-03T07:01:14.301: 300
2020-01-03T07:01:14.714: 300
2020-01-03T07:01:15.029: 300
2020-01-03T07:01:15.459: 300
2020-01-03T07:01:15.888: 300
2020-01-03T07:01:16.359: 300
[...]

所以我们看到在赶上消费者滞后之后,它提供了 300 匹配主题吞吐量的批次。

是的,@Synchronized 确实会终止并发处理,但在我的用例中,这部分远非瓶颈。

当您等待批处理完成(累积到 300)时,每次返回侦听器获取时都会提交您的偏移量。每次侦听器返回时,它都会提交前一批,尽管您可能没有处理它们,因为您将它们保存在缓冲区中。

如果出现故障(例如,侦听器崩溃),那么您将丢失缓冲区中的消息。对于您的用例来说,这可能不是问题,但只是想强调这种可能性。