将 JSON 文件转换为 AVRO
Convert JSON file to AVRO
我是在 Python 中使用 AVRO 的新手。我有一个用例,我想将 JSON 文件转换为 Avro 文件。我以 .avsc 格式存储了我的架构,以 .json 格式存储了 JSON 数据。现在我想把 JSON 文件和 .avsc 文件放在一起,并将 JSON 文件序列化到 Avro 中。下面是我的代码,我收到一个错误“avro.io.AvroTypeException:数据 file.json 不是架构的示例”。我不确定我做错了什么。在我的 writer.append 语句中,我希望脚本从 JSON 文件中获取数据并将其附加到 .avro 文件中,但采用序列化格式。我不确定如何处理这个问题,任何帮助将不胜感激。
这是我的代码:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro import schema, datafile, io
import json
from avro import schema, datafile, io
def json_to_avro():
    """Serialize the JSON record in laird.json into laird.avro.

    Uses the Avro schema stored in laird.avsc.  Also parses laird.txt
    (CSV header line + whitespace-separated records) into ``final_dict``,
    although that dict is only JSON-encoded and never written to the
    Avro file — presumably leftover exploratory code; TODO confirm.
    """
    # Read laird.txt: line 1 is a comma-separated header, the remaining
    # lines are whitespace-separated records.  Use a context manager so
    # the file handle is always closed.
    with open("laird.txt", "r") as fo:
        data = fo.readlines()
    final_header = []
    final_rec = []
    for header in data[0:1]:
        final_header = header.strip("\n").split(",")
    for rec in data[1:]:
        # split() with no argument collapses runs of whitespace, which is
        # what the original split(" ") + join + split dance achieved.
        final_rec = rec.strip("\n").split()
    final_dict = dict(zip(final_header, final_rec))
    # print(final_dict)
    json_dumps = json.dumps(final_dict, ensure_ascii=False)
    # print(json_dumps)

    # NOTE: use a name other than `schema` so we don't shadow the
    # imported avro `schema` module.  Close the .avsc file when done.
    with open("laird.avsc", "rb") as schema_file:
        avro_schema = avro.schema.parse(schema_file.read())
    # print(avro_schema)

    # DataFileWriter takes ownership of the output file object;
    # writer.close() flushes and closes it.  Ensure close() runs even if
    # append() raises (e.g. AvroTypeException on a schema mismatch).
    writer = DataFileWriter(open("laird.avro", "wb"), DatumWriter(), avro_schema)
    try:
        # append() requires a Python object (dict/list/...), NOT a file
        # name — load the JSON document first.
        with open("laird.json") as fp:
            contents = json.load(fp)
        print(contents)
        writer.append(contents)
    finally:
        writer.close()

