我如何在 Kafka 中获取代理的连接字符串

How do I get connection string for brokers in Kafka

我是卡夫卡的新手。试图实现消费者和生产者 classes 来发送和接收消息。需要为两个 classes 配置 bootstrap.servers,这是由 , 分隔的代理的 ip 和端口列表。例如,

producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

由于应用程序将 运行 在集群的主节点上,它应该能够从 ZooKeeper 检索代理信息,就像 的答案一样。

public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) {
        String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
        System.out.println(id + ": " + brokerInfo);
    }
}

然而,此 brokerInfo 的格式为 Json,如下所示: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}

在同一个 post 中,另一个人建议使用以下方法获取每个代理的连接字符串并用逗号将它们连接在一起。

for (String id : ids) {
        String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
        Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
        if (broker != null) {
            brokerList.add(broker.connectionString());
        }
    }

如果这个 Broker class 来自 org.apache.kafka.common.requests.UpdateMetadataRequest.Broker,它没有方法 createBrokerconnectionString

找到另一个相似的 post 。但它没有说明如何从hostport等代理信息中获取属性。我可能可以为 json 这样的字符串编写一个解析器来提取它们,但我怀疑有更多的 Kafka 本机方法可以做到这一点。有什么建议吗?

编辑: 我意识到 Broker class 来自 kafka.cluster.Broker。仍然没有方法 connectionstring().

您可以使用ZkUtils获取集群中的所有broker信息,如下所示:

ZkUtils zk = ZkUtils.apply("zkHost:2181", 6000, 6000, true);
List<Broker> brokers = JavaConversions.seqAsJavaList(zk.getAllBrokersInCluster());
for (Broker broker : brokers) {  
    //assuming you do not enable security    
    System.out.println(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host());
}
zk.close();