有时一个新的消费者群体不起作用
Sometimes a new consumer group does not work
我在生产中见过一次(我不记得我们是如何解决它的)现在我可以在集成测试中重复它,它总是从全新的 Kafka 安装开始。这是怎么回事:
第 1 步: 尚不存在的组的消费者订阅尚不存在的主题并开始轮询。
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
})
self.kafka_consumer.subscribe('mytopic')
第 2 步: 生产者向主题写入消息。
结果:
- 大约一半的时间它工作正常;消费者可以正常阅读消息。
- 另一半时间消费者似乎卡住了。我试过最多等待 10 分钟,看看它是否会松开,但没有。
- 即使这两个步骤相反,即消费者尝试订阅已经有消息的现有主题,行为也是相同的(但是组总是新的)。
更多详情
消费者正在轮询,超时为 2 秒,如果没有结果则循环。
虽然主题不存在,poll()
returns None
。主题存在后,poll()
returns 一个 msg
其 error().code()
是 _PARTITION_EOF
.
当消费者似乎卡住了时,我询问 kafka mygroup
发生了什么,它告诉我的是:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
root@e7b124b4039c:/#
我试图通过阅读另一个不存在的主题来让它解开 mygroup
:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group mygroup --topic nonexistent --from-beginning
[2018-03-15 16:36:59,369] WARN [Consumer clientId=consumer-1, groupId=pixelprocessor] Error while fetching metadata with correlation id 2 : {nonexistent=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages
root@e7b124b4039c:/#
在我这样做之后,这是 Kafka 对 mygroup
的看法:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mytopic 0 - 1 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
(another topic) 0 - 0 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
(a third topic) 0 - 0 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
nonexistent 0 0 0 0 - - -
这是 Kafka 1.0.1,librdkafka 0.11.3,confluent_kafka 0.11.0,在 Ubuntu 16.04 docker 上(使用 OS 打包的 zookeeper 3.4.8) 运行 在 Debian stretch (9.4) 上 Linux 4.9.0-6-amd64.
问题似乎出在 Consumer()
参数中。这不能正常工作:
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'auto.offset.reset': 'earliest',
})
但是这样做:
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'default.topic.config': {
'auto.offset.reset': 'earliest',
},
})
我在生产中见过一次(我不记得我们是如何解决它的)现在我可以在集成测试中重复它,它总是从全新的 Kafka 安装开始。这是怎么回事:
第 1 步: 尚不存在的组的消费者订阅尚不存在的主题并开始轮询。
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
})
self.kafka_consumer.subscribe('mytopic')
第 2 步: 生产者向主题写入消息。
结果:
- 大约一半的时间它工作正常;消费者可以正常阅读消息。
- 另一半时间消费者似乎卡住了。我试过最多等待 10 分钟,看看它是否会松开,但没有。
- 即使这两个步骤相反,即消费者尝试订阅已经有消息的现有主题,行为也是相同的(但是组总是新的)。
更多详情
消费者正在轮询,超时为 2 秒,如果没有结果则循环。
虽然主题不存在,poll()
returns None
。主题存在后,poll()
returns 一个 msg
其 error().code()
是 _PARTITION_EOF
.
当消费者似乎卡住了时,我询问 kafka mygroup
发生了什么,它告诉我的是:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
root@e7b124b4039c:/#
我试图通过阅读另一个不存在的主题来让它解开 mygroup
:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group mygroup --topic nonexistent --from-beginning
[2018-03-15 16:36:59,369] WARN [Consumer clientId=consumer-1, groupId=pixelprocessor] Error while fetching metadata with correlation id 2 : {nonexistent=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
^CProcessed a total of 0 messages
root@e7b124b4039c:/#
在我这样做之后,这是 Kafka 对 mygroup
的看法:
root@e7b124b4039c:/# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mytopic 0 - 1 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
(another topic) 0 - 0 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
(a third topic) 0 - 0 - rdkafka-a172d013-08e6-4ee2-92f3-fdb07d163d57 /172.20.0.6 rdkafka
nonexistent 0 0 0 0 - - -
这是 Kafka 1.0.1,librdkafka 0.11.3,confluent_kafka 0.11.0,在 Ubuntu 16.04 docker 上(使用 OS 打包的 zookeeper 3.4.8) 运行 在 Debian stretch (9.4) 上 Linux 4.9.0-6-amd64.
问题似乎出在 Consumer()
参数中。这不能正常工作:
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'auto.offset.reset': 'earliest',
})
但是这样做:
self.kafka_consumer = confluent_kafka.Consumer({
'group.id': 'mygroup',
'bootstrap.servers': 'kafka:9092',
'default.topic.config': {
'auto.offset.reset': 'earliest',
},
})