如何在 Python 中解码 Avro 消息?
How do I decode an Avro message in Python?
我在 Python (3.6.11) 中解码 Avro 消息时遇到问题。我已经尝试了 avro 和 fastavro 两个包,但都报错,所以我认为问题可能在于我提供的字节不正确。
使用avro:
# Question example: attempt to decode one record with the `avro` package.
from avro.io import DatumReader, BinaryDecoder
import avro.schema
from io import BytesIO
# Schema of the record: a required string plus two union-typed fields.
schema = avro.schema.parse("""
{
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": [
"int",
"null"
]
},
{
"name": "favorite_color",
"type": [
"string",
"null"
]
}
]
}
""")
# NOTE: these bytes are JSON text, not an Avro binary encoding of the record.
rb = BytesIO(b'{"name": "Alyssa", "favorite_number": 256}')
decoder = BinaryDecoder(rb)
reader = DatumReader(schema)
msg = reader.read(decoder)  # raises the TypeError shown in the traceback below
print(msg)
Traceback (most recent call last):
File "main.py", line 36, in <module>
msg = reader.read(decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 626, in read
return self.read_data(self.writers_schema, self.readers_schema, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 698, in read_data
return self.read_record(writers_schema, readers_schema, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 898, in read_record
field_val = self.read_data(field.type, readers_field.type, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 638, in read_data
return self.read_union(writers_schema, readers_schema, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 854, in read_union
index_of_schema = int(decoder.read_long())
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 240, in read_long
b = ord(self.read(1))
TypeError: ord() expected a character, but string of length 0 found
使用fastavro:
# Question example: attempt to decode the same record with `fastavro`.
from fastavro import schemaless_reader, parse_schema
from io import BytesIO
# Same User schema as in the avro example, given as a dict.
schema = parse_schema(
{
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": [
"int",
"null"
]
},
{
"name": "favorite_color",
"type": [
"string",
"null"
]
}
]
}
)
# NOTE: these bytes are JSON text, not an Avro binary encoding of the record.
rb = BytesIO(b'{"name": "Alyssa", "favorite_number": 256}')
msg = schemaless_reader(rb, schema)  # raises StopIteration (see traceback below)
print(msg)
Traceback (most recent call last):
File "main.py", line 33, in <module>
msg = schemaless_reader(rb, schema)
File "fastavro/_read.pyx", line 969, in fastavro._read.schemaless_reader
File "fastavro/_read.pyx", line 981, in fastavro._read.schemaless_reader
File "fastavro/_read.pyx", line 652, in fastavro._read._read_data
File "fastavro/_read.pyx", line 510, in fastavro._read.read_record
File "fastavro/_read.pyx", line 644, in fastavro._read._read_data
File "fastavro/_read.pyx", line 429, in fastavro._read.read_union
File "fastavro/_read.pyx", line 200, in fastavro._read.read_long
StopIteration
我不知道我正在编码的消息是否格式错误,或者问题是否与编码本身有关。有什么建议吗?
我将以 fastavro 为例来说明,因为这是我最熟悉的库。
您的 rb 变量应该包含您要读取的 Avro 二进制编码数据(而不是 JSON 文本形式的原始数据)。要获得这种二进制数据的示例,您可以这样写:
# schemaless_writer is not imported by the question's snippet (which only
# imports schemaless_reader and parse_schema), so bring it into scope here.
from fastavro import schemaless_writer

# Serialize the record with the already-parsed `schema` to obtain real Avro
# binary bytes; `schema` and `BytesIO` come from the question's fastavro snippet.
rb = BytesIO()
schemaless_writer(rb, schema, {"name": "Alyssa", "favorite_number": 256})
rb.getvalue() # b'\x0cAlyssa\x00\x80\x04\x02'
然后您就可以按原来的方式读取生成的二进制数据:
# Decode the Avro binary bytes shown by rb.getvalue() in the previous snippet.
rb = BytesIO(b'\x0cAlyssa\x00\x80\x04\x02')
data = schemaless_reader(rb, schema)
# Result: {'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}
我在 Python (3.6.11) 中解码 Avro 消息时遇到问题。我已经尝试了 avro 和 fastavro 两个包,但都报错,所以我认为问题可能在于我提供的字节不正确。
使用avro:
# Question example: attempt to decode one record with the `avro` package.
from avro.io import DatumReader, BinaryDecoder
import avro.schema
from io import BytesIO
# Schema of the record: a required string plus two union-typed fields.
schema = avro.schema.parse("""
{
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": [
"int",
"null"
]
},
{
"name": "favorite_color",
"type": [
"string",
"null"
]
}
]
}
""")
# NOTE: these bytes are JSON text, not an Avro binary encoding of the record.
rb = BytesIO(b'{"name": "Alyssa", "favorite_number": 256}')
decoder = BinaryDecoder(rb)
reader = DatumReader(schema)
msg = reader.read(decoder)  # raises the TypeError shown in the traceback below
print(msg)
Traceback (most recent call last):
File "main.py", line 36, in <module>
msg = reader.read(decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 626, in read
return self.read_data(self.writers_schema, self.readers_schema, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 698, in read_data
return self.read_record(writers_schema, readers_schema, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 898, in read_record
field_val = self.read_data(field.type, readers_field.type, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 638, in read_data
return self.read_union(writers_schema, readers_schema, decoder)
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 854, in read_union
index_of_schema = int(decoder.read_long())
File "/opt/virtualenvs/python3/lib/python3.8/site-packages/avro/io.py", line 240, in read_long
b = ord(self.read(1))
TypeError: ord() expected a character, but string of length 0 found
使用fastavro:
# Question example: attempt to decode the same record with `fastavro`.
from fastavro import schemaless_reader, parse_schema
from io import BytesIO
# Same User schema as in the avro example, given as a dict.
schema = parse_schema(
{
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": [
"int",
"null"
]
},
{
"name": "favorite_color",
"type": [
"string",
"null"
]
}
]
}
)
# NOTE: these bytes are JSON text, not an Avro binary encoding of the record.
rb = BytesIO(b'{"name": "Alyssa", "favorite_number": 256}')
msg = schemaless_reader(rb, schema)  # raises StopIteration (see traceback below)
print(msg)
Traceback (most recent call last):
File "main.py", line 33, in <module>
msg = schemaless_reader(rb, schema)
File "fastavro/_read.pyx", line 969, in fastavro._read.schemaless_reader
File "fastavro/_read.pyx", line 981, in fastavro._read.schemaless_reader
File "fastavro/_read.pyx", line 652, in fastavro._read._read_data
File "fastavro/_read.pyx", line 510, in fastavro._read.read_record
File "fastavro/_read.pyx", line 644, in fastavro._read._read_data
File "fastavro/_read.pyx", line 429, in fastavro._read.read_union
File "fastavro/_read.pyx", line 200, in fastavro._read.read_long
StopIteration
我不知道我正在编码的消息是否格式错误,或者问题是否与编码本身有关。有什么建议吗?
我将以 fastavro 为例来说明,因为这是我最熟悉的库。
您的 rb 变量应该包含您要读取的 Avro 二进制编码数据(而不是 JSON 文本形式的原始数据)。要获得这种二进制数据的示例,您可以这样写:
# schemaless_writer is not imported by the question's snippet (which only
# imports schemaless_reader and parse_schema), so bring it into scope here.
from fastavro import schemaless_writer

# Serialize the record with the already-parsed `schema` to obtain real Avro
# binary bytes; `schema` and `BytesIO` come from the question's fastavro snippet.
rb = BytesIO()
schemaless_writer(rb, schema, {"name": "Alyssa", "favorite_number": 256})
rb.getvalue() # b'\x0cAlyssa\x00\x80\x04\x02'
然后您就可以按原来的方式读取生成的二进制数据:
# Decode the Avro binary bytes shown by rb.getvalue() in the previous snippet.
rb = BytesIO(b'\x0cAlyssa\x00\x80\x04\x02')
data = schemaless_reader(rb, schema)
# Result: {'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}