python csv,只写一次headers

python csv, writing headers only once

所以我有一个程序可以从 .Json.

创建 CSV

首先我加载 json 文件。

f = open('Data.json')
data = json.load(f)
f.close()

然后我通过它,寻找特定的关键字,如果我找到那个关键字。我会在 .csv 文件中写下与此相关的所有内容。

for item in data:
    if "light" in item:
       write_light_csv('light.csv', item)

这是我的 write_light_csv 功能:

def write_light_csv(filename,dic):

    with open (filename,'a') as csvfile:
        headers = ['TimeStamp', 'light','Proximity']
        writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n',fieldnames=headers)

        writer.writeheader()

        writer.writerow({'TimeStamp': dic['ts'], 'light' : dic['light'],'Proximity' : dic['prox']})

我最初将 wb+ 作为模式,但每次打开文件进行写入时都会清除所有内容。我用 a 替换了它,现在每次写入时,它都会添加一个 header。我如何确保 header 只写一次?

您可以检查文件是否已经存在,然后不要调用 writeheader(),因为您正在使用追加选项打开文件。

类似的东西:

import os.path


file_exists = os.path.isfile(filename)

with open (filename, 'a') as csvfile:
    headers = ['TimeStamp', 'light', 'Proximity']
    writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n',fieldnames=headers)

    if not file_exists:
        writer.writeheader()  # file doesn't exist yet, write a header

    writer.writerow({'TimeStamp': dic['ts'], 'light': dic['light'], 'Proximity': dic['prox']})

能否更改代码结构并一次性导出整个文件?

def write_light_csv(filename, data):
    with open (filename, 'w') as csvfile:
        headers = ['TimeStamp', 'light','Proximity']
        writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n',fieldnames=headers)

        writer.writeheader()

        for item in data:
            if "light" in item:
                writer.writerow({'TimeStamp': item['ts'], 'light' : item['light'],'Proximity' : item['prox']})


write_light_csv('light.csv', data)

我会在写 headers 之前使用一些 flag 和 运行 检查!例如

flag=0
def get_data(lst):
    for i in lst:#say list of url
        global flag
        respons = requests.get(i)
        respons= respons.content.encode('utf-8')
        respons=respons.replace('\','')
        print respons
        data = json.loads(respons)
        fl = codecs.open(r"C:\Users\TEST\Desktop\data1.txt",'ab',encoding='utf-8')
        writer = csv.DictWriter(fl,data.keys())
        if flag==0:
            writer.writeheader()
        writer.writerow(data)
        flag+=1
        print "You have written % times"%(str(flag))
    fl.close()
get_data(urls)

您可以检查文件是否为空

import csv
import os

headers = ['head1', 'head2']

for row in interator:
    with open('file.csv', 'a') as f:
        file_is_empty = os.stat('file.csv').st_size == 0
        writer = csv.writer(f, lineterminator='\n')
        if file_is_empty:
            writer.writerow(headers)
        writer.writerow(row)

另一种方式:

with open(file_path, 'a') as file:
        w = csv.DictWriter(file, my_dict.keys())

        if file.tell() == 0:
            w.writeheader()

        w.writerow(my_dict)

您可以使用 csv.Sniffer Class 和

with open('my.csv', newline='') as csvfile:
    if csv.Sniffer().has_header(csvfile.read(1024))
    # skip writing headers

同时使用Pandas:用于将Dataframe数据存储到CSV文件) 如果您使用索引迭代 API 调用以在 CSV 文件中添加数据,只需在设置 header 属性 之前添加此检查。

if i > 0:
        dataset.to_csv('file_name.csv',index=False, mode='a', header=False)
    else:
        dataset.to_csv('file_name.csv',index=False, mode='a', header=True)

这是另一个仅依赖于 Python 的内置 csv 包的示例。此方法检查 header 是否符合预期,否则会引发错误。它还通过写入 header 来处理文件不存在或确实存在但为空的情况。希望这会有所帮助:

import csv
import os


def append_to_csv(path, fieldnames, rows):
    is_write_header = not os.path.exists(path) or _is_empty_file(path)
    if not is_write_header:
        _assert_field_names_match(path, fieldnames)

    _append_to_csv(path, fieldnames, rows, is_write_header)


def _is_empty_file(path):
    return os.stat(path).st_size == 0


def _assert_field_names_match(path, fieldnames):
    with open(path, 'r') as f:
        reader = csv.reader(f)
        header = next(reader)
        if header != fieldnames:
            raise ValueError(f'Incompatible header: expected {fieldnames}, '
                             f'but existing file has {header}')


def _append_to_csv(path, fieldnames, rows, is_write_header: bool):
    with open(path, 'a') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if is_write_header:
            writer.writeheader()
        writer.writerows(rows)

您可以使用以下代码进行测试:


file_ = 'countries.csv'
fieldnames_ = ['name', 'area', 'country_code2', 'country_code3']
rows_ = [
    {'name': 'Albania', 'area': 28748, 'country_code2': 'AL', 'country_code3': 'ALB'},
    {'name': 'Algeria', 'area': 2381741, 'country_code2': 'DZ', 'country_code3': 'DZA'},
    {'name': 'American Samoa', 'area': 199, 'country_code2': 'AS', 'country_code3': 'ASM'}
]

append_to_csv(file_, fieldnames_, rows_)

如果你 运行 这一次你在 countries.csv 中得到以下内容:

name,area,country_code2,country_code3
Albania,28748,AL,ALB
Algeria,2381741,DZ,DZA
American Samoa,199,AS,ASM

如果你 运行 它两次你会得到以下结果(注意,没有第二个 header):

name,area,country_code2,country_code3
Albania,28748,AL,ALB
Algeria,2381741,DZ,DZA
American Samoa,199,AS,ASM
Albania,28748,AL,ALB
Algeria,2381741,DZ,DZA
American Samoa,199,AS,ASM

如果您随后更改 countries.csv 中的 header 并再次 运行 程序,您将得到一个值错误,如下所示:

ValueError: Incompatible header: expected ['name', 'area', 'country_code2', 'country_code3'], but existing file has ['not', 'right', 'fieldnames']