Kafka 生产者无法从代理获取元数据

Kafka producer fails fetching metadata from broker

我在三个独立的节点上托管 Kafka,每个节点都有本地 Zookeeper 实例。在生成主题 live 时,出现以下异常。 metadata.brokers.list 不是设置为 localhost,而是 kafka1.domain.comkafka2.domain.comkafka3.domain.comadvertised.host.name 设置为亚马逊实例的本地 IP 地址。 Kafka 设置中没有对 localhost 的引用。

这是生产者配置:

props.put("metadata.broker.list", "kafka1.domain.com:9092,kafka2.domain.com:9092,kafka3.domain.com:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")
props.put("batch.num.messages", "1000")
props.put("queue.buffering.max.ms", "20000")
props.put("request.required.acks", "0")

每个 producer.send 都会产生异常。

有人知道哪里出了问题吗?

WARN ClientUtils$: Fetching topic metadata with correlation id 5 for topics [Set(live)] from broker [id:0,host:localhost,port:9092] failed
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate.apply(DefaultEventHandler.scala:150)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate.apply(DefaultEventHandler.scala:149)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:94)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
ERROR DefaultEventHandler: Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(live)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
WARN ClientUtils$: Fetching topic metadata with correlation id 6 for topics [Set(live)] from broker [id:0,host:localhost,port:9092] failed
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:94)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
ERROR Utils$: fetching topic metadata for topics [Set(live)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
kafka.common.KafkaException: fetching topic metadata for topics [Set(live)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:172)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:94)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 9 more

我在 kafka:0.8.2.1 中使用 kafka.producer.Producer。显然它被窃听了,通过在 kafka-clients:0.8.2.1 中选择 org.apache.kafka.clients.producer.KafkaProducer,它在相同的配置上立即运行,没有任何问题。