broker重启后长时间加载offsets和元数据块KafkaConsumer
Loading offsets and metadata blocks KafkaConsumer after broker restart for a long time
我们遇到的问题是,有时调用新 KafkaConsumer 的 'poll' 方法会挂起 20 到 30 分钟 在三分之一的 kafka 经纪人重启后!
我们正在使用 3 broker kafka 设置 (0.9.0.1)。
我们的消费者进程使用新的 Java KafkaConsumer-API 我们是
分配给特定的 TopicPartitions。
由于各种原因我不能在这里展示真正的代码,但基本上我们的代码是这样工作的:
Properties consumerProps=loadConsumerProperties();
// bootstrap.servers=<IP1>:9092,<IP2>:9092,<IP3>:9092
// group.id="consumer_group_gwbc2
// enable.auto.commit=false
// auto.offset.reset=latest
// session.timeout.ms=30000
// key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
// value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Arrays.asList(new TopicPartition("someTopic",0)));
while (true) {
// THIS CALL sometimes blocks for a very long Time after a broker restart
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(200);
Iterator<ConsumerRecord<String, byte[]>> recordIter = records.iterator();
while (recordIter.hasNext()) {
ConsumerRecord<String, byte[]> record = recordIter.next();
// Very fast, actually just sending a UDP Paket via Netty.
processRecord(record);
if (lastCommitHappendFiveOrMoreSecondsAgo()) {
kafkaConsumer.commitAsync();
}
}
}
kafka-topics.sh描述__consumer_offsets题目如下
Topic:__consumer_offsets PartitionCount:50
ReplicationFactor:3 Configs:segment.bytes=104857600,
cleanup.policy=compact,compression.type=uncompressed
重新启动的代理的 server.log 显示从 __consumer_offsets 主题的特定分区加载偏移量需要很长时间(在本例中大约 22 分钟)。这与消费者的 'poll' 调用被阻塞的时间相关。
[2016-07-25 16:02:40,846] INFO [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,15] (kafka.coordinator.GroupMetadataManager)
[2016-07-25 16:25:36,697] INFO [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,15] in 1375851 milliseconds.
我想知道是什么导致加载过程如此缓慢,可以采取什么措施!?
找到原因了
我们代理的 server.xml 配置文件包含 属性
log.cleaner.enable=false
(默认情况下,此 属性 在版本 0.9.0.1 中为真)
这意味着kafkas内部压缩了__consumer_offsets主题
由于禁用了日志清理器,因此实际上并未压缩。
实际上,该主题的某些分区增长到几千兆字节的大小,这解释了当新的组协调器需要重新填充其缓存时读取所有消费者偏移数据所需的时间量。
我们遇到的问题是,有时调用新 KafkaConsumer 的 'poll' 方法会挂起 20 到 30 分钟 在三分之一的 kafka 经纪人重启后!
我们正在使用 3 broker kafka 设置 (0.9.0.1)。 我们的消费者进程使用新的 Java KafkaConsumer-API 我们是 分配给特定的 TopicPartitions。
由于各种原因我不能在这里展示真正的代码,但基本上我们的代码是这样工作的:
Properties consumerProps=loadConsumerProperties();
// bootstrap.servers=<IP1>:9092,<IP2>:9092,<IP3>:9092
// group.id="consumer_group_gwbc2
// enable.auto.commit=false
// auto.offset.reset=latest
// session.timeout.ms=30000
// key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
// value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Arrays.asList(new TopicPartition("someTopic",0)));
while (true) {
// THIS CALL sometimes blocks for a very long Time after a broker restart
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(200);
Iterator<ConsumerRecord<String, byte[]>> recordIter = records.iterator();
while (recordIter.hasNext()) {
ConsumerRecord<String, byte[]> record = recordIter.next();
// Very fast, actually just sending a UDP Paket via Netty.
processRecord(record);
if (lastCommitHappendFiveOrMoreSecondsAgo()) {
kafkaConsumer.commitAsync();
}
}
}
kafka-topics.sh描述__consumer_offsets题目如下
Topic:__consumer_offsets PartitionCount:50
ReplicationFactor:3 Configs:segment.bytes=104857600,
cleanup.policy=compact,compression.type=uncompressed
重新启动的代理的 server.log 显示从 __consumer_offsets 主题的特定分区加载偏移量需要很长时间(在本例中大约 22 分钟)。这与消费者的 'poll' 调用被阻塞的时间相关。
[2016-07-25 16:02:40,846] INFO [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,15] (kafka.coordinator.GroupMetadataManager)
[2016-07-25 16:25:36,697] INFO [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,15] in 1375851 milliseconds.
我想知道是什么导致加载过程如此缓慢,可以采取什么措施!?
找到原因了
我们代理的 server.xml 配置文件包含 属性
log.cleaner.enable=false
(默认情况下,此 属性 在版本 0.9.0.1 中为真) 这意味着kafkas内部压缩了__consumer_offsets主题 由于禁用了日志清理器,因此实际上并未压缩。 实际上,该主题的某些分区增长到几千兆字节的大小,这解释了当新的组协调器需要重新填充其缓存时读取所有消费者偏移数据所需的时间量。