enable.auto.commit = false 的 Kafka 消费者仍在提交偏移量
Kafka Consumer with enable.auto.commit = false still committing the offsets
Kafka 消费者已经处理了消息 1、2、3、4 并且 enable.auto.commit 设置为 false。
但是在重新启动消费者时,它不会再次重新处理上述消息,从 CLI 我可以看到偏移量已经增加并且没有延迟(因此它正在提交)。
你能帮忙解决这个问题吗,以了解消费者如何在 属性 enable.auto.commit 设置为 false 的情况下仍然提交偏移量。
以下是消费者属性
allow.auto.create.topics = true
auto.commit.interval.ms = 0
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
enable.auto.commit = false
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = EmployeeConsumer
目前使用 spring-kafka-2.5.0.RELEASE.jar 作为依赖
您需要显示您的 Spring 配置。
enable.auto.commit=false
告诉 kafka-clients 不要提交偏移量,但 Spring 默认情况下会提交偏移量。
将侦听器容器 ackMode
属性 设置为 AckMode.MANUAL
以禁用容器提交。
此外:
auto.offset.reset = latest
意味着 从未 提交偏移量的消费者将从 topic/partition 的当前末尾开始消费,因此它不会得到现有记录。
Kafka 消费者已经处理了消息 1、2、3、4 并且 enable.auto.commit 设置为 false。
但是在重新启动消费者时,它不会再次重新处理上述消息,从 CLI 我可以看到偏移量已经增加并且没有延迟(因此它正在提交)。
你能帮忙解决这个问题吗,以了解消费者如何在 属性 enable.auto.commit 设置为 false 的情况下仍然提交偏移量。
以下是消费者属性
allow.auto.create.topics = true
auto.commit.interval.ms = 0
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
enable.auto.commit = false
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = EmployeeConsumer
目前使用 spring-kafka-2.5.0.RELEASE.jar 作为依赖
您需要显示您的 Spring 配置。
enable.auto.commit=false
告诉 kafka-clients 不要提交偏移量,但 Spring 默认情况下会提交偏移量。
将侦听器容器 ackMode
属性 设置为 AckMode.MANUAL
以禁用容器提交。
此外:
auto.offset.reset = latest
意味着 从未 提交偏移量的消费者将从 topic/partition 的当前末尾开始消费,因此它不会得到现有记录。