json_to_avro()
laird.avsc 文件中的数据:
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "event_type",
"type": "string"
},
{
"name": "event_data",
"type": {
"name": "event_data",
"type": "record",
"fields": [
{
"name": "device_id",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "payload",
"type": {
"type": "array",
"items": {
"name": "payload_record",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "sensor_id",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "unit",
"type": "string"
},
{
"name": "value",
"type": "float"
},
{
"name": "channel",
"type": "int"
},
{
"name": "timestamp",
"type": "long"
}
]
}
}
},
{
"name": "client_id",
"type": "string"
},
{
"name": "hardware_id",
"type": "string"
},
{
"name": "timestamp",
"type": "long"
},
{
"name": "application_id",
"type": "string"
},
{
"name": "device_type_id",
"type": "string"
}
]
}
},
{
"name": "company",
"type": {
"name": "company",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "address",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "country",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "industry",
"type": "string"
},
{
"name": "latitude",
"type": "float"
},
{
"name": "longitude",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "status",
"type": "int"
},
{
"name": "timezone",
"type": "string"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "zip",
"type": "string"
}
]
}
},
{
"name": "location",
"type": {
"name": "location",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "address",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "country",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "industry",
"type": "string"
},
{
"name": "latitude",
"type": "float"
},
{
"name": "longitude",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "status",
"type": "int"
},
{
"name": "timezone",
"type": "string"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "zip",
"type": "string"
},
{
"name": "company_id",
"type": "int"
}
]
}
},
{
"name": "device_type",
"type": {
"name": "device_type",
"type": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "application_id",
"type": "string"
},
{
"name": "category",
"type": "string"
},
{
"name": "codec",
"type": "string"
},
{
"name": "data_type",
"type": "string"
},
{
"name": "description",
"type": "string"
},
{
"name": "manufacturer",
"type": "string"
},
{
"name": "model",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "parent_constraint",
"type": "string"
},
{
"name": "proxy_handler",
"type": "string"
},
{
"name": "subcategory",
"type": "string"
},
{
"name": "transport_protocol",
"type": "string"
},
{
"name": "version",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
}
]
}
},
{
"name": "device",
"type": {
"name": "device",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "thing_name",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "status",
"type": "int"
}
]
}
}
]
}
laird.json
中的数据
{
"event_type": "uplink",
"event_data": {
"device_id": "fec5d310-88bf-11ea-aadf-2bd85a1038fa",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"payload": [
{
"name": "Temperature",
"sensor_id": "fef87bd0-88bf-11ea-ae96-6fae9a4d5562",
"type": "temp",
"unit": "c",
"value": 21.9,
"channel": 3,
"timestamp": 1603300033446
},
{
"name": "Humidity",
"sensor_id": "fef399d0-88bf-11ea-a424-59b141c8d9bf",
"type": "rel_hum",
"unit": "p",
"value": 74,
"channel": 4,
"timestamp": 1603300033446
},
{
"name": "Battery",
"sensor_id": "feef2d00-88bf-11ea-aadf-2bd85a1038fa",
"type": "batt",
"unit": "p",
"value": 100,
"channel": 5,
"timestamp": 1603300033446
},
{
"name": "RSSI",
"sensor_id": "fef658f0-88bf-11ea-ae96-6fae9a4d5562",
"type": "rssi",
"unit": "dbm",
"value": -42,
"channel": 100,
"timestamp": 1603300033446
},
{
"name": "SNR",
"sensor_id": "",
"type": "snr",
"unit": "db",
"value": 10.2,
"channel": 101,
"timestamp": 1603300033446
}
],
"client_id": "1c46a2e0-88b9-11ea-90ae-77c0364d6e36",
"hardware_id": "0025ca0a00008612",
"timestamp": 1603300033446,
"application_id": "shipcomwireless",
"device_type_id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb"
},
"company": {
"id": 6854,
"address": "10500 University Center Dr",
"city": "Tampa",
"country": "United States",
"created_at": "2020-04-27T19:44:46Z",
"industry": "[\"Health Care\"]",
"latitude": 28.045853,
"longitude": -82.421,
"name": "Dermpath Diagnostics Bay Area",
"state": "FL",
"status": 0,
"timezone": "America/New_York",
"updated_at": "2020-04-27T19:44:46Z",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"zip": "33612"
},
"location": {
"id": 8138,
"address": "10500 University Center Dr",
"city": "Tampa",
"country": "United States",
"created_at": "2020-04-27T19:44:46Z",
"industry": "[\"Health Care\"]",
"latitude": 28.045853,
"longitude": -82.421,
"name": "Dermpath Diagnostics Bay Area",
"state": "FL",
"status": 0,
"timezone": "America/New_York",
"updated_at": "2020-10-19T18:22:19Z",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"zip": "33612",
"company_id": 6854
},
"device_type": {
"id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb",
"application_id": "",
"category": "module",
"codec": "lorawan.laird.rs1xx",
"data_type": "",
"description": "Temp Sensor",
"manufacturer": "Laird",
"model": "RS1xx",
"name": "Laird Temp & Humidity RS1xx Sensor - IoT in a Box",
"parent_constraint": "NOT_ALLOWED",
"proxy_handler": "PrometheusClient",
"subcategory": "lora",
"transport_protocol": "lorawan",
"version": "",
"created_at": "2019-03-05T00:39:09Z",
"updated_at": "2019-03-05T00:39:09Z"
},
"device": {
"id": 217200,
"thing_name": "Slide/Block Warehouse STE250",
"created_at": "2020-04-27T19:47:58Z",
"updated_at": "2020-04-28T12:58:08Z",
"status": 0
}
}
你可以尝试使用 fastavro 和 rec_avro 模块,这里有一些例子
from fastavro import writer, reader, schema
from rec_avro import to_rec_avro_destructive, from_rec_avro_destructive, rec_avro_schema
def json_objects():
    """Return sample JSON-like records to round-trip through Avro."""
    return [{'a': 'a'}, {'b':'b'}]

