通过 GCP 上的 python 脚本访问 Kafka 生产者服务器
Access Kafka producer server through python script on GCP
我在 Google Cloud Platform 集群上成功连接了 Kafka 生产者和消费者:
$ cd /usr/lib/kafka
$ bin/kafka-console-producer.sh config/server.properties --broker-list \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test
并在新的 shell
中执行
$ cd /usr/lib/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test \
--from-beginning
现在,我想使用以下 python 脚本向 Kafka 生产者服务器发送消息:
from kafka import *
topic = 'test'
producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092',
api_version=(0,10))
producer.send(topic, b"Test test test")
但是,这会导致 KafkaTimeoutError
:
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
网上查了一下让我考虑一下:
- 取消注释
/usr/lib/kafka/config/server.properties
文件中的 listeners=...
和 advertised.listeners=...
。
但是,listeners=PLAINTEXT://:9092
不起作用,post建议设置PLAINTEXT://<external-ip>:9092
。
所以,我开始想通过 GCP 集群的外部(静态)IP 地址访问 Kafka 服务器。然后,我们设置了一个防火墙规则来访问端口(?)并允许 https 访问集群。但我不确定这是否是问题的矫枉过正。
我确实需要一些指导才能从 python 脚本成功连接到 Kafka 服务器。
您需要将 advertised.listeners
设置为您的客户端连接的地址。
更多信息:https://rmoff.net/2018/08/02/kafka-listeners-explained/
谢谢罗宾!您发布的 link 对找到以下工作配置非常有帮助。
尽管 SimpleProducer
似乎是一种已弃用的方法,但以下设置最终对我有用:
Python 脚本:
from kafka import *
topic = 'test'
kafka = KafkaClient('[project-name]-w-0.c.[cluster-id].internal:9092')
producer = SimpleProducer(kafka)
message = "Test"
producer.send_messages(topic, message.encode('utf-8'))
并在 /usr/lib/kafka/config/server.properties
文件中取消注释:
listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092
advertised.listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092
我在 Google Cloud Platform 集群上成功连接了 Kafka 生产者和消费者:
$ cd /usr/lib/kafka
$ bin/kafka-console-producer.sh config/server.properties --broker-list \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test
并在新的 shell
中执行$ cd /usr/lib/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test \
--from-beginning
现在,我想使用以下 python 脚本向 Kafka 生产者服务器发送消息:
from kafka import *
topic = 'test'
producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092',
api_version=(0,10))
producer.send(topic, b"Test test test")
但是,这会导致 KafkaTimeoutError
:
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
网上查了一下让我考虑一下:
- 取消注释
/usr/lib/kafka/config/server.properties
文件中的listeners=...
和advertised.listeners=...
。
但是,listeners=PLAINTEXT://:9092
不起作用,PLAINTEXT://<external-ip>:9092
。
所以,我开始想通过 GCP 集群的外部(静态)IP 地址访问 Kafka 服务器。然后,我们设置了一个防火墙规则来访问端口(?)并允许 https 访问集群。但我不确定这是否是问题的矫枉过正。
我确实需要一些指导才能从 python 脚本成功连接到 Kafka 服务器。
您需要将 advertised.listeners
设置为您的客户端连接的地址。
更多信息:https://rmoff.net/2018/08/02/kafka-listeners-explained/
谢谢罗宾!您发布的 link 对找到以下工作配置非常有帮助。
尽管 SimpleProducer
似乎是一种已弃用的方法,但以下设置最终对我有用:
Python 脚本:
from kafka import *
topic = 'test'
kafka = KafkaClient('[project-name]-w-0.c.[cluster-id].internal:9092')
producer = SimpleProducer(kafka)
message = "Test"
producer.send_messages(topic, message.encode('utf-8'))
并在 /usr/lib/kafka/config/server.properties
文件中取消注释:
listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092
advertised.listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092