将多个 json 文件附加在一起并使用 Python 输出 1 个 Avro 文件

append multiple json files together and ouptut 1 Avro file using Python

我有一个用例,我需要附加多个 json 文件,然后将它们转换为 1 个 Avro 文件。我编写了下面的代码,将 json 文件附加在一起,然后将它们转换为 AVRO 文件。但是我遇到的问题是附加了 JSON 文件,但是主 JSON 包含在 [] 括号中,因此在将其转换为 AVRO 文件时出现错误。我想弄清楚如何从 JSON 文件的第一行和最后一行中删除 [] ?感谢任何帮助。

我得到的错误是(错误片段,错误太长无法粘贴:avro.io.AvroTypeException:数据[{'event_type':'uplink'..... }] 不是模式的例子

我的代码: Laird.py

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro import schema, datafile, io
import json
from datetime import date
import glob

data = []
for f in glob.glob("*.txt"):
    with open(f,) as infile:
        data.append(json.load(infile))

    # json.dumps(data)
with open("laird.json",'w') as outfile:

  json.dump(data, outfile)


def json_to_avro():
        fo = open("laird.json", "r")
        data = fo.readlines()
        final_header = []
        final_rec = []
        for header in data[0:1]:
            header = header.strip("\n")
            header = header.split(",")
            final_header = header
        for rec in data[1:]:
            rec = rec.strip("\n")
            rec = rec.split(" ")
            rec = ' '.join(rec).split()
            final_rec = rec
        final_dict = dict(zip(final_header,final_rec))
        # print(final_dict)
        json_dumps = json.dumps(final_dict, ensure_ascii=False)
        # print(json_dumps)
        schema = avro.schema.parse(open("laird.avsc", "rb").read())
        # print(schema)

        writer = DataFileWriter(open("laird.avro", "wb"), DatumWriter(), schema)

        with open("laird.json") as fp:
            contents = json.load(fp)
            
            print(contents)

        writer.append(contents)

        writer.close()

json_to_avro()


#Script to read/convert AVRO file to JSON
reader = DataFileReader(open("laird.avro", "rb"), DatumReader())
for user in reader:
    print(user)
reader.close()

架构:lair.avsc

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "event_type",
      "type": "string"
    },
    {
      "name": "event_data",
      "type": {
        "name": "event_data",
        "type": "record",
        "fields": [
          {
            "name": "device_id",
            "type": "string"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "payload",
            "type": {
              "type": "array",
              "items": {
                "name": "payload_record",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "sensor_id",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "unit",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "float"
                  },
                  {
                    "name": "channel",
                    "type": "int"
                  },
                  {
                    "name": "timestamp",
                    "type": "long"
                  }
                ]
              }
            }
          },
          {
            "name": "client_id",
            "type": "string"
          },
          {
            "name": "hardware_id",
            "type": "string"
          },
          {
            "name": "timestamp",
            "type": "long"
          },
          {
            "name": "application_id",
            "type": "string"
          },
          {
            "name": "device_type_id",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "company",
      "type": {
        "name": "company",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "address",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          },
          {

            "name": "created_at",
            "type": "string"
          },
          {
            "name": "industry",
            "type": "string"
          },
          {
            "name": "latitude",
            "type": "float"
          },
          {
            "name": "longitude",
            "type": "float"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          },
          {
            "name": "status",
            "type": "int"
          },
          {
            "name": "timezone",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "string"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "zip",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "location",
      "type": {
        "name": "location",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "address",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "string"
          },
          {
            "name": "industry",
            "type": "string"
          },
          {
            "name": "latitude",
            "type": "float"
          },
          {
            "name": "longitude",
            "type": "float"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          },
          {
            "name": "status",
            "type": "int"
          },
          {
            "name": "timezone",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "string"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "zip",
            "type": "string"
          },
          {
            "name": "company_id",
            "type": "int"
          }
        ]
      }
    },
    {
      "name": "device_type",
      "type": {
        "name": "device_type",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "string"
          },
          {
            "name": "application_id",
            "type": "string"
          },
          {
            "name": "category",
            "type": "string"
          },
          {
            "name": "codec",
            "type": "string"
          },
          {
            "name": "data_type",
            "type": "string"
          },
          {
            "name": "description",
            "type": "string"
          },
          {
            "name": "manufacturer",
            "type": "string"
          },
          {
            "name": "model",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "parent_constraint",
            "type": "string"
          },
          {
            "name": "proxy_handler",
            "type": "string"
          },
          {
            "name": "subcategory",
            "type": "string"
          },
          {
            "name": "transport_protocol",
            "type": "string"
          },
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "device",
      "type": {
        "name": "device",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "thing_name",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "string"
          },
          {
            "name": "status",
            "type": "int"
          }
        ]
      }
    }
  ]
}

