autoCommit 如何工作以及在 spring-cloud-stream reactive kafka 中一次将轮询多少条消息?

How autoCommit works and how many message will be polled once in spring-cloud-stream reactive kafka?

我检查document, it only says how auto commit works with poll() here, and how to configure poll count, here

那么当我使用 Flux 时,事情是如何运作的?

下面是我的消费代码。

    @Bean
    fun consumerInboundMsg(handler: QueueHandler): java.util.function.Function<Flux<MessageRequest>, Mono<Void>> {
        return Function { flux ->
            flux.asFlow().flatMapMerge {
                flow {
                    handler.handleInboundRequest(it)
                    emit(it)
                }
            }.asFlux().then()
        }
    }

澄清一下,自动提交机制由 Apache Kafka 的实现决定。您正在使用 Spring Cloud Stream 反应式或 Spring Cloud Stream 非反应式不会影响自动提交的工作方式。偏移量将在每次轮询中提交,并检查经过的时间是否大于 auto.commit.interval.ms.

如果提交间隔为 5 秒并且轮询在 7 秒后发生,则提交只会在 7 秒后发生。

要检查您的消费者提交偏移量的频率,启用跟踪日志:logging.level.org.apache.kafka: trace