Apache Avro - Read binary stream
I am trying to build a Kafka consumer that ingests Avro stream messages.
Following this resource (https://avro.apache.org/docs/current/gettingstartedpython.html), I can read an Avro message file, and this works:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
reader = DataFileReader(open("weather.avro", "rb"), DatumReader(avro.schema.parse(open("mySchema.avsc").read())))
for user in reader:
    print(user)
reader.close()
However, my question is: how do I read an Avro stream instead of a file?
I tried something like this, but it returns the wrong values.
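import io
import avro.schema
from avro.io import BinaryDecoder, DatumReader

def avro_decoder(msg_value):
    schema = avro.schema.parse(open("mySchema.avsc").read())
    reader = DatumReader(schema)
    message_bytes = io.BytesIO(msg_value)
    # Skip a 5-byte header (presumably framing, e.g. a magic byte plus a 4-byte schema ID)
    message_bytes.seek(5)
    # Decode one raw Avro datum; this does not parse .avro container files
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict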
This is the schema:
schema = {
    "type": "record",
    "name": "Weather",
    "namespace": "test",
    "doc": "A weather reading.",
    "fields": [
        {
            "name": "station",
            "type": "string"
        },
        {
            "name": "time",
            "type": "long"
        },
        {
            "name": "temp",
            "type": "int"
        }
    ]
}
This is the Avro message:
Objavro.codecnullavro.schemaú{"type": "record", "doc": "A weather reading.", "name": "test.Weather", "fields": [{"name": "station", "type": "string"}, {"name": "time", "type": "long"}, {"name": "temp", "type": "int"}]} ñKœV^¤‹ÿŸÕ>Z:v&011990-99999˜ÒïÖ
ñKœV^¤‹ÿŸÕ>Z:v
Can anyone help?
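I just figured out what the problem was.
I was sending the wrong message! I was sending the contents of the .avro file, but the correct message should actually look like this: 123111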
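To make the difference concrete: a .avro file is an object container file, which starts with the magic bytes `Obj` followed by 0x01 and then carries metadata (including the schema) before the data blocks, whereas `DatumReader.read` with a `BinaryDecoder` expects only the encoded record itself. The sketch below uses only the standard library and hand-rolls the Avro binary encoding for the Weather schema above (strings as a length plus UTF-8 bytes, `int`/`long` as zig-zag varints). It is an illustration under those assumptions, not the avro package's implementation; all helper names are hypothetical and the record values are made up for the example.

```python
import io

CONTAINER_MAGIC = b"Obj\x01"  # first four bytes of an Avro object container file


def is_container_file(payload: bytes) -> bool:
    """True if the payload looks like a whole .avro file rather than a bare datum."""
    return payload[:4] == CONTAINER_MAGIC


def write_long(out: io.BytesIO, n: int) -> None:
    """Write an Avro int/long: zig-zag encode, then emit a base-128 varint."""
    n = (n << 1) ^ (n >> 63)
    while n & ~0x7F:
        out.write(bytes([(n & 0x7F) | 0x80]))
        n >>= 7
    out.write(bytes([n]))


def read_long(buf: io.BytesIO) -> int:
    """Read an Avro int/long: collect the varint, then undo the zig-zag step."""
    shift = result = 0
    while True:
        byte = buf.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def encode_weather(record: dict) -> bytes:
    """Encode one Weather record as a raw datum: fields in schema order, no header."""
    out = io.BytesIO()
    station = record["station"].encode("utf-8")
    write_long(out, len(station))
    out.write(station)
    write_long(out, record["time"])
    write_long(out, record["temp"])
    return out.getvalue()


def decode_weather(payload: bytes) -> dict:
    """Decode a raw Weather datum; reject container files up front."""
    if is_container_file(payload):
        raise ValueError("payload is an Avro container file, not a single datum")
    buf = io.BytesIO(payload)
    station = buf.read(read_long(buf)).decode("utf-8")
    return {"station": station, "time": read_long(buf), "temp": read_long(buf)}


# Illustrative values only; the station ID is the one visible in the dump above.
raw = encode_weather({"station": "011990-99999", "time": -619524000000, "temp": 22})
print(decode_weather(raw))
```

A check like `is_container_file` at the top of a consumer gives a clear error when a producer sends file contents, instead of a decode that silently returns wrong values.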