使用partitionsFor方法获取kafka 0.8.0中生产者的分区数
Getting the number of partitions in the producer in kafka 0.8.0 using partitionsFor method
kafka 版本 0.8.0 中的生产者是否支持 partitionsFor
方法?我想使用这种方法来获取给定 kafka 主题的分区数。
如果在kafka 0.8.0中没有这种方法,那么在这个特定版本的kafka中获取生产者中分区数的最简单方法是什么?
你为什么不试试下面的方法
。
ZkUtils 也有方法 getPartitionsForTopics 也可以使用。虽然我没有亲自尝试和测试过
您也可以使用 listTopics() 方法
ArrayList<Topics> topicList = new ArrayList<Topics>();
Properties props = new Properties();
Map<String, List<PartitionInfo>> topics;
Topics topic;
InputStream input =
getClass().getClassLoader().getResourceAsStream("kafkaCluster.properties");
try {
props.load(input);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
props.getProperty("BOOTSTRAP_SERVERS_CONFIG"));
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);
topics = consumer.listTopics();
// System.out.println(topics.get(topics));
for (Map.Entry<String, List<PartitionInfo>> entry :
topics.entrySet()) {
System.out.println("Key = " + entry.getKey() + ", Value = " +
entry.getValue());
topic = new Topics();
topic.setTopic_name(entry.getKey());
topic.setPartitions(Integer.toString(entry.getValue().size()));
topicList.add(topic);
}
} catch (IOException e) {
e.printStackTrace();
}
kafka 版本 0.8.0 中的生产者是否支持 partitionsFor
方法?我想使用这种方法来获取给定 kafka 主题的分区数。
如果在kafka 0.8.0中没有这种方法,那么在这个特定版本的kafka中获取生产者中分区数的最简单方法是什么?
你为什么不试试下面的方法
您也可以使用 listTopics() 方法
ArrayList<Topics> topicList = new ArrayList<Topics>();
Properties props = new Properties();
Map<String, List<PartitionInfo>> topics;
Topics topic;
InputStream input =
getClass().getClassLoader().getResourceAsStream("kafkaCluster.properties");
try {
props.load(input);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
props.getProperty("BOOTSTRAP_SERVERS_CONFIG"));
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);
topics = consumer.listTopics();
// System.out.println(topics.get(topics));
for (Map.Entry<String, List<PartitionInfo>> entry :
topics.entrySet()) {
System.out.println("Key = " + entry.getKey() + ", Value = " +
entry.getValue());
topic = new Topics();
topic.setTopic_name(entry.getKey());
topic.setPartitions(Integer.toString(entry.getValue().size()));
topicList.add(topic);
}
} catch (IOException e) {
e.printStackTrace();
}