将 JSON 文件转换为 AVRO

Convert JSON file to AVRO

我刚开始在 Python 中使用 AVRO。我有一个用例:我想将 JSON 文件转换为 Avro 文件。我的架构以 .avsc 格式存储,JSON 数据以 .json 格式存储。现在我想把 JSON 文件和 .avsc 文件结合起来,将 JSON 文件序列化为 Avro。下面是我的代码,运行时收到错误“avro.io.AvroTypeException: The datum file.json is not an example of the schema”(数据 file.json 不符合该架构)。我不确定自己哪里做错了。在 writer.append 语句中,我希望脚本从 JSON 文件中读取数据,并以序列化格式将其追加到 .avro 文件中。我不确定如何处理这个问题,如能提供帮助,将不胜感激。

这是我的代码:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro import schema, datafile, io
import json
from avro import schema, datafile, io


def json_to_avro():
    """Serialize the JSON document in ``laird.json`` into ``laird.avro``.

    The Avro schema is read from ``laird.avsc``, the JSON document is parsed
    into Python objects, printed, and appended to the Avro container file as
    a single datum.

    All input is read before the output file is opened, so a missing or
    malformed schema/JSON file does not leave a truncated ``laird.avro``
    behind.

    Raises:
        OSError: if one of the files cannot be opened.
        json.JSONDecodeError: if ``laird.json`` is not valid JSON.
        Exception: whatever ``writer.append`` raises when the datum does not
            match the schema is propagated after the writer is closed.
    """
    # Read the schema through a context manager so the handle is released.
    with open("laird.avsc", "rb") as schema_file:
        parsed_schema = avro.schema.parse(schema_file.read())

    # append() expects the parsed record, not the file name.
    with open("laird.json") as fp:
        contents = json.load(fp)
    print(contents)

    writer = DataFileWriter(open("laird.avro", "wb"), DatumWriter(), parsed_schema)
    try:
        writer.append(contents)
    finally:
        # Always close the writer, even when append() rejects the datum.
        writer.close()

# Run the conversion as soon as the script is executed.
json_to_avro()

laird.avsc 文件中的数据:

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "event_type",
      "type": "string"
    },
    {
      "name": "event_data",
      "type": {
        "name": "event_data",
        "type": "record",
        "fields": [
          {
            "name": "device_id",
            "type": "string"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "payload",
            "type": {
              "type": "array",
              "items": {
                "name": "payload_record",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "sensor_id",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "unit",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "float"
                  },
                  {
                    "name": "channel",
                    "type": "int"
                  },
                  {
                    "name": "timestamp",
                    "type": "long"
                  }
                ]
              }
            }
          },
          {
            "name": "client_id",
            "type": "string"
          },
          {
            "name": "hardware_id",
            "type": "string"
          },
          {
            "name": "timestamp",
            "type": "long"
          },
          {
            "name": "application_id",
            "type": "string"
          },
          {
            "name": "device_type_id",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "company",
      "type": {
        "name": "company",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "address",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "industry",
            "type": "string"
          },
          {
            "name": "latitude",
            "type": "float"
          },
          {
            "name": "longitude",
            "type": "float"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          },
          {
            "name": "status",
            "type": "int"
          },
          {
            "name": "timezone",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "zip",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "location",
      "type": {
        "name": "location",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "address",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "industry",
            "type": "string"
          },
          {
            "name": "latitude",
            "type": "float"
          },
          {
            "name": "longitude",
            "type": "float"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          },
          {
            "name": "status",
            "type": "int"
          },
          {
            "name": "timezone",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "zip",
            "type": "string"
          },
          {
            "name": "company_id",
            "type": "int"
          }
        ]
      }
    },
    {
      "name": "device_type",
      "type": {
        "name": "device_type",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "string"
          },
          {
            "name": "application_id",
            "type": "string"
          },
          {
            "name": "category",
            "type": "string"
          },
          {
            "name": "codec",
            "type": "string"
          },
          {
            "name": "data_type",
            "type": "string"
          },
          {
            "name": "description",
            "type": "string"
          },
          {
            "name": "manufacturer",
            "type": "string"
          },
          {
            "name": "model",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "parent_constraint",
            "type": "string"
          },
          {
            "name": "proxy_handler",
            "type": "string"
          },
          {
            "name": "subcategory",
            "type": "string"
          },
          {
            "name": "transport_protocol",
            "type": "string"
          },
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          }
        ]
      }
    },
    {
      "name": "device",
      "type": {
        "name": "device",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "thing_name",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "status",
            "type": "int"
          }
        ]
      }
    }
  ]
}