# For efficiency, to_rec_avro_destructive() destroys rec, and reuses its
# data structures to construct avro_objects.
avroObjects = (to_rec_avro_destructive(rec) for rec in json_objects())

# Store records in Avro using the generic rec_avro schema (no
# hand-written .avsc file is needed).
with open('json_in_avro.avro', 'wb') as x:
    writer(x, schema.parse_schema(rec_avro_schema()), avroObjects)

# Load records from Avro and convert them back to plain JSON objects.
with open('json_in_avro.avro', 'rb') as f_in:
    # For efficiency, from_rec_avro_destructive(rec) destroys rec, and
    # reuses its data structures to construct its output.
    loadedJson = [from_rec_avro_destructive(rec) for rec in reader(f_in)]

# Round-trip check: what we read back equals what we wrote.
assert loadedJson == json_objects()
更新:
有两个问题。第一个是在架构中。您当前拥有以下内容的任何地方:
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
应该是这样的:
{
"name": "created_at",
"type": {"type": "int", "logicalType": "date"}
},
修复后,另一个问题出在您的数据中。读入JSON文件时,created_at
和updated_at
字段是字符串,但需要是datetime.date
对象。您可以通过以下方式为他们每个人执行此操作:
from datetime import date
contents["company"]["created_at"] = date.fromisoformat(contents["company"]["created_at"].split("T")[0])
原回答:
writer.append
函数不接受 JSON 文件的名称,而是接受 JSON 对象。因此,不要写:
writer.append("laird.json")
你会想要这样的东西:
with open("laird.json") as fp:
contents = json.load(fp)
writer.append(contents)
我是在 Python 中使用 AVRO 的新手。我有一个用例,我想将 JSON 文件转换为 Avro 文件。我以 .avsc 格式存储了我的架构,以 .json 格式存储了 JSON 数据。现在我想把 JSON 文件和 .avsc 文件放在一起,并将 JSON 文件序列化到 Avro 中。下面是我的代码,我收到一个错误“avro.io.AvroTypeException:数据 file.json 不是架构的示例”。我不确定我做错了什么。在我的 writer.append 语句中,我希望脚本从 JSON 文件中获取数据并将其附加到 .avro 文件中,但采用序列化格式。我不确定如何处理这个问题,任何帮助将不胜感激。
这是我的代码:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro import schema, datafile, io
import json
from avro import schema, datafile, io
def json_to_avro():
    """Serialize the JSON record in laird.json into laird.avro.

    Uses the Avro schema stored in laird.avsc.  Also parses laird.txt
    (CSV header line + whitespace-separated records) into ``final_dict``,
    although that dict is only JSON-encoded and never written to the
    Avro file — presumably leftover exploratory code; TODO confirm.
    """
    # Read laird.txt: line 1 is a comma-separated header, the remaining
    # lines are whitespace-separated records.  Use a context manager so
    # the file handle is always closed.
    with open("laird.txt", "r") as fo:
        data = fo.readlines()
    final_header = []
    final_rec = []
    for header in data[0:1]:
        final_header = header.strip("\n").split(",")
    for rec in data[1:]:
        # split() with no argument collapses runs of whitespace, which is
        # what the original split(" ") + join + split dance achieved.
        final_rec = rec.strip("\n").split()
    final_dict = dict(zip(final_header, final_rec))
    # print(final_dict)
    json_dumps = json.dumps(final_dict, ensure_ascii=False)
    # print(json_dumps)

    # NOTE: use a name other than `schema` so we don't shadow the
    # imported avro `schema` module.  Close the .avsc file when done.
    with open("laird.avsc", "rb") as schema_file:
        avro_schema = avro.schema.parse(schema_file.read())
    # print(avro_schema)

    # DataFileWriter takes ownership of the output file object;
    # writer.close() flushes and closes it.  Ensure close() runs even if
    # append() raises (e.g. AvroTypeException on a schema mismatch).
    writer = DataFileWriter(open("laird.avro", "wb"), DatumWriter(), avro_schema)
    try:
        # append() requires a Python object (dict/list/...), NOT a file
        # name — load the JSON document first.
        with open("laird.json") as fp:
            contents = json.load(fp)
        print(contents)
        writer.append(contents)
    finally:
        writer.close()

