autoCommit 如何工作以及在 spring-cloud-stream reactive kafka 中一次将轮询多少条消息?
How autoCommit works and how many message will be polled once in spring-cloud-stream reactive kafka?
我检查document, it only says how auto commit works with poll()
here, and how to configure poll count, here。
那么当我使用 Flux
时,事情是如何运作的?
下面是我的消费代码。
@Bean
fun consumerInboundMsg(handler: QueueHandler): java.util.function.Function<Flux<MessageRequest>, Mono<Void>> {
return Function { flux ->
flux.asFlow().flatMapMerge {
flow {
handler.handleInboundRequest(it)
emit(it)
}
}.asFlux().then()
}
}
澄清一下,自动提交机制由 Apache Kafka 的实现决定。您正在使用 Spring Cloud Stream 反应式或 Spring Cloud Stream 非反应式不会影响自动提交的工作方式。偏移量将在每次轮询中提交,并检查经过的时间是否大于 auto.commit.interval.ms
.
如果提交间隔为 5 秒并且轮询在 7 秒后发生,则提交只会在 7 秒后发生。
要检查您的消费者提交偏移量的频率,启用跟踪日志:logging.level.org.apache.kafka: trace
我检查document, it only says how auto commit works with poll()
here, and how to configure poll count, here。
那么当我使用 Flux
时,事情是如何运作的?
下面是我的消费代码。
@Bean
fun consumerInboundMsg(handler: QueueHandler): java.util.function.Function<Flux<MessageRequest>, Mono<Void>> {
return Function { flux ->
flux.asFlow().flatMapMerge {
flow {
handler.handleInboundRequest(it)
emit(it)
}
}.asFlux().then()
}
}
澄清一下,自动提交机制由 Apache Kafka 的实现决定。您正在使用 Spring Cloud Stream 反应式或 Spring Cloud Stream 非反应式不会影响自动提交的工作方式。偏移量将在每次轮询中提交,并检查经过的时间是否大于 auto.commit.interval.ms
.
如果提交间隔为 5 秒并且轮询在 7 秒后发生,则提交只会在 7 秒后发生。
要检查您的消费者提交偏移量的频率,启用跟踪日志:logging.level.org.apache.kafka: trace