创建主题,但在 Kubernetes 上使用 Python 获取 Kafka FailedPayloadsError
Creating topics, but getting Kafka FailedPayloadsError with Python on Kubernetes
我在 python kafka 库中使用 SimpleProducer。
该脚本之前与我尝试过的其他更硬配置的 kafka 设置一起完美运行。
kafka = KafkaClient(u'[masterNodeIp]:[servicePort]')
producer = SimpleProducer(kafka)
#make a simple message, while true run
producer.send_messages(b'oneMoreTopic', sentence)
在 运行 运行此脚本一次后,我在 python-console 中收到此响应。
kafka.common.LeaderNotAvailableError: TopicMetadata(topic='oneMoreTopic', error=5, partitions=[])
然后我可以在 zookeeper.log 上进入我的节点并查看:
2015-09-14 12:16:32,276 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:setData cxid:0x71 zxid:0x1000000d8 txntype:-1 reqpath:n/a Error Path:/config/topics/oneMoreTopic Error:KeeperErrorCode = NoNode for /config/topics/oneMoreTopic
2015-09-14 12:16:32,278 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x72 zxid:0x1000000d9 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
2015-09-14 12:16:32,302 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x7b zxid:0x1000000dc txntype:-1 reqpath:n/a Error Path:/brokers/topics/oneMoreTopic/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/oneMoreTopic/partitions/0
2015-09-14 12:16:32,304 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x7c zxid:0x1000000dd txntype:-1 reqpath:n/a Error Path:/brokers/topics/oneMoreTopic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/oneMoreTopic/partitions
这似乎只是 Zookeeper 为该主题创建了一个新的 Znode,因为它以前不存在。 Kafka server.log 打印:
[2015-09-14 12:16:32,282] INFO Topic creation {"version":1,"partitions":{"0":[10200119]}} (kafka.admin.AdminUtils$)
[2015-09-14 12:16:32,287] INFO [KafkaApi-10200219] Auto creation of topic oneMoreTopic with 1 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2015-09-14 12:16:51,579] INFO Closing socket connection to /10.240.1.94. (kafka.network.Processor)
但是,我的消息从未发布到主题,下次我 运行 我总是得到 python-脚本:
kafka.common.FailedPayloadsError
在我让它工作的情况下,advertised.host.name 始终是节点的外部 IP,但我似乎无法通过 Kubernetes 使其工作。是否可以从容器中调用外部 IP?
我的 kafka/config/server.properties 对于所有经纪人来说都是这样的:
broker.id=10200121
host.name=kafka-f8p06
advertised.host.name=kafka-f8p06
++
broker.id=10200121
host.name=kafka-f8p06 <----- use IP here
advertised.host.name=kafka-f8p06 <---- use IP here
我认为您应该拥有 host.name
和 advertised.host.name
的 IP,因为 K8s 不会通过主机名解析 Pods,而是通过 IP 解析。
您的 kafka 节点可能无法通过这种方式相互通信,也找不到领导者。
我的问题的根本原因已解决,方法是确保我的所有节点都可以访问互联网
sudo iptables -t nat -A POSTROUTING ! -d 10.0.0.0/8 -o ens4v1 -j MASQUERADE
在此之后,我将 advertised.host.name 更改为我试图通过其联系经纪人的外部 IP 地址。因此,如果我的 GCE 节点位于 156.99.33.101,并且在 33777 上为我的 kafka 服务打开了一个节点端口,我会输入:
advertised.host.name=156.99.33.101
advertised.host.port=33777
advertised.host.name 是 kafka 在其中一个代理收到请求时用来连接回自身的内容,因此使用外部地址至少可以使其可达。虽然我不确定将它指向本地地址之外是否有任何后果-space.
我在 python kafka 库中使用 SimpleProducer。 该脚本之前与我尝试过的其他更硬配置的 kafka 设置一起完美运行。
kafka = KafkaClient(u'[masterNodeIp]:[servicePort]')
producer = SimpleProducer(kafka)
#make a simple message, while true run
producer.send_messages(b'oneMoreTopic', sentence)
在 运行 运行此脚本一次后,我在 python-console 中收到此响应。
kafka.common.LeaderNotAvailableError: TopicMetadata(topic='oneMoreTopic', error=5, partitions=[])
然后我可以在 zookeeper.log 上进入我的节点并查看:
2015-09-14 12:16:32,276 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:setData cxid:0x71 zxid:0x1000000d8 txntype:-1 reqpath:n/a Error Path:/config/topics/oneMoreTopic Error:KeeperErrorCode = NoNode for /config/topics/oneMoreTopic
2015-09-14 12:16:32,278 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x72 zxid:0x1000000d9 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
2015-09-14 12:16:32,302 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x7b zxid:0x1000000dc txntype:-1 reqpath:n/a Error Path:/brokers/topics/oneMoreTopic/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/oneMoreTopic/partitions/0
2015-09-14 12:16:32,304 - INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x7c zxid:0x1000000dd txntype:-1 reqpath:n/a Error Path:/brokers/topics/oneMoreTopic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/oneMoreTopic/partitions
这似乎只是 Zookeeper 为该主题创建了一个新的 Znode,因为它以前不存在。 Kafka server.log 打印:
[2015-09-14 12:16:32,282] INFO Topic creation {"version":1,"partitions":{"0":[10200119]}} (kafka.admin.AdminUtils$)
[2015-09-14 12:16:32,287] INFO [KafkaApi-10200219] Auto creation of topic oneMoreTopic with 1 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2015-09-14 12:16:51,579] INFO Closing socket connection to /10.240.1.94. (kafka.network.Processor)
但是,我的消息从未发布到主题,下次我 运行 我总是得到 python-脚本:
kafka.common.FailedPayloadsError
在我让它工作的情况下,advertised.host.name 始终是节点的外部 IP,但我似乎无法通过 Kubernetes 使其工作。是否可以从容器中调用外部 IP?
我的 kafka/config/server.properties 对于所有经纪人来说都是这样的:
broker.id=10200121
host.name=kafka-f8p06
advertised.host.name=kafka-f8p06
++
broker.id=10200121
host.name=kafka-f8p06 <----- use IP here
advertised.host.name=kafka-f8p06 <---- use IP here
我认为您应该拥有 host.name
和 advertised.host.name
的 IP,因为 K8s 不会通过主机名解析 Pods,而是通过 IP 解析。
您的 kafka 节点可能无法通过这种方式相互通信,也找不到领导者。
我的问题的根本原因已解决,方法是确保我的所有节点都可以访问互联网
sudo iptables -t nat -A POSTROUTING ! -d 10.0.0.0/8 -o ens4v1 -j MASQUERADE
在此之后,我将 advertised.host.name 更改为我试图通过其联系经纪人的外部 IP 地址。因此,如果我的 GCE 节点位于 156.99.33.101,并且在 33777 上为我的 kafka 服务打开了一个节点端口,我会输入:
advertised.host.name=156.99.33.101
advertised.host.port=33777
advertised.host.name 是 kafka 在其中一个代理收到请求时用来连接回自身的内容,因此使用外部地址至少可以使其可达。虽然我不确定将它指向本地地址之外是否有任何后果-space.