通过元组名称将不同命名元组的列表传递给不同的 csv

Pass list of different named tuples to different csv by tuple name

我有一个程序,我希望在其中记录发生的所有重大变化。例如:每次变量x的值发生变化时,记录变化的时间和变化本身。在程序中有许多这样的更改,并且并非所有都具有相同数量的参数。

我决定使用 namedtuples 来存储每个更改实例,然后将这些 namedtuples 放入单个主数据列表中 - 准备导出到 csv。我使用了元组,因为它们当然是不可变的,这是保存记录的理想选择。下面我试图以尽可能简洁的方式解释我所做和尝试的事情。希望到目前为止我的问题和尝试是清楚的。

所以我有:

data = [] 

作为主要存储库,具有以下形式的命名元组:

a_tuple = namedtuple('x_change', ['Time', 'Change'])
another_tuple = namedtuple('y_change', ['Time', 'Change', 'id'])

然后,每次使用以下命令检测到数据更改时,我都可以附加这些命名元组的实例:

data.append(a_tuple(a_time, a_change))
data.append(another_tuple(a_time, a_change, an_id))

如果我然后打印出数据的内容,我将得到如下输出:

x_change(a_time=4, a_change=1)
y_change(a_time=5, a_change=3, an_id = 2)
y_change(a_time=7, a_change=1, an_id = 3)
x_change(a_time=8, a_change=3)

我想做的是按元组名称将这些元组导出到 csv 文件。所以在上面的例子中,我最终会得到两个形式的 csv 文件:

name, time, change
x_change, 4, 1
x_change, 8, 3

和;

name, time, change, id
y_change, 5, 3, 2
y_change, 7, 1, 3

到目前为止,我必须设法写入单个 csv,如下所示:

with open ('events.csv', 'w', newline='') as csvfile:
    output = csv.writer(csvfile, delimiter = ',')
    for row in data:
        output.writerow(row)

生成减去元组名称的输出。所以:

4, 1
5, 3, 2
7, 1, 3
8, 3

我也试过:

with open ('events.csv', 'w', newline='') as csvfile:
    output = csv.writer(csvfile, delimiter = ',')
    for row in data:
        output.writerow(str(row))

通过获取的每个字符(仅包括第一行)将文件拆分为 csv 格式,包括元组名称:

x, _, c, h, a, n, g, e, 4, 1

我已经搜索了一个解决方案,但没有找到适合我正在尝试做的事情的任何东西,现在我不知所措。如有任何帮助,我们将不胜感激。

查看 namedtuple 实例表示 - __repr__

>>>import namedtuple

>>>Row = namedtuple('Row', 'time, change')
>>>record = Row(4, 1)

...

>>>help(record)
class Row(builtins.tuple)
 |  Row(time, change)
 |
 |  Method resolution order:
 |      Row
 |      builtins.tuple
 |      builtins.object
 |
 |  Methods defined here:
 |
 |  __getnewargs__(self)
 |      Return self as a plain tuple.  Used by copy and pickle.
 |
 |  __getstate__(self)
 |      Exclude the OrderedDict from pickling
 |
 |  __repr__(self)
 |      Return a nicely formatted representation string
 |
 |  _asdict(self)
 |      Return a new OrderedDict which maps field names to their values.
 |
...


>>> record.__repr__()
'Row(time=4, change=1)'
>>> repr(record)
'Row(time=4, change=1)'

从那里您可能需要做一些解析,但这是一个很好的起点,因为存在 namedtuple 名称。

希望对您有所帮助。

将名称设为 namedtuple 中的一个字段可以让您的生活更轻松。

xChange = namedtuple('xChange', ['name', 'time' 'change'])
yChange = namedtuple('yChange', ['name', 'time', 'change', 'id'])

namedtuple 没有默认值,但您可以将 xChange 子类化,例如,创建一个将名称设置为 'x_change' in __init__

的命名元组

您需要完成两件事:

  1. 将类型的名称放入行中
  2. 为每个元组类型创建一个文件

对于#1,为return您真正想要的行创建一个函数。

def get_row_output(row):
    return [type(row).__name__] + list(row)

(命名元组最终会创建一个自定义类型,该类型以 namedtuple() 的第一个参数的值命名。该名称不会成为该类型实例数据的任何表示形式,因此你必须自己提取它。)

对于 #2,要以不同的文件结束,您需要循环遍历写出 CSV 的代码之外的数据。最简单的方法:

for row in data:
    with open(row.__name__ + '.csv', 'a+') as csvfile:
        output = csv.writer(csvfile, delimiter=',')
        output.writerow(get_row_output(row))

(注意在追加模式下打开文件,因为您在每一行打开和关闭文件。)

如果 data 非常大,那么最好保持文件句柄打开而不是打开和关闭每一行。类似于:

def get_filename(row):
    return row.__name__ + '.csv'

def write_changes(data):
    file_handles = {}
    csv_writers = {}

    for row in data:
        filename = get_filename(row)
        if filename not in file_handles:
            f = open(filename, 'wb')
            file_handles[filename] = f
            csv_writer = csv.writer(f, delimiter=',')
            csv_writers[filename] = csv_writer

        csv_writers[filename].writerow(get_row_output(row))

    for f in file_handles.values():
        f.close()

以下方法应该有效。这将获取包含所有命名元组的数据,并首先按元组的类型对其进行排序。然后它将所有条目分组并为每个条目创建一个 CSV 文件。每个 CSV 文件中的第一行包含字段名称:

from collections import namedtuple
from itertools import groupby
import csv

data = [] 

a_tuple = namedtuple('x_change', ['Time', 'Change'])
another_tuple = namedtuple('y_change', ['Time', 'Change', 'id'])

data.append(a_tuple(6, 1))
data.append(a_tuple(2, 1))
data.append(another_tuple(5, 3, 2))
data.append(another_tuple(7, 1, 3))
data.append(a_tuple(5, 2))

data.sort(key=lambda x: type(x).__name__)

for k, g in groupby(data, lambda x: type(x).__name__):
    with open('{}.csv'.format(k), 'w', newline='') as f_output:
        csv_output = csv.writer(f_output)
        rows = list(g)
        csv_output.writerow(['Name'] + list(rows[0]._fields))
        for row in rows:
            csv_output.writerow([type(row).__name__] + list(row))

对于我的数据,这将为您提供两个 CSV 文件,如下所示:

x_change.csv

Name,Time,Change
x_change,6,1
x_change,2,1
x_change,5,2

y_change.csv

Name,Time,Change,id
y_change,5,3,2
y_change,7,1,3