Can't produce to or consume from a Kafka broker running inside a container
Setup
I am using the confluent/kafka image from Docker Hub to start ZooKeeper and Kafka instances in two separate containers. The commands I used to start the containers are:
docker run --rm --name zookeeper -p 2181:2181 confluent/zookeeper
docker run --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper confluent/kafka
I now have the two containers, zookeeper and kafka, running.
Note that I have mapped ports 2181 and 9092 of the containers to my host machine's ports. I verified that this mapping works by opening localhost:2181 and localhost:9092 in my browser; both requests cause errors to be printed in the running containers' terminals.
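As a side note, the same reachability check can be done from a terminal instead of a browser; a quick sketch assuming nc (netcat) is installed:

```shell
# Verify the mapped ports are reachable on the host.
# -z: probe without sending data, -v: verbose output
nc -zv localhost 2181   # ZooKeeper
nc -zv localhost 9092   # Kafka broker
```

A successful connection here only proves the port mapping works; it says nothing yet about the hostname the broker advertises to clients.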
I then created a topic by issuing the following command from my host machine:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
This succeeded, and I verified it by listing the topics with:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
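For completeness, the matching console consumer in this Kafka version connects through ZooKeeper rather than the broker directly; something like:

```shell
# Read all messages on the topic from the beginning.
# (Older console consumers take --zookeeper; newer releases use --bootstrap-server.)
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```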
Now the problem:
I am trying to produce some messages to the broker using the following command:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
I get the following exception:
[2017-03-02 20:36:02,376] WARN Failed to send producer request with correlation id 2 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$$anonfun$apply$mcV$sp.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:778)
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:594)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
I have read some threads on the internet suggesting that I update my hosts file. If so, what entries do I have to put into my hosts file?
Some other threads suggest setting the ADVERTISED_HOST entry to the correct IP in the configuration file. Which configuration file? And where do I make that update?
If it is the server.properties file for the Kafka broker, then I did try going into the container created from the confluent/kafka image. It looks like this:
socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=0
log4j.opts=-Dlog4j.configuration\=file\:/etc/kafka/log4j.properties
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper\:2181
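For reference, the ADVERTISED_HOST suggestions are usually applied to images like this via environment variables at container start, rather than by editing server.properties inside the container by hand. A sketch; the exact variable names used here (ADVERTISED_HOST, ADVERTISED_PORT) are assumptions that depend on the image version, so verify them against the image's documentation:

```shell
# Restart the broker container, telling it to advertise an address
# that host-side clients can resolve and reach.
docker run --rm --name kafka -p 9092:9092 \
  --link zookeeper:zookeeper \
  -e ADVERTISED_HOST=localhost \
  -e ADVERTISED_PORT=9092 \
  confluent/kafka
```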
Any suggestions on how to get past this and make it possible to produce to and consume from the Kafka container from my host machine?
Thanks a lot!
I figured it out within seconds of posting this question.
I had to get the HOSTNAME of the container in which the broker was running by issuing:
echo $HOSTNAME
Then I updated the /etc/hosts file on my host machine with loopback entries:
127.0.0.1 KAFKA_CONTAINER_HOSTNAME
127.0.0.1 ZOOKEEPER_CONTAINER_HOSTNAME
The same has to be done for the zookeeper container so that the consumers also work correctly.
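The whole fix can be sketched as a couple of shell commands; `docker exec <container> hostname` is one way to read each container's hostname from outside, so adjust as needed for your setup:

```shell
# The broker registers its container hostname in ZooKeeper, so host-side
# clients must be able to resolve that hostname. Map both container
# hostnames to loopback on the host.
KAFKA_HOST=$(docker exec kafka hostname)
ZK_HOST=$(docker exec zookeeper hostname)

echo "127.0.0.1 $KAFKA_HOST" | sudo tee -a /etc/hosts
echo "127.0.0.1 $ZK_HOST"    | sudo tee -a /etc/hosts
```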
Cheers!