TypeError: Object of type 'mappingproxy' is not JSON serializable
I am trying to publish Avro messages with confluent-kafka-python's AvroProducer, using the Schema Registry. However, the code fails to serialize the enum type. The code and the error trace are below. Any help is much appreciated.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from example_schema.schema_classes import SCHEMA as value_schema
from example_schema.com.acme import *
import json
def function():
    avroProducer = AvroProducer(
        {'bootstrap.servers': 'localhost:9092',
         'schema.registry.url': 'http://localhost:8081'},
        default_value_schema=value_schema)
    print(avroProducer)
    obj = Test()
    obj.name = 'vinay'
    obj.age = 11
    obj.sex = 'm'
    obj.myenum = Suit.CLUBS
    print(str(obj))
    avroProducer.produce(topic='test_topic', value=obj)
    avroProducer.flush()
function()
File "main.py", line 16, in function
avroProducer.produce(topic='test_topic',value=json.dumps(obj))
File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 105, in encode_record_with_schema
schema_id = self.registry_client.register(subject, schema)
File "/home/priv/anaconda3/lib/python3.6/site-packages/confluent_kafka/avro/cached_schema_registry_client.py", line 216, in register
body = {'schema': json.dumps(avro_schema.to_json())}
File "/home/priv/anaconda3/lib/python3.6/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/home/priv/anaconda3/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/home/priv/anaconda3/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/home/priv/anaconda3/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'mappingproxy' is not JSON serializable
Avro schema:
{
    "type": "record",
    "name": "Test",
    "namespace": "com.acme",
    "fields": [{
        "name": "name",
        "type": "string"
    }, {
        "name": "age",
        "type": "int"
    }, {
        "name": "sex",
        "type": "string"
    }, {
        "name": "myenum",
        "type": ["null", {
            "type": "enum",
            "name": "Suit",
            "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
        }]
    }]
}
Since you are using AvroProducer, don't json.dumps anything.
If you look at the producer example, the object being sent is a dict, not a JSON string:
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro-cli.py
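A minimal sketch of that pattern, assuming the schema above is saved as test.avsc and the same local broker and registry addresses (the file path and topic name are assumptions):

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Load the Avro schema from its .avsc file (the path is an assumption).
value_schema = avro.load('test.avsc')

avroProducer = AvroProducer(
    {'bootstrap.servers': 'localhost:9092',
     'schema.registry.url': 'http://localhost:8081'},
    default_value_schema=value_schema)

# Pass a plain dict; the serializer encodes it against the schema.
# For the enum field, pass the symbol name as a string.
avroProducer.produce(topic='test_topic', value={
    'name': 'vinay',
    'age': 11,
    'sex': 'm',
    'myenum': 'CLUBS',
})
avroProducer.flush()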
According to this link, the confluent-kafka Python API has a compatibility issue with avro-python3 1.9.0. The fix that worked for me was downgrading avro-python3 from 1.9.0 to 1.8.2 (pin shown below).
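If you go the downgrade route, the pin is a standard pip version constraint (assuming pip manages your environment):

pip install avro-python3==1.8.2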
If downgrading avro-python3 from 1.9.0 to 1.8.2 is not an option, you may have to drop the enum type and replace it with a plain string instead:
"type": ["null", {
"type": "enum",
"name": "Suit",
"symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}
]
变成
"type": ["null", "string"]