使用 fastavro 从 Kafka 反序列化 Avro
Avro deserialization from Kafka using fastavro
我正在构建一个从 Kafka 接收数据的应用程序。使用 Apache ( https://pypi.org/project/avro-python3/ ) 提供的标准 avro 库时,结果是正确的,但是,反序列化过程非常慢。
class KafkaReceiver:
data = {}
def __init__(self, bootstrap='192.168.1.111:9092'):
self.client = KafkaConsumer(
'topic',
bootstrap_servers=bootstrap,
client_id='app',
api_version=(0, 10, 1)
)
self.schema = avro.schema.parse(open("Schema.avsc", "rb").read())
self.reader = avro.io.DatumReader(self.schema)
def do(self):
for msg in self.client:
bytes_reader = io.BytesIO(msg.value)
decoder = BinaryDecoder(bytes_reader)
self.data = self.reader.read(decoder)
在阅读为什么这么慢时,我发现 fastavro
应该快得多。我是这样使用的:
def do(self):
schema = fastavro.schema.load_schema('Schema.avsc')
for msg in self.client:
bytes_reader = io.BytesIO(msg.value)
bytes_reader.seek(0)
for record in reader(bytes_reader, schema):
self.data = record
而且,由于在使用 Apache 的库时一切正常,我希望在 fastavro
中一切都将以相同的方式工作。但是,当 运行 这个时,我得到
File "fastavro/_read.pyx", line 389, in fastavro._read.read_map
File "fastavro/_read.pyx", line 290, in fastavro._read.read_utf8
File "fastavro/_six.pyx", line 22, in fastavro._six.py3_btou
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xfc in position 3: invalid start byte
我通常不在 Python 中编程,所以我不知道如何处理这个问题。有什么想法吗?
fastavro.reader
需要包含 header 的 avro 文件格式。看起来您拥有的是没有 header 的序列化记录。我认为您可以使用 fastavro.schemaless_reader
.
阅读本文
所以代替:
for record in reader(bytes_reader, schema):
self.data = record
你会做:
self.data = schemaless_reader(bytes_reader, schema)
我正在构建一个从 Kafka 接收数据的应用程序。使用 Apache ( https://pypi.org/project/avro-python3/ ) 提供的标准 avro 库时,结果是正确的,但是,反序列化过程非常慢。
class KafkaReceiver:
data = {}
def __init__(self, bootstrap='192.168.1.111:9092'):
self.client = KafkaConsumer(
'topic',
bootstrap_servers=bootstrap,
client_id='app',
api_version=(0, 10, 1)
)
self.schema = avro.schema.parse(open("Schema.avsc", "rb").read())
self.reader = avro.io.DatumReader(self.schema)
def do(self):
for msg in self.client:
bytes_reader = io.BytesIO(msg.value)
decoder = BinaryDecoder(bytes_reader)
self.data = self.reader.read(decoder)
在阅读为什么这么慢时,我发现 fastavro
应该快得多。我是这样使用的:
def do(self):
schema = fastavro.schema.load_schema('Schema.avsc')
for msg in self.client:
bytes_reader = io.BytesIO(msg.value)
bytes_reader.seek(0)
for record in reader(bytes_reader, schema):
self.data = record
而且,由于在使用 Apache 的库时一切正常,我希望在 fastavro
中一切都将以相同的方式工作。但是,当 运行 这个时,我得到
File "fastavro/_read.pyx", line 389, in fastavro._read.read_map
File "fastavro/_read.pyx", line 290, in fastavro._read.read_utf8
File "fastavro/_six.pyx", line 22, in fastavro._six.py3_btou
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xfc in position 3: invalid start byte
我通常不在 Python 中编程,所以我不知道如何处理这个问题。有什么想法吗?
fastavro.reader
需要包含 header 的 avro 文件格式。看起来您拥有的是没有 header 的序列化记录。我认为您可以使用 fastavro.schemaless_reader
.
所以代替:
for record in reader(bytes_reader, schema):
self.data = record
你会做:
self.data = schemaless_reader(bytes_reader, schema)