使用 python 在文件中写入 avro 记录

Write avro record in file using python

我有名称为 test.avsc 且架构为:

的 avro 文件
{
    "namespace": "test",
    "type": "record",
    "name": "test.schema",
    "fields": [
      { "name": "device", "type": ["null", "string"] , "default" : null }
    ]
}

我的 avro class 是:

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

class AvroHelper(object):

    avro_writer = None
    codec = 'deflate'

    def __init__(self, schemaf, avrofile):
        schema = avro.schema.parse(schemaf)
        self.avro_writer = DataFileWriter(avrofile, DatumWriter(), schema, codec=self.codec)
        self.num_rows_written_to_avro = 0

    def append(self, row):
        self.avro_writer.append(row)

我正在尝试将数据写入为:

file = open('test1.avro', 'wb')
avro_writer = AvroHelper('test.avsc', file)

rec= { 'device': "1" }
avro_writer.append(rec)

我在尝试将 avro 记录写入文件时出现异常:


File "search_console_api_client.py", line 31, in __init__
    schema = avro.schema.parse(schema_str)
  File "/usr/local/lib/python3.7/site-packages/avro/schema.py", line 1238, in parse
    % (json_string, exn))
avro.schema.SchemaParseException: Error parsing schema from JSON: 'test.avsc'. Error message: JSONDecodeError('Expecting value: line 1 column 1 (char 0)').

我怎样才能让它发挥作用?

你在做

avro_writer = AvroHelper('test.avsc', file)

这意味着您的 __init__ 正在发生这种情况:

schema = avro.schema.parse('test.avsc')

但是,parse() 函数应该接受架构的 JSON 字符串,而不是文件名。相反,您可能想做这样的事情:

avro_writer = AvroHelper(open('test.avsc').read(), file)