Kafka: Get broker host from ZooKeeper
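For special reasons I need to use both ConsumerGroup (a.k.a. high-level consumer) and SimpleConsumer (a.k.a. low-level consumer) to read from Kafka. For ConsumerGroup I use ZooKeeper-based configuration and am completely satisfied with it, but SimpleConsumer needs seed brokers in order to be instantiated.
I don't want to maintain two lists, one of ZooKeeper hosts and one of broker hosts. So I'm looking for a way to automatically discover the brokers for a particular topic from ZooKeeper.
Based on some indirect evidence, I believe this data is stored in ZooKeeper under one of the following paths:
- /brokers/topics/<topic>/partitions/<partition-id>/state
- /brokers/ids/
However, when I try to read data from these nodes, I get a serialization error (I'm using com.101tec.zkclient for this):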
org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D
    at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)
    at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
    ... 64 elided
Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
    at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.<init>(TcclAwareObjectIputStream.java:30)
    at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31)
    ... 69 more
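I can write and read custom Java objects (e.g. Strings) without any problem, so I believe the problem is not the client but the encoding. So I'd like to know:
- If this is the right approach, how do I read these nodes correctly?
- If the whole approach is wrong, what is the right one?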
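The bytes in the error message already hint at the cause. A minimal sketch (the class name is hypothetical) that decodes the reported stream header: `7B226A6D` is the ASCII text `{"jm`, i.e. the start of a JSON document such as `{"jmx_port":...}`. By contrast, the stack trace shows ZkClient's default `SerializableSerializer` handing the bytes to `ObjectInputStream`, which expects a Java serialization stream beginning with the magic number `0xACED`.

```java
import java.io.ObjectStreamConstants;
import java.nio.charset.StandardCharsets;

public class StreamHeaderCheck {
    // Decodes a hex string such as "7B226A6D" into the ASCII characters it represents.
    static String hexToAscii(String hex) {
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // Header actually found in the znode: the beginning of a JSON object
        System.out.println(hexToAscii("7B226A6D")); // {"jm
        // Header that Java deserialization expects (STREAM_MAGIC is a short)
        System.out.println(Integer.toHexString(ObjectStreamConstants.STREAM_MAGIC & 0xFFFF)); // aced
    }
}
```

In other words, the znodes hold plain UTF-8 text, so they have to be read with a string serializer rather than with Java object deserialization.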
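This is how one of my colleagues gets the list of Kafka brokers. I think it is the right approach when you want to obtain the broker list dynamically.
Here is sample code that shows how to get the list.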
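import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class KafkaBrokerInfoFetcher {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        try {
            // Every live broker registers an ephemeral znode under /brokers/ids
            List<String> ids = zk.getChildren("/brokers/ids", false);
            for (String id : ids) {
                // The znode data is UTF-8 encoded JSON, not a serialized Java object
                byte[] data = zk.getData("/brokers/ids/" + id, false, null);
                String brokerInfo = new String(data, StandardCharsets.UTF_8);
                System.out.println(id + ": " + brokerInfo);
            }
        } finally {
            zk.close();
        }
    }
}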
Running the code on a cluster of three brokers produces:
1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
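Seed brokers are identified by host and port, so each JSON payload above can be reduced to a `host:port` string, which is also the comma-separated form the 0.8.x producer's `metadata.broker.list` expects. A minimal sketch (class and method names are hypothetical), assuming the flat version-1 registration format shown above; it uses simple regular expressions instead of a JSON library, so a real JSON parser is the safer choice if the format may change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SeedBrokerList {
    // Simplified field extraction for flat JSON such as
    // {"jmx_port":-1,...,"host":"192.168.0.11","version":1,"port":9093}.
    // The leading quote in "\"port\"" keeps "jmx_port" from matching.
    private static final Pattern HOST = Pattern.compile("\"host\":\"([^\"]*)\"");
    private static final Pattern PORT = Pattern.compile("\"port\":(\\d+)");

    // Turns one /brokers/ids/<id> payload into "host:port".
    static String toHostPort(String brokerJson) {
        Matcher host = HOST.matcher(brokerJson);
        Matcher port = PORT.matcher(brokerJson);
        if (!host.find() || !port.find()) {
            throw new IllegalArgumentException("Unexpected broker data: " + brokerJson);
        }
        return host.group(1) + ":" + port.group(1);
    }

    // Joins all payloads into a comma-separated seed broker list.
    static String toBrokerList(List<String> brokerJsons) {
        List<String> hostPorts = new ArrayList<>();
        for (String json : brokerJsons) {
            hostPorts.add(toHostPort(json));
        }
        return String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "{\"jmx_port\":-1,\"timestamp\":\"1428512949385\",\"host\":\"192.168.0.11\",\"version\":1,\"port\":9093}",
            "{\"jmx_port\":-1,\"timestamp\":\"1428512955512\",\"host\":\"192.168.0.11\",\"version\":1,\"port\":9094}");
        System.out.println(toBrokerList(sample)); // 192.168.0.11:9093,192.168.0.11:9094
    }
}
```

In practice the input list would come from the `getData` loop above instead of hard-coded sample strings.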
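It turns out that Kafka uses ZKStringSerializer to read and write data to znodes. So, to fix the error, I just had to pass it as the last argument of the ZkClient constructor:
val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)
Using it, I wrote a few useful functions for discovering broker IDs, their addresses, and other things: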
import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException
// Implicit java.util.List -> Scala collection conversions, needed for .toList below
import scala.collection.JavaConversions._

// All functions below use the zkClient created with ZKStringSerializer above

def listBrokers(): List[Int] = {
  zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}

def listTopics(): List[String] = {
  zkClient.getChildren("/brokers/topics").toList
}

def listPartitions(topic: String): List[Int] = {
  val path = "/brokers/topics/" + topic + "/partitions"
  if (zkClient.exists(path)) {
    zkClient.getChildren(path).toList.map(_.toInt)
  } else {
    throw new KafkaException(s"Topic ${topic} doesn't exist")
  }
}

// readZkData was not shown in the original answer; this assumed definition
// parses the znode's JSON the same way getLeaderAddress does
def readZkData(path: String): Map[String, Any] = {
  Json.parseFull(zkClient.readData[String](path)).get.asInstanceOf[Map[String, Any]]
}

def getBrokerAddress(brokerId: Int): (String, Int) = {
  val path = s"/brokers/ids/${brokerId}"
  if (zkClient.exists(path)) {
    val brokerInfo = readZkData(path)
    (brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
  } else {
    throw new KafkaException(s"Broker with ID ${brokerId} doesn't exist")
  }
}

def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
  val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
  if (zkClient.exists(path)) {
    // The state znode is JSON; its "leader" field holds the leader's broker ID
    val leaderStr = zkClient.readData[String](path)
    val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
    getBrokerAddress(leaderId)
  } else {
    throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
  }
}
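// Constructor fragment of a producer wrapper class (Kafka 0.8.x API). The fields
// zookeeperAddress, topic, props, producer and the constant KAFKA_STRING_ENCODER
// are defined elsewhere in the class. Needs java.nio.charset.StandardCharsets,
// org.apache.zookeeper.*, kafka.cluster.Broker, kafka.javaapi.producer.Producer
// and kafka.producer.ProducerConfig.
public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
        KeeperException, InterruptedException {
    this.zookeeperAddress = zookeeperAddress;
    this.topic = topic;
    ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
    List<String> brokerList = new ArrayList<String>();
    try {
        // Each child of /brokers/ids is a live broker; its data is UTF-8 JSON
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null),
                    StandardCharsets.UTF_8);
            // Let Kafka's own Broker class parse the registration JSON
            Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
            if (broker != null) {
                brokerList.add(broker.connectionString());
            }
        }
    } finally {
        zk.close();
    }
    props.put("serializer.class", KAFKA_STRING_ENCODER);
    // Comma-separated host:port list built from the discovered brokers
    props.put("metadata.broker.list", String.join(",", brokerList));
    producer = new Producer<String, String>(new ProducerConfig(props));
}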
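Actually, Kafka has ZkUtils (at least in the 0.8.x line) that you can use, with one small caveat: you need to re-implement ZkStringSerializer so that it converts strings to UTF-8 encoded byte arrays. If you want to use the Java 8 Stream API, you can iterate over the returned Scala collections via scala.collection.JavaConversions. This worked in my case.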
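A minimal Java sketch of such a re-implementation, assuming, as described above, that the serializer only has to turn strings into UTF-8 byte arrays and back. The class name is hypothetical. Its two methods have the same signatures as zkclient's `org.I0Itec.zkclient.serialize.ZkSerializer` interface, so adding `implements ZkSerializer` lets you pass an instance as the last `ZkClient` constructor argument; that line is left out here only so the sketch compiles without the zkclient jar.

```java
import java.nio.charset.StandardCharsets;

// Sketch of a UTF-8 string serializer in the spirit of Kafka's ZKStringSerializer.
// With zkclient on the classpath, declare it as
// "implements org.I0Itec.zkclient.serialize.ZkSerializer".
public class Utf8ZkStringSerializer {

    // Kafka stores znode payloads as UTF-8 text, not as Java-serialized objects
    public byte[] serialize(Object data) {
        return ((String) data).getBytes(StandardCharsets.UTF_8);
    }

    // A znode without data comes back as null; keep it null instead of failing
    public Object deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Usage, once the `implements` clause is added: `new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, new Utf8ZkStringSerializer())`, mirroring the Scala constructor call above.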
To do this with the shell:
zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
=> [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0