laird.json 文件中的数据:
{
  "event_type": "uplink",
  "event_data": {
    "device_id": "fec5d310-88bf-11ea-aadf-2bd85a1038fa",
    "user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
    "payload": [
      {
        "name": "Temperature",
        "sensor_id": "fef87bd0-88bf-11ea-ae96-6fae9a4d5562",
        "type": "temp",
        "unit": "c",
        "value": 21.9,
        "channel": 3,
        "timestamp": 1603300033446
      },
      {
        "name": "Humidity",
        "sensor_id": "fef399d0-88bf-11ea-a424-59b141c8d9bf",
        "type": "rel_hum",
        "unit": "p",
        "value": 74,
        "channel": 4,
        "timestamp": 1603300033446
      },
      {
        "name": "Battery",
        "sensor_id": "feef2d00-88bf-11ea-aadf-2bd85a1038fa",
        "type": "batt",
        "unit": "p",
        "value": 100,
        "channel": 5,
        "timestamp": 1603300033446
      },
      {
        "name": "RSSI",
        "sensor_id": "fef658f0-88bf-11ea-ae96-6fae9a4d5562",
        "type": "rssi",
        "unit": "dbm",
        "value": -42,
        "channel": 100,
        "timestamp": 1603300033446
      },
      {
        "name": "SNR",
        "sensor_id": "",
        "type": "snr",
        "unit": "db",
        "value": 10.2,
        "channel": 101,
        "timestamp": 1603300033446
      }
    ],
    "client_id": "1c46a2e0-88b9-11ea-90ae-77c0364d6e36",
    "hardware_id": "0025ca0a00008612",
    "timestamp": 1603300033446,
    "application_id": "shipcomwireless",
    "device_type_id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb"
  },
  "company": {
    "id": 6854,
    "address": "10500 University Center Dr",
    "city": "Tampa",
    "country": "United States",
    "created_at": "2020-04-27T19:44:46Z",
    "industry": "[\"Health Care\"]",
    "latitude": 28.045853,
    "longitude": -82.421,
    "name": "Dermpath Diagnostics Bay Area",
    "state": "FL",
    "status": 0,
    "timezone": "America/New_York",
    "updated_at": "2020-04-27T19:44:46Z",
    "user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
    "zip": "33612"
  },
  "location": {
    "id": 8138,
    "address": "10500 University Center Dr",
    "city": "Tampa",
    "country": "United States",
    "created_at": "2020-04-27T19:44:46Z",
    "industry": "[\"Health Care\"]",
    "latitude": 28.045853,
    "longitude": -82.421,
    "name": "Dermpath Diagnostics Bay Area",
    "state": "FL",
    "status": 0,
    "timezone": "America/New_York",
    "updated_at": "2020-10-19T18:22:19Z",
    "user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
    "zip": "33612",
    "company_id": 6854
  },
  "device_type": {
    "id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb",
    "application_id": "",
    "category": "module",
    "codec": "lorawan.laird.rs1xx",
    "data_type": "",
    "description": "Temp Sensor",
    "manufacturer": "Laird",
    "model": "RS1xx",
    "name": "Laird Temp & Humidity RS1xx Sensor - IoT in a Box",
    "parent_constraint": "NOT_ALLOWED",
    "proxy_handler": "PrometheusClient",
    "subcategory": "lora",
    "transport_protocol": "lorawan",
    "version": "",
    "created_at": "2019-03-05T00:39:09Z",
    "updated_at": "2019-03-05T00:39:09Z"
  },
  "device": {
    "id": 217200,
    "thing_name": "Slide/Block Warehouse STE250",
    "created_at": "2020-04-27T19:47:58Z",
    "updated_at": "2020-04-28T12:58:08Z",
    "status": 0
  }
}

你可以尝试使用 fastavro 和 rec_avro 模块,下面是一个示例:

from fastavro import writer, reader, schema
from rec_avro import to_rec_avro_destructive, from_rec_avro_destructive, rec_avro_schema

def json_objects():
    """Return a newly built list holding two sample JSON-like records."""
    first_record = dict(a='a')
    second_record = dict(b='b')
    return [first_record, second_record]

# For efficiency, to_rec_avro_destructive() destroys rec and reuses its
# data structures to construct avro_objects.
avroObjects = (to_rec_avro_destructive(rec) for rec in json_objects())

# Store the records in an Avro file.
with open('json_in_avro.avro', 'wb') as x:
    writer(x, schema.parse_schema(rec_avro_schema()), avroObjects)

# Load the records back from the Avro file.
with open('json_in_avro.avro', 'rb') as f_in:
    # For efficiency, from_rec_avro_destructive(rec) destroys rec and
    # reuses its data structures to construct its output.
    loadedJson = [from_rec_avro_destructive(rec) for rec in reader(f_in)]

# Round-trip check: the records read back must equal a fresh copy of the input.
assert loadedJson == json_objects()

更新:

有两个问题。第一个问题出在架构中。凡是您当前写成如下形式的地方:

      {
        "name": "created_at",
        "type": "int",
        "logicalType": "date"
      },

应该是这样的:

      {
        "name": "created_at",
        "type": {"type": "int", "logicalType": "date"}
      },

修复架构后,另一个问题出在您的数据中。读入 JSON 文件时,created_at 和 updated_at 字段是字符串,但它们需要是 datetime.date 对象。您可以按如下方式对每个这样的字段进行转换:

from datetime import date
# Keep only the part before the "T" and parse it into a datetime.date.
contents["company"]["created_at"] = date.fromisoformat(contents["company"]["created_at"].split("T")[0])

原始回答:

writer.append 函数接受的不是 JSON 文件的名称,而是 JSON 对象(即解析后的数据)。所以,不要这样写:

# Incorrect: this passes the file name string as the datum, not the parsed JSON.
writer.append("laird.json") 

你应该这样写:

# Parse the JSON file into Python objects first ...
with open("laird.json") as fp:
    contents = json.load(fp)

# ... then append the parsed record (not the file name) to the writer.
writer.append(contents)