获取从 python KafkaProducer 发送的消息
Getting messages sent from python KafkaProducer
我的目标是从非文件源获取数据(即在程序中生成或通过 API 发送)并将其发送到火花流。为此,我通过 python-based KafkaProducer
:
发送数据
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
$ python
Python 3.6.1| Anaconda custom (64-bit)
> from kafka import KafkaProducer
> import time
> producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
> producer.send(topic = 'my-topic', value = 'MESSAGE ACKNOWLEDGED', timestamp_ms = time.time())
> producer.close()
> exit()
我的问题是从消费者 shell 脚本检查主题时没有任何显示:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic my-topic
^C$
这里有什么遗漏或错误吗?我是 spark/kafka/messaging 系统的新手,所以任何事情都会有所帮助。 Kafka 版本为 0.11.0.0 (Scala 2.11),配置文件没有任何变化。
如果您在向主题发送消息后启动消费者,消费者可能会跳过该消息,因为它会设置主题偏移量(这可以被视为 "starting point" 以供读取)话题到此结束。要更改该行为,请尝试添加 --from-beginning
选项:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
也可以试试kafkacat
,比Kafka的console consumer and producer(恕我直言)方便多了。使用 kafkacat
从 Kafka 读取消息可以使用以下命令执行:
kafkacat -C -b 'localhost:9092' -o beginning -e -D '\n' -t 'my-topic'
希望对您有所帮助。
我发现了问题,value_serializer
悄无声息地崩溃了,因为我没有将 json 模块导入解释器。有两种解决方案,一种是简单地导入模块,然后您将得到 "MESSAGE ACKNOWLEDGED"
(带引号)。或者您可以完全删除 value_serializer
并将下一行中发送的 value
字符串转换为字节字符串(即 b'MESSAGE ACKNOWLEDGED'
for Python 3),这样您就可以得到不带引号的消息返回。
我还将 Kafka 切换到版本 0.10.2.1 (Scala 2.11),因为 Kafka-python 文档中没有确认它与版本 0.11.0.0
兼容
我的目标是从非文件源获取数据(即在程序中生成或通过 API 发送)并将其发送到火花流。为此,我通过 python-based KafkaProducer
:
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
$ python
Python 3.6.1| Anaconda custom (64-bit)
> from kafka import KafkaProducer
> import time
> producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
> producer.send(topic = 'my-topic', value = 'MESSAGE ACKNOWLEDGED', timestamp_ms = time.time())
> producer.close()
> exit()
我的问题是从消费者 shell 脚本检查主题时没有任何显示:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic my-topic
^C$
这里有什么遗漏或错误吗?我是 spark/kafka/messaging 系统的新手,所以任何事情都会有所帮助。 Kafka 版本为 0.11.0.0 (Scala 2.11),配置文件没有任何变化。
如果您在向主题发送消息后启动消费者,消费者可能会跳过该消息,因为它会设置主题偏移量(这可以被视为 "starting point" 以供读取)话题到此结束。要更改该行为,请尝试添加 --from-beginning
选项:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
也可以试试kafkacat
,比Kafka的console consumer and producer(恕我直言)方便多了。使用 kafkacat
从 Kafka 读取消息可以使用以下命令执行:
kafkacat -C -b 'localhost:9092' -o beginning -e -D '\n' -t 'my-topic'
希望对您有所帮助。
我发现了问题,value_serializer
悄无声息地崩溃了,因为我没有将 json 模块导入解释器。有两种解决方案,一种是简单地导入模块,然后您将得到 "MESSAGE ACKNOWLEDGED"
(带引号)。或者您可以完全删除 value_serializer
并将下一行中发送的 value
字符串转换为字节字符串(即 b'MESSAGE ACKNOWLEDGED'
for Python 3),这样您就可以得到不带引号的消息返回。
我还将 Kafka 切换到版本 0.10.2.1 (Scala 2.11),因为 Kafka-python 文档中没有确认它与版本 0.11.0.0
兼容