已生成 JSON 文件:laird.json

[{"event_type": "uplink", "event_data": {"device_id": "42934500-fcfb-11ea-9f13-d1d0271289a6", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "payload": [{"name": "Humidity", "sensor_id": "42abaf00-fcfb-11ea-9c71-c517ac227ea5", "type": "rel_hum", "unit": "p", "value": 94.29, "channel": 4, "timestamp": 1605007797789}, {"name": "Temperature", "sensor_id": "42b0df20-fcfb-11ea-bf5c-d11ce3dbc1cb", "type": "temp", "unit": "c", "value": 21.64, "channel": 3, "timestamp": 1605007797789}, {"name": "Battery", "sensor_id": "42a98c20-fcfb-11ea-b4dd-cd2887a335f7", "type": "batt", "unit": "p", "value": 100, "channel": 5, "timestamp": 1605007797789}, {"name": "Local Backup", "sensor_id": "42b01bd0-fcfb-11ea-9f13-d1d0271289a6", "type": "digital_sensor", "unit": "d", "value": 1, "channel": 400, "timestamp": 1605007797789}, {"name": "RSSI", "sensor_id": "42b39e40-fcfb-11ea-bf5c-d11ce3dbc1cb", "type": "rssi", "unit": "dbm", "value": -53, "channel": 100, "timestamp": 1605007797789}, {"name": "SNR", "sensor_id": "", "type": "snr", "unit": "db", "value": 10.2, "channel": 101, "timestamp": 1605007797789}], "client_id": "b8468c50-baf0-11ea-a5e9-89c3b09de43a", "hardware_id": "0025ca0a0000e232", "timestamp": 1605007797789, "application_id": "shipcomwireless", "device_type_id": "70776630-e15e-11ea-a8c9-05cd631755a5"}, "company": {"id": 7696, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-11T18:44:50Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "Harris Health System - Production", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-15T03:34:58Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054"}, "location": {"id": 9153, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-18T02:08:03Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "HHS Van Sensors", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-18T02:08:03Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054", "company_id": 7696}, "device_type": {"id": "70776630-e15e-11ea-a8c9-05cd631755a5", "application_id": "", "category": "module", "codec": "lorawan.laird.rs1xx-backup", "data_type": "", "description": "Temp Sensor", "manufacturer": "Laird", "model": "RS1xx", "name": "Laird Temp & Humidity with Local Backup", "parent_constraint": "NOT_ALLOWED", "proxy_handler": "PrometheusClient", "subcategory": "lora", "transport_protocol": "lorawan", "version": "", "created_at": "2020-08-18T14:23:51Z", "updated_at": "2020-08-18T18:16:37Z"}, "device": {"id": 269231, "thing_name": "Van 18-1775 (Ambient)", "created_at": "2020-09-22T17:44:27Z", "updated_at": "2020-09-25T22:39:57Z", "status": 0}}, {"event_type": "uplink", "event_data": {"device_id": "7de32cf0-f9d2-11ea-b4dd-cd2887a335f7", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "payload": [{"name": "Humidity", "sensor_id": "7dfbbe00-f9d2-11ea-9c71-c517ac227ea5", "type": "rel_hum", "unit": "p", "value": 0, "channel": 4, "timestamp": 1604697684139}, {"name": "Temperature", "sensor_id": "7dfb48d0-f9d2-11ea-9c71-c517ac227ea5", "type": "temp", "unit": "c", "value": -27.22, "channel": 3, "timestamp": 1604697684139}, {"name": "Battery", "sensor_id": "7dfa5e70-f9d2-11ea-bf5c-d11ce3dbc1cb", "type": "batt", "unit": "p", "value": 100, "channel": 5, "timestamp": 1604697684139}, {"name": "Local Backup", "sensor_id": "7dfb96f0-f9d2-11ea-b4dd-cd2887a335f7", "type": "digital_sensor", "unit": "d", "value": 1, "channel": 400, "timestamp": 1604697684139}, {"name": "RSSI", "sensor_id": "7dfc5a40-f9d2-11ea-b4dd-cd2887a335f7", "type": "rssi", "unit": "dbm", "value": -7, "channel": 100, "timestamp": 1604697684139}, {"name": "SNR", "sensor_id": "", "type": "snr", "unit": "db", "value": 10, "channel": 101, "timestamp": 1604697684139}], "client_id": "b8468c50-baf0-11ea-a5e9-89c3b09de43a", "hardware_id": "0025ca0a0000be6a", "timestamp": 1604697684139, "application_id": "shipcomwireless", "device_type_id": "70776630-e15e-11ea-a8c9-05cd631755a5"}, "company": {"id": 7696, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-11T18:44:50Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "Harris Health System - Production", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-15T03:34:58Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054"}, "location": {"id": 9080, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-11T18:46:07Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "HHS Cooler Sensors", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-18T14:17:28Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054", "company_id": 7696}, "device_type": {"id": "70776630-e15e-11ea-a8c9-05cd631755a5", "application_id": "", "category": "module", "codec": "lorawan.laird.rs1xx-backup", "data_type": "", "description": "Temp Sensor", "manufacturer": "Laird", "model": "RS1xx", "name": "Laird Temp & Humidity with Local Backup", "parent_constraint": "NOT_ALLOWED", "proxy_handler": "PrometheusClient", "subcategory": "lora", "transport_protocol": "lorawan", "version": "", "created_at": "2020-08-18T14:23:51Z", "updated_at": "2020-08-18T18:16:37Z"}, "device": {"id": 268369, "thing_name": "Cooler F-0201-AH", "created_at": "2020-09-18T17:15:04Z", "updated_at": "2020-09-25T22:39:57Z", "status": 0}}, {"event_type": "uplink", "event_data": {"device_id": "1c5c66f0-fcfb-11ea-8ae3-2ffdc909c57b", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "payload": [{"name": "Humidity", "sensor_id": "1c7a4f30-fcfb-11ea-8ae3-2ffdc909c57b", "type": "rel_hum", "unit": "p", "value": 81.22, "channel": 4, "timestamp": 1605148608302}, {"name": "Temperature", "sensor_id": "1c793dc0-fcfb-11ea-bf5c-d11ce3dbc1cb", "type": "temp", "unit": "c", "value": 24.47, "channel": 3, "timestamp": 1605148608302}, {"name": "Battery", "sensor_id": "1c76a5b0-fcfb-11ea-bf5c-d11ce3dbc1cb", "type": "batt", "unit": "p", "value": 100, "channel": 5, "timestamp": 1605148608302}, {"name": "Local Backup", "sensor_id": "1c73e690-fcfb-11ea-9c71-c517ac227ea5", "type": "digital_sensor", "unit": "d", "value": 1, "channel": 400, "timestamp": 1605148608302}, {"name": "RSSI", "sensor_id": "1c780540-fcfb-11ea-b4dd-cd2887a335f7", "type": "rssi", "unit": "dbm", "value": -14, "channel": 100, "timestamp": 1605148608302}, {"name": "SNR", "sensor_id": "", "type": "snr", "unit": "db", "value": 8.8, "channel": 101, "timestamp": 1605148608302}], "client_id": "b8468c50-baf0-11ea-a5e9-89c3b09de43a", "hardware_id": "0025ca0a0000e1e3", "timestamp": 1605148608302, "application_id": "shipcomwireless", "device_type_id": "70776630-e15e-11ea-a8c9-05cd631755a5"}, "company": {"id": 7696, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-11T18:44:50Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "Harris Health System - Production", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-15T03:34:58Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054"}, "location": {"id": 9153, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-18T02:08:03Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "HHS Van Sensors", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-18T02:08:03Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054", "company_id": 7696}, "device_type": {"id": "70776630-e15e-11ea-a8c9-05cd631755a5", "application_id": "", "category": "module", "codec": "lorawan.laird.rs1xx-backup", "data_type": "", "description": "Temp Sensor", "manufacturer": "Laird", "model": "RS1xx", "name": "Laird Temp & Humidity with Local Backup", "parent_constraint": "NOT_ALLOWED", "proxy_handler": "PrometheusClient", "subcategory": "lora", "transport_protocol": "lorawan", "version": "", "created_at": "2020-08-18T14:23:51Z", "updated_at": "2020-08-18T18:16:37Z"}, "device": {"id": 269213, "thing_name": "Van 19-1800 (Ambient)", "created_at": "2020-09-22T17:43:23Z", "updated_at": "2020-09-25T22:39:56Z", "status": 0}}, {"event_type": "uplink", "event_data": {"device_id": "851fd480-f70e-11ea-9f13-d1d0271289a6", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "payload": [{"name": "Humidity", "sensor_id": "85411820-f70e-11ea-8ae3-2ffdc909c57b", "type": "rel_hum", "unit": "p", "value": 49.52, "channel": 4, "timestamp": 1604558153188}, {"name": "Temperature", "sensor_id": "853f9180-f70e-11ea-9f13-d1d0271289a6", "type": "temp", "unit": "c", "value": 20.52, "channel": 3, "timestamp": 1604558153188}, {"name": "Battery", "sensor_id": "85429ec0-f70e-11ea-9621-a51b22d5dc1d", "type": "batt", "unit": "p", "value": 100, "channel": 5, "timestamp": 1604558153188}, {"name": "Local Backup", "sensor_id": "853f4360-f70e-11ea-9f13-d1d0271289a6", "type": "digital_sensor", "unit": "d", "value": 1, "channel": 400, "timestamp": 1604558153188}, {"name": "RSSI", "sensor_id": "8543b030-f70e-11ea-8ae3-2ffdc909c57b", "type": "rssi", "unit": "dbm", "value": -91, "channel": 100, "timestamp": 1604558153188}, {"name": "SNR", "sensor_id": "", "type": "snr", "unit": "db", "value": 8.5, "channel": 101, "timestamp": 1604558153188}], "client_id": "b8468c50-baf0-11ea-a5e9-89c3b09de43a", "hardware_id": "0025ca0a0000be5b", "timestamp": 1604558153188, "application_id": "shipcomwireless", "device_type_id": "70776630-e15e-11ea-a8c9-05cd631755a5"}, "company": {"id": 7696, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-11T18:44:50Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "Harris Health System - Production", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-15T03:34:58Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054"}, "location": {"id": 9080, "address": "9240 Kirby Dr", "city": "Houston", "country": "United States", "created_at": "2020-09-11T18:46:07Z", "industry": "[\"Health Care\"]", "latitude": 29.671324, "longitude": -95.415535, "name": "HHS Cooler Sensors", "state": "TX", "status": 0, "timezone": "America/Chicago", "updated_at": "2020-09-18T14:17:28Z", "user_id": "a5d78945-9f24-48a1-9107-5bee62bf007a", "zip": "77054", "company_id": 7696}, "device_type": {"id": "70776630-e15e-11ea-a8c9-05cd631755a5", "application_id": "", "category": "module", "codec": "lorawan.laird.rs1xx-backup", "data_type": "", "description": "Temp Sensor", "manufacturer": "Laird", "model": "RS1xx", "name": "Laird Temp & Humidity with Local Backup", "parent_constraint": "NOT_ALLOWED", "proxy_handler": "PrometheusClient", "subcategory": "lora", "transport_protocol": "lorawan", "version": "", "created_at": "2020-08-18T14:23:51Z", "updated_at": "2020-08-18T18:16:37Z"}, "device": {"id": 265040, "thing_name": "Cooler R-0306-PHAR", "created_at": "2020-09-15T04:47:12Z", "updated_at": "2020-09-25T22:39:54Z", "status": 0}}]

contents 是记录列表,但 writer.append 需要一条记录,因此您遍历记录并逐条追加。

你只需要改变:

writer.append(contents)

至:

for record in contents:
    writer.append(record)