Kafka-python: How to consume JSON messages
I'm new to Python and just getting started with Kafka. I need to send and consume JSON messages, and I'm using kafka-python to communicate with Kafka.
#Producer.py
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('offering_new', {"dataObjectID": "test1"})
#Consumer.py
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['offering_new'])
for message in consumer:
    print(message)
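A quick way to rule out the code itself is to check, without any broker, that the serializer and deserializer above are inverses of each other. A minimal sketch: `serialize_value` and `deserialize_value` are illustrative names for the same logic as the two lambdas, and nothing here contacts Kafka.

```python
import json

# Same logic as the value_serializer / value_deserializer lambdas in the question
def serialize_value(v):
    return json.dumps(v).encode('utf-8')

def deserialize_value(m):
    return json.loads(m.decode('utf-8'))

payload = {"dataObjectID": "test1"}
raw = serialize_value(payload)  # the bytes the producer writes to the topic
print(raw)  # b'{"dataObjectID": "test1"}'
print(deserialize_value(raw) == payload)  # True
```

If this round trip succeeds, the exception most likely comes from other bytes already stored in the topic rather than from the message sent above.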
However, I get the following exception on the consumer side:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1111, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in _message_generator
    for msg in self._fetcher:
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 482, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 388, in _message_generator
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 799, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 458, in _unpack_message_set
    tp.topic, record.value)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 492, in _deserialize
    return f(bytes_)
  File "<stdin>", line 1, in <lambda>
  File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
I'm running the code above in the Python shell. Can anyone tell me what I'm doing wrong?
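With auto_offset_reset='earliest', you have configured your consumer to read every message in the topic. Because no group_id is set, the consumer never commits offsets, so it starts again from the earliest retained message each time it runs. The JSON decode error indicates that some messages produced to the topic earlier are not actually valid JSON.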
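The error text points the same way: "Expecting value: line 1 column 1 (char 0)" means decoding failed on the very first character, which is what an empty value or plain text produces. A small local sketch; `describe_decode` is a hypothetical helper and the sample byte strings are made up for illustration.

```python
import json

def describe_decode(raw: bytes) -> str:
    """Try to decode raw record bytes as JSON and report the outcome."""
    try:
        return f"ok: {json.loads(raw.decode('utf-8'))!r}"
    except json.JSONDecodeError as exc:
        return f"error: {exc}"

for raw in [b'{"dataObjectID": "test1"}', b'', b'hello']:
    print(describe_decode(raw))
# ok: {'dataObjectID': 'test1'}
# error: Expecting value: line 1 column 1 (char 0)
# error: Expecting value: line 1 column 1 (char 0)
```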
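Some possible fixes:
(1) Start consuming from the end of the topic: auto_offset_reset='latest'
(2) Switch to a new topic (the producer must send to it as well): consumer.subscribe(['offering_new_too'])
(3) Use a more forgiving deserializer: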
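import json
import logging

log = logging.getLogger(__name__)

def forgiving_json_deserializer(v):
    # Records with a null value (e.g. tombstones) are passed in as None
    if v is None:
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except (UnicodeDecodeError, json.decoder.JSONDecodeError):
        # Log the undecodable bytes and hand None to the consuming loop
        log.exception('Unable to decode: %s', v)
        return None

KafkaConsumer(value_deserializer=forgiving_json_deserializer, ...)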
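With the forgiving deserializer, undecodable records reach the loop with message.value set to None, so the consuming code should skip them. A minimal sketch, assuming only the .value attribute of each record is needed: `json_values` is a hypothetical helper, and SimpleNamespace objects stand in for the records a real KafkaConsumer yields.

```python
import json
import logging
from types import SimpleNamespace

log = logging.getLogger(__name__)

def forgiving_json_deserializer(v):
    if v is None:
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        log.exception('Unable to decode: %s', v)
        return None

def json_values(messages):
    """Yield decoded values, skipping records the deserializer turned into None."""
    for message in messages:
        if message.value is not None:
            yield message.value

# Stand-ins for consumer records whose values already went through the deserializer
raw_values = [b'{"dataObjectID": "test1"}', b'not json', None]
fake_records = [SimpleNamespace(value=forgiving_json_deserializer(r)) for r in raw_values]
print(list(json_values(fake_records)))  # [{'dataObjectID': 'test1'}]
```

Note that a message whose payload is the JSON literal null also decodes to None and is skipped by this filter.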
Hope this helps!
Make sure you have followed the steps below:
pip install kafka-python
Import the following in your Python script:
from kafka import KafkaConsumer
from json import loads
Create the Kafka consumer object as shown below:
consumer = KafkaConsumer(
    'spring_test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
Print all messages:
for message in consumer:
    message = message.value
    print('{}'.format(message))
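Unlike print(message) in the question, this loop prints only message.value, the already-deserialized dict. The records kafka-python yields also carry fields such as topic, partition and offset, which help locate a message that fails to decode. A sketch of a hypothetical `format_record` helper; the namedtuple is a simplified stand-in holding only a subset of the real record's fields.

```python
from collections import namedtuple

# Simplified stand-in for kafka-python's ConsumerRecord (real records have more fields)
Record = namedtuple('Record', ['topic', 'partition', 'offset', 'key', 'value'])

def format_record(record):
    """Render a record's location plus its deserialized value on one line."""
    return f"{record.topic}[{record.partition}]@{record.offset}: {record.value}"

rec = Record('spring_test', 0, 42, None, {'dataObjectID': 'test1'})
print(format_record(rec))  # spring_test[0]@42: {'dataObjectID': 'test1'}
```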
Hope this helps too.