Kafka __consumer_offsets topic has an excessive partition count
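I am using Kafka 0.8.2, and my consumer fails with an "offset commit failed with ..." error. When I looked at the "__consumer_offsets" topic, I saw that it has 50 partitions. Is that normal? The only way I could fix the problem was to delete all the Kafka logs and restart my Kafka server. Is there a way to delete this topic once it reaches a certain number of partitions, or am I committing offsets incorrectly?
This is how I commit offsets: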
    public void commitOffsets(BlockingChannel channel, String topic, String groupid, int partition, String clientName, int corrilationid, long offset) throws Exception{
        if (commitTryCount > 100){
            throw new Exception("Offset commit failed with " + channel.host());
        }
        long now = System.currentTimeMillis();
        Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
        //for (int i = 0; i < this.totalPartitions; i++){
        TopicAndPartition topicPartition = new TopicAndPartition(topic, partition);
        offsets.put(topicPartition, new OffsetAndMetadata(offset, topic, now));
        //}
        //initialize offset commit
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupid, offsets, corrilationid, clientName, (short) 1);
        channel.send(commitRequest.underlying());
        OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
        if (commitResponse.hasError()){
            for (Object partitionErrorCode: commitResponse.errors().values()){
                if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.OffsetMetadataTooLargeCode()){
                    //reduce the size of the metadata and retry
                    offset--;
                    commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset);
                    commitTryCount++;
                } else if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.NotCoordinatorForConsumerCode()
                        || Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                    //discover new coordinator and retry
                    int newCorrilation = corrilationid;
                    newCorrilation++;
                    this.channel = discoverChannel(channel.host(), port, groupid, clientName, newCorrilation);
                    commitOffsets(this.channel, topic, groupid, partition, clientName, newCorrilation, offset);
                    commitTryCount++;
                } else{
                    //retry
                    commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset);
                    commitTryCount++;
                }//end of else
            }//end of for
        }//end of if
    }//end of method
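I figured it out right after posting the code: I forgot to reset the variable "commitTryCount" to 0 when a commit succeeds, so failures accumulated across calls until the limit of 100 was hit. I am still wondering, though: is it normal for the __consumer_offsets topic to have 50 partitions?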
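For anyone hitting the same thing, here is a minimal sketch of one way to avoid the stale counter: keep the attempt count local to each commit call instead of in a field, so there is nothing to reset after a success. This is not the code from the question. `BoundedCommitRetry`, `commitWithRetry`, and the `IntSupplier` standing in for a single commit round trip (returning 0 on success, a non-zero error code otherwise) are all hypothetical names, and no real Kafka classes are used.

```java
import java.util.function.IntSupplier;

public class BoundedCommitRetry {
    // Same retry budget as the question; the value itself is arbitrary.
    static final int MAX_ATTEMPTS = 100;

    /**
     * Runs the given attempt until it reports success (0) or the budget is spent.
     * The counter is a local variable, so every call starts from a fresh budget
     * and a run of failures in an earlier call cannot leak into this one.
     * `attempt` is a hypothetical stand-in for one offset commit round trip.
     */
    static int commitWithRetry(IntSupplier attempt) {
        for (int tries = 1; tries <= MAX_ATTEMPTS; tries++) {
            if (attempt.getAsInt() == 0) {
                return tries; // number of attempts that were needed
            }
        }
        throw new IllegalStateException(
                "Offset commit failed after " + MAX_ATTEMPTS + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated commit that fails twice and then succeeds.
        int used = commitWithRetry(() -> ++calls[0] < 3 ? 1 : 0);
        System.out.println("succeeded after " + used + " attempts");
    }
}
```

A loop also avoids the recursion in the original method, where the counter was only incremented after the recursive call had already returned.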
Yes, the consumer offsets topic has 50 partitions by default. To change this, set the offsets.topic.num.partitions property.
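To illustrate why the partition count is fixed rather than something that grows with the number of commits: as far as I know, the broker assigns each consumer group to one partition of __consumer_offsets by hashing the group id modulo the configured partition count. The sketch below is an assumption about that mapping, not a copy of the broker code, and `OffsetsPartitionSketch`, `partitionFor`, and the group names are made up for illustration.

```java
public class OffsetsPartitionSketch {
    /**
     * Assumed mapping from a consumer group to a partition of the offsets topic:
     * a non-negative hash of the group id, modulo the number of partitions
     * (the value of offsets.topic.num.partitions, 50 by default).
     */
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 50; // default partition count mentioned above
        for (String group : new String[] {"billing", "analytics", "audit"}) {
            System.out.println(group + " -> partition "
                    + partitionFor(group, numPartitions));
        }
    }
}
```

Under this assumption, all commits for one group land in the same partition, and the 50 partitions only spread different groups across brokers.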