python json to csv 逐行读取很慢

python json to csv Reading line by line is slow

我有一个很大的JSON文件,大约几十GB,我采用逐行读取文件的方式,但是写10M左右1分钟就慢了。 帮我修改一下代码

这是我的代码



import pandas as pd
import json

add_header = True

with open('1.json') as f_json:
    for line in f_json:
        line = line.strip()
        
        df = pd.json_normalize(json.loads(line))
        df.to_csv('1.csv', index=None, mode='a', header=add_header)
        add_header = False

我也试过用chunked reading,但是报错,代码:

import pandas as pd
import json


data = pd.read_json('G:\1.json',
                           encoding='utf8',lines=True,chunksize=100000)
for df in data:
    line = df.strip()
    df = pd.json_normalize(json.loads(line))
    df.to_csv('G:\1.csv', index=None, mode='a',encoding='utf8')

输出

    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'strip'
Process finished with exit code -1

这是我的 JSON 文件

{"_index":"core-bvd-dmc","_type":"_doc","_id":"e22762d5c4b81fbcad62b5c1d77226ec","_score":1,"_source":{"a_id":"P305906272","a_id_type":"Contact ID","a_name":"Mr Chuanzong Chen","a_name_normal":"MR CHUANZONG CHEN","a_job_title":"Executive director and general manager","relationship":"Currently works for (Executive director and general manager)","b_id":"CN9390051924","b_id_type":"BVD ID","b_name":"Yantai haofeng trade co., ltd.","b_name_normal":"YANTAI HAOFENG TRADE CO","b_country_code":"CN","b_country":"China","b_in_compliance_db":false,"b_nationality":"CN","b_street_address":"Bei da jie 53hao 1609shi; Zhi fu qu","b_city":"Yantai","b_postcode":"264000","b_region":"East China|Shandong","b_phone":"+86 18354522200","b_email":"18354522200@163.com","b_latitude":37.511873,"b_longitude":121.396883,"b_geo_accuracy":"Community","b_national_ids":{"Unified social credit code":["91370602073035263P"],"Trade register number":["370602200112047"],"NOC":["073035263"]},"dates":{"date_of_birth":null},"file_name":"/media/hedwig/iforce/data/BvD/s3-transfer/SuperTable_v3_json/dmc/part-00020-7b09c546-2adc-413e-9e68-18b300e205cf-c000.json","b_geo_point":{"lat":37.511873,"lon":121.396883}}}
{"_index":"core-bvd-dmc","_type":"_doc","_id":"97871f8842398794e380a748f5b82ea5","_score":1,"_source":{"a_id":"P305888975","a_id_type":"Contact ID","a_name":"Mr Hengchao Jiang","a_name_normal":"MR HENGCHAO JIANG","a_job_title":"Legal representative","relationship":"Currently works for (Legal representative)","b_id":"CN9390053357","b_id_type":"BVD ID","b_name":"Yantai ji hong educate request information co., ltd.","b_name_normal":"YANTAI JI HONG EDUCATE REQUEST INFORMATION CO","b_country_code":"CN","b_country":"China","b_in_compliance_db":false,"b_nationality":"CN","b_street_address":"Ying chun da jie 131hao nei 1hao; Lai shan qu","b_city":"Yantai","b_postcode":"264000","b_region":"East China|Shandong","b_phone":"+86 18694982900","b_email":"xyw_700@163.com","b_latitude":37.511873,"b_longitude":121.396883,"b_geo_accuracy":"Community","b_national_ids":{"NOC":["597807789"],"Trade register number":["370613200023836"],"Unified social credit code":["913706135978077898"]},"dates":{"date_of_birth":null},"file_name":"/media/hedwig/iforce/data/BvD/s3-transfer/SuperTable_v3_json/dmc/part-00020-7b09c546-2adc-413e-9e68-18b300e205cf-c000.json","b_geo_point":{"lat":37.511873,"lon":121.396883}}}

分块读取代码,我设置块数=1

import pandas as pd

add_header = True
data = pd.read_json('G:\1.json',
                           encoding='utf8',lines=True,chunksize=1)
for subdf in data:
    # subdf is already a dataframe.
    temp_df = pd.concat([subdf[['_index', '_id', '_score']], pd.json_normalize(subdf._source)], axis=1)
    temp_df.to_csv('G:\1.csv', index=None, header=add_header,mode='a',encoding='utf8')
    add_header = False

输出结果,我转换了2行JSON文件,第一行转换正确,但是第二行JSON数据有错误,键'_index'的信息, '_id'、'_score'和"_source"中的信息是分开的

_index,_id,_score,a_id,a_id_type,a_name,a_name_normal,a_job_title,relationship,b_id,b_id_type,b_name,b_name_normal,b_country_code,b_country,b_in_compliance_db,b_nationality,b_street_address,b_city,b_postcode,b_region,b_phone,b_email,b_latitude,b_longitude,b_geo_accuracy,file_name,b_national_ids.Unified social credit code,b_national_ids.Trade register number,b_national_ids.NOC,dates.date_of_birth,b_geo_point.lat,b_geo_point.lon
core-bvd-dmc,e22762d5c4b81fbcad62b5c1d77226ec,1,P305906272,Contact ID,Mr Chuanzong Chen,MR CHUANZONG CHEN,Executive director and general manager,Currently works for (Executive director and general manager),CN9390051924,BVD ID,"Yantai haofeng trade co., ltd.",YANTAI HAOFENG TRADE CO,CN,China,False,CN,Bei da jie 53hao 1609shi; Zhi fu qu,Yantai,264000,East China|Shandong,+86 18354522200,18354522200@163.com,37.511873,121.396883,Community,/media/hedwig/iforce/data/BvD/s3-transfer/SuperTable_v3_json/dmc/part-00020-7b09c546-2adc-413e-9e68-18b300e205cf-c000.json,['91370602073035263P'],['370602200112047'],['073035263'],,37.511873,121.396883
,,,P305888975,Contact ID,Mr Hengchao Jiang,MR HENGCHAO JIANG,Legal representative,Currently works for (Legal representative),CN9390053357,BVD ID,"Yantai ji hong educate request information co., ltd.",YANTAI JI HONG EDUCATE REQUEST INFORMATION CO,CN,China,False,CN,Ying chun da jie 131hao nei 1hao; Lai shan qu,Yantai,264000,East China|Shandong,+86 18694982900,xyw_700@163.com,37.511873,121.396883,Community,/media/hedwig/iforce/data/BvD/s3-transfer/SuperTable_v3_json/dmc/part-00020-7b09c546-2adc-413e-9e68-18b300e205cf-c000.json,['597807789'],['370613200023836'],['913706135978077898'],,37.511873,121.396883
core-bvd-dmc,97871f8842398794e380a748f5b82ea5,1.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


chunksize 正在返回数据帧的迭代器,因此您不能对其执行 stripjson.loads

你可能需要

for subdf in data:
    # subdf is already a dataframe.  
    temp_df = pd.concat([
        subdf[['_index', '_id', '_score']].reset_index(drop=True), 
        pd.json_normalize(subdf._source)
    ], axis=1)
    temp_df.to_csv(filename, index=None, mode='a',encoding='utf8')

您可以将 pd.concat 行修改为 flatten/extract 您想要的数据,但我希望您能理解。

我的另一个想法是,虽然 csv 可以比 JSON 更好地保存大数据,但是您会考虑将输出 csv 分块到多个文件而不是创建一个巨大的 csv 吗?