json_to_avro()
laird.avsc 文件中的数据:
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "event_type",
"type": "string"
},
{
"name": "event_data",
"type": {
"name": "event_data",
"type": "record",
"fields": [
{
"name": "device_id",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "payload",
"type": {
"type": "array",
"items": {
"name": "payload_record",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "sensor_id",
"type": "string"
},
{
"name": "type",
"type": "string"
},
{
"name": "unit",
"type": "string"
},
{
"name": "value",
"type": "float"
},
{
"name": "channel",
"type": "int"
},
{
"name": "timestamp",
"type": "long"
}
]
}
}
},
{
"name": "client_id",
"type": "string"
},
{
"name": "hardware_id",
"type": "string"
},
{
"name": "timestamp",
"type": "long"
},
{
"name": "application_id",
"type": "string"
},
{
"name": "device_type_id",
"type": "string"
}
]
}
},
{
"name": "company",
"type": {
"name": "company",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "address",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "country",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "industry",
"type": "string"
},
{
"name": "latitude",
"type": "float"
},
{
"name": "longitude",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "status",
"type": "int"
},
{
"name": "timezone",
"type": "string"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "zip",
"type": "string"
}
]
}
},
{
"name": "location",
"type": {
"name": "location",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "address",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "country",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "industry",
"type": "string"
},
{
"name": "latitude",
"type": "float"
},
{
"name": "longitude",
"type": "float"
},
{
"name": "name",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "status",
"type": "int"
},
{
"name": "timezone",
"type": "string"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "zip",
"type": "string"
},
{
"name": "company_id",
"type": "int"
}
]
}
},
{
"name": "device_type",
"type": {
"name": "device_type",
"type": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "application_id",
"type": "string"
},
{
"name": "category",
"type": "string"
},
{
"name": "codec",
"type": "string"
},
{
"name": "data_type",
"type": "string"
},
{
"name": "description",
"type": "string"
},
{
"name": "manufacturer",
"type": "string"
},
{
"name": "model",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "parent_constraint",
"type": "string"
},
{
"name": "proxy_handler",
"type": "string"
},
{
"name": "subcategory",
"type": "string"
},
{
"name": "transport_protocol",
"type": "string"
},
{
"name": "version",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
}
]
}
},
{
"name": "device",
"type": {
"name": "device",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "thing_name",
"type": "string"
},
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
{
"name": "updated_at",
"type": "int",
"logicalType": "date"
},
{
"name": "status",
"type": "int"
}
]
}
}
]
}
laird.json
中的数据{
"event_type": "uplink",
"event_data": {
"device_id": "fec5d310-88bf-11ea-aadf-2bd85a1038fa",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"payload": [
{
"name": "Temperature",
"sensor_id": "fef87bd0-88bf-11ea-ae96-6fae9a4d5562",
"type": "temp",
"unit": "c",
"value": 21.9,
"channel": 3,
"timestamp": 1603300033446
},
{
"name": "Humidity",
"sensor_id": "fef399d0-88bf-11ea-a424-59b141c8d9bf",
"type": "rel_hum",
"unit": "p",
"value": 74,
"channel": 4,
"timestamp": 1603300033446
},
{
"name": "Battery",
"sensor_id": "feef2d00-88bf-11ea-aadf-2bd85a1038fa",
"type": "batt",
"unit": "p",
"value": 100,
"channel": 5,
"timestamp": 1603300033446
},
{
"name": "RSSI",
"sensor_id": "fef658f0-88bf-11ea-ae96-6fae9a4d5562",
"type": "rssi",
"unit": "dbm",
"value": -42,
"channel": 100,
"timestamp": 1603300033446
},
{
"name": "SNR",
"sensor_id": "",
"type": "snr",
"unit": "db",
"value": 10.2,
"channel": 101,
"timestamp": 1603300033446
}
],
"client_id": "1c46a2e0-88b9-11ea-90ae-77c0364d6e36",
"hardware_id": "0025ca0a00008612",
"timestamp": 1603300033446,
"application_id": "shipcomwireless",
"device_type_id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb"
},
"company": {
"id": 6854,
"address": "10500 University Center Dr",
"city": "Tampa",
"country": "United States",
"created_at": "2020-04-27T19:44:46Z",
"industry": "[\"Health Care\"]",
"latitude": 28.045853,
"longitude": -82.421,
"name": "Dermpath Diagnostics Bay Area",
"state": "FL",
"status": 0,
"timezone": "America/New_York",
"updated_at": "2020-04-27T19:44:46Z",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"zip": "33612"
},
"location": {
"id": 8138,
"address": "10500 University Center Dr",
"city": "Tampa",
"country": "United States",
"created_at": "2020-04-27T19:44:46Z",
"industry": "[\"Health Care\"]",
"latitude": 28.045853,
"longitude": -82.421,
"name": "Dermpath Diagnostics Bay Area",
"state": "FL",
"status": 0,
"timezone": "America/New_York",
"updated_at": "2020-10-19T18:22:19Z",
"user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
"zip": "33612",
"company_id": 6854
},
"device_type": {
"id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb",
"application_id": "",
"category": "module",
"codec": "lorawan.laird.rs1xx",
"data_type": "",
"description": "Temp Sensor",
"manufacturer": "Laird",
"model": "RS1xx",
"name": "Laird Temp & Humidity RS1xx Sensor - IoT in a Box",
"parent_constraint": "NOT_ALLOWED",
"proxy_handler": "PrometheusClient",
"subcategory": "lora",
"transport_protocol": "lorawan",
"version": "",
"created_at": "2019-03-05T00:39:09Z",
"updated_at": "2019-03-05T00:39:09Z"
},
"device": {
"id": 217200,
"thing_name": "Slide/Block Warehouse STE250",
"created_at": "2020-04-27T19:47:58Z",
"updated_at": "2020-04-28T12:58:08Z",
"status": 0
}
}
你可以尝试使用 fastavro 和 rec_avro 模块,这里有一些例子
from fastavro import writer, reader, schema
from rec_avro import to_rec_avro_destructive, from_rec_avro_destructive, rec_avro_schema
def json_objects():
    """Return sample JSON-like records to round-trip through Avro."""
    return [{'a': 'a'}, {'b':'b'}]

