kafka-python consumer giving errors
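I am new to Kafka and kafka-python. After installing kafka-python, I tried the simple consumer example from the usage docs: http://kafka-python.readthedocs.io/en/master/usage.html
I wrote the consumer script in Kafka's bin directory and ran the Python code from there, but I get the following error: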
Traceback (most recent call last):
  File "KafkaConsumer.py", line 4, in <module>
    for message in consumer:
  File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next
    return type(self).next(self)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 915, in next
    return next(self._iterator)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 876, in _message_generator
    for msg in self._fetcher:
  File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next
    return type(self).next(self)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 520, in next
    return next(self._iterator)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 477, in _message_generator
    for msg in self._unpack_message_set(tp, messages):
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 372, in _unpack_message_set
    inner_mset = msg.decompress()
  File "/usr/local/lib/python2.7/dist-packages/kafka/protocol/message.py", line 121, in decompress
    assert has_snappy(), 'Snappy decompression unsupported'
AssertionError: Snappy decompression unsupported
This is the code I have been trying to run:
from kafka import KafkaConsumer

# Subscribe to 'mytopic' through the local broker as part of a consumer group
consumer = KafkaConsumer('mytopic', bootstrap_servers=['localhost:9092'], group_id='test-consumer-group')
print("Consuming messages from the given topic")
# Iterating over the consumer blocks and yields messages as they arrive
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
Since I am new to Kafka, I am having trouble understanding what I am doing wrong.
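It looks like you are missing python-snappy, which is required to read data compressed with snappy.
You need snappy and snappy-devel, which you can install with yum, apt-get, and so on (on Debian/Ubuntu the development package is named libsnappy-dev).
Then try pip install python-snappy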
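To confirm that the install worked before starting the consumer again, a quick import check can help. The following is only a sketch: `missing_modules` is a hypothetical helper name, not part of kafka-python, and it relies on the fact that python-snappy installs a module named `snappy`.

```python
import importlib


def missing_modules(names=("snappy",)):
    """Return the module names from `names` that cannot be imported.

    Hypothetical helper for illustration; not part of kafka-python.
    """
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            # The module is not installed (or its native library failed to load)
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print("Missing modules: %s (try: pip install python-snappy)" % ", ".join(missing))
    else:
        print("snappy is importable; compressed messages should decode")
```

If the check still reports `snappy` as missing after `pip install python-snappy`, the package was probably installed for a different Python interpreter than the one running the consumer.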