Can't Consume JSON Messages From Kafka Using Kafka-Python's Deserializer
I am trying to send a very simple JSON object through Kafka and read it out on the other side using Python and kafka-python. However, I keep seeing the following error:
2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
I did some research, and the most common cause of this error is malformed JSON. I tried printing the JSON before sending it, by adding the following to my code, and it prints without any errors:
while True:
    json_obj1 = json.dumps({"dataObjectID": "test1"})
    print json_obj1
    producer.send('my-topic', {"dataObjectID": "test1"})
    producer.send('my-topic', {"dataObjectID": "test2"})
    time.sleep(1)
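In fact, a quick round-trip check (a minimal sketch, using the same payload as above) shows the serialized bytes parse back cleanly:

    import json

    # Serialize the same way the producer's value_serializer does
    payload = json.dumps({"dataObjectID": "test1"}).encode('utf-8')

    # Deserializing the raw bytes succeeds, so the JSON itself is valid
    print json.loads(payload.decode('utf-8'))  # {u'dataObjectID': u'test1'}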
That leads me to suspect that I can produce the JSON, but not consume it.
Here is my code:
import threading
import logging
import time
import json

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            time.sleep(1)


class Consumer(threading.Thread):
    daemon = True

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])

        for message in consumer:
            print (message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(10)


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()
If I remove the value_serializer and value_deserializer, I can send and receive strings successfully. When I run that code I can see the JSON that I am sending. Here is a short snippet:
ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
So I tried removing the value_deserializer from the consumer. That code runs, but without the deserializer the messages come through as strings, which is not what I need. So why doesn't the value_deserializer work? Is there a different way to get JSON out of the Kafka messages that I should be using?
It turns out the problem was the .decode('utf-8') part of value_deserializer=lambda m: json.loads(m).decode('utf-8'). When I changed it to value_deserializer=lambda m: json.loads(m), I saw that the type of the object read back from Kafka was now a dictionary. Which is correct, according to the following information from Python's JSON documentation:
| JSON          | Python    |
|---------------|-----------|
| object        | dict      |
| array         | list      |
| string        | unicode   |
| number (int)  | int, long |
| number (real) | float     |
| true          | True      |
| false         | False     |
| null          | None      |
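As a quick illustration of that mapping (a minimal sketch using the same payload as the producer):

    import json

    # A JSON object becomes a Python dict
    obj = json.loads('{"dataObjectID": "test1"}')
    print type(obj)            # <type 'dict'> on Python 2
    print obj["dataObjectID"]  # test1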
Decoding the message to utf-8 first, and only then json.loads-ing it, solved my problem:

    value_deserializer=lambda m: json.loads(m.decode('utf-8'))

instead of:

    value_deserializer=lambda m: json.loads(m).decode('utf-8')
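For completeness, here is a minimal sketch of the consumer with the fixed deserializer (same broker, topic, and payload shape assumed as above):

    from kafka import KafkaConsumer
    import json

    # Decode the raw bytes to text first, then parse the JSON into a dict
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    consumer.subscribe(['my-topic'])

    for message in consumer:
        print message.value["dataObjectID"]  # message.value is now a dict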
Hopefully this is useful on the producer side as well.
You don't need a lambda...

Instead of

    value_deserializer=lambda m: json.loads(m)

you should use

    value_deserializer=json.loads
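One caveat (an assumption about which interpreter is in use): passing the raw bytes straight to json.loads works on Python 2 (where str is bytes) and on Python 3.6+, but on Python 3.0-3.5 json.loads requires a str, so the decode-first lambda is the portable choice:

    import json

    raw = b'{"dataObjectID": "test1"}'

    # Python 2 and Python 3.6+: bytes are accepted directly
    obj = json.loads(raw)

    # Portable across Python 3 versions: decode explicitly first
    obj = json.loads(raw.decode('utf-8'))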