两个 Kafka 消费者导致彼此发生奇怪的行为
Two Kafka consumers causing odd behavior with one another
我有两个具有不同客户端 ID 和组 ID 的消费者。除了 retention hour 和 max partitions 之外,我的 Kafka 安装还包含默认配置。我环顾四周,看看是否有其他人遇到过同样的问题,但无法得出任何结果。
所以场景是这样的:
消费者A:
连接到Kafka,消费了大约300万条需要消费的消息,然后闲置等待更多的消息。
消费者乙:
不同的客户端/组 ID,连接到相同的 Kafka 主题,这导致消费者 A 重复获取 300 万条消息,而消费者 B 也消费它们。
这两个消费者是两个完全不同的 Java 应用程序,在同一台计算机上具有不同的客户端和组 ID 运行。 Kafka 服务器在另一台计算机上。
这是 Kafka 中的正常行为吗?我完全不知所措。
这是我的消费者配置:
bootstrap.servers=192.168.110.109:9092
acks=all
max.block.ms=2000
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
block.on.buffer.full=true
enable.auto.commit=false
auto.offset.reset=none
session.timeout.ms=30000
zookeeper.session.timeout=100000
rebalance.backoff.ms=8000
group.id=consumerGroupA
zookeeper.connect=192.168.110.109:2181
poll.interval=100
我的消费者 B 的明显区别是 group.id=consumerGroupB
这是正确的行为。因为根据您的配置,您的消费者不会提交他们已阅读的记录的偏移量!
当消费者读取一条记录时,它必须提交读取它,您可以通过设置enable.auto.commit=true
或手动提交每条记录来确保消费者自动提交偏移量。在这种情况下,我认为自动提交适合你。
我有两个具有不同客户端 ID 和组 ID 的消费者。除了 retention hour 和 max partitions 之外,我的 Kafka 安装还包含默认配置。我环顾四周,看看是否有其他人遇到过同样的问题,但无法得出任何结果。
所以场景是这样的:
消费者A: 连接到Kafka,消费了大约300万条需要消费的消息,然后闲置等待更多的消息。
消费者乙: 不同的客户端/组 ID,连接到相同的 Kafka 主题,这导致消费者 A 重复获取 300 万条消息,而消费者 B 也消费它们。
这两个消费者是两个完全不同的 Java 应用程序,在同一台计算机上具有不同的客户端和组 ID 运行。 Kafka 服务器在另一台计算机上。
这是 Kafka 中的正常行为吗?我完全不知所措。
这是我的消费者配置:
bootstrap.servers=192.168.110.109:9092
acks=all
max.block.ms=2000
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
block.on.buffer.full=true
enable.auto.commit=false
auto.offset.reset=none
session.timeout.ms=30000
zookeeper.session.timeout=100000
rebalance.backoff.ms=8000
group.id=consumerGroupA
zookeeper.connect=192.168.110.109:2181
poll.interval=100
我的消费者 B 的明显区别是 group.id=consumerGroupB
这是正确的行为。因为根据您的配置,您的消费者不会提交他们已阅读的记录的偏移量!
当消费者读取一条记录时,它必须提交读取它,您可以通过设置enable.auto.commit=true
或手动提交每条记录来确保消费者自动提交偏移量。在这种情况下,我认为自动提交适合你。