# For efficiency, to_rec_avro_destructive() destroys rec, and reuses its
# data structures to construct avro_objects.
avroObjects = (to_rec_avro_destructive(rec) for rec in json_objects())

# Store records in Avro using the generic rec_avro schema (no
# hand-written .avsc file is needed).
with open('json_in_avro.avro', 'wb') as x:
    writer(x, schema.parse_schema(rec_avro_schema()), avroObjects)

# Load records from Avro and convert them back to plain JSON objects.
with open('json_in_avro.avro', 'rb') as f_in:
    # For efficiency, from_rec_avro_destructive(rec) destroys rec, and
    # reuses its data structures to construct its output.
    loadedJson = [from_rec_avro_destructive(rec) for rec in reader(f_in)]

# Round-trip check: what we read back equals what we wrote.
assert loadedJson == json_objects()
更新:
有两个问题。第一个是在架构中。您当前拥有以下内容的任何地方:
{
"name": "created_at",
"type": "int",
"logicalType": "date"
},
应该是这样的:
{
"name": "created_at",
"type": {"type": "int", "logicalType": "date"}
},
修复后,另一个问题出在您的数据中。读入JSON文件时,created_at
和updated_at
字段是字符串,但需要是datetime.date
对象。您可以通过以下方式为他们每个人执行此操作:
from datetime import date
contents["company"]["created_at"] = date.fromisoformat(contents["company"]["created_at"].split("T")[0])
原回答:
writer.append
函数不接受 JSON 文件的名称,而是接受 JSON 对象。因此,不要写:
writer.append("laird.json")
你会想要这样的东西:
with open("laird.json") as fp:
contents = json.load(fp)
writer.append(contents)