Dask 压平字典列
Dask to Flatten Dictionary Column
我是 Dask 的新手,我正在寻找一种方法来展平 PANDAS 数据框中的字典列。这是 1600 万行数据框第一行的屏幕截图:
这里是三行文本的示例:
{{u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'}}
我通常会使用以下代码展平 Form990PartVIISectionAGrp 列:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)
我想在 Dask 中执行此操作,但出现以下错误:"ValueError: The columns in the computed data do not match the columns in the provided metadata."
我正在使用 Python 2.7。我导入相关包
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()
为了测试代码,我创建了一个随机数据样本:
dfs = df.sample(1000)
然后生成Dask数据帧:
ddf = dd.from_pandas(dfs, npartitions=nCores)
该列当前为字符串格式,因此我将其转换为字典。通常,我会写一行代码:
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
但我在这里尝试以更 'Dask-like' 的形式进行,因此我编写了以下函数然后应用它:
def make_dict(dfs):
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
return dfs
ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()
这行得通——它 returns 一个 PANDAS 数据框,其中 Form990PartVIISectionAGrp 列是字典格式(但是,它并不比非 Dask 应用程序快)。
然后我重新创建了 Dask DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)
并写一个函数来压平列:
def flatten(ddf_out):
ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
#ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
return ddf_out
如果我然后运行这个代码:
result = ddf.map_partitions(flatten)
我得到以下输出,其中列没有被展平:
我也遇到了关于缺少元数据的错误,并且考虑到以上内容对解析字典列没有帮助,所以我创建了一个列列表,该列列表由 Python 平整列和用它来创建列和数据类型的字典:
metadir = {u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'}
然后我用这个元数据应用展平函数:
result = ddf.map_partitions(flatten, meta=metadir)
我得到以下输出结果:
运行 result.columns 产生这个:
失败的地方是 运行ning compute(),在那里我收到以下错误消息:"ValueError: The columns in the computed data do not match the columns in the provided metadata." 无论我写什么,我都会收到同样的错误:
result.compute()
或
result.compute(meta=metadir)
我不确定我做错了什么。 result 中的列似乎与 metadir 中的列匹配。任何建议将不胜感激。
更新:
这是我更新扁平化功能的尝试。
meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER'], dtype="O")
def flatten(ddf_out):
ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
for m in meta:
if m not in ddf_out:
df[m] = ''
return ddf_out
那我运行:
result = ddf.map_partitions(flatten, meta=meta).compute()
一些入门注意事项:
.apply(literal_eval)
这不是比 map
更好吗?
I then re-create the Dask DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)
ddf_out
已经是一个 dask dataframe,我不知道你为什么要这样做。
The columns in result seem to match those in metadir.
result.columns
的值取自您提供的元数据,除非您提出要求,否则不会进行任何计算(dask 在大多数操作中都是惰性的)。 ValueError 异常不提供更多信息吗?
这是一个完整的例子
x = ({'F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'CHAIR PERSON',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'},
{'F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'VICE CHAIR',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'})
df = pd.DataFrame({'a': x})
d = dd.from_pandas(df, 1)
meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()
我怎么知道meta
要用什么?我将该函数应用于 pandas 数据框 - 您可以使用一小块数据框来执行此操作。
一些补充说明:
- 它是一个 anti-pattern 用 pandas 加载数据,传递给 dask worker,然后将整个结果收集回 pandas (在内存中)数据帧,你是不太可能以这种方式看到加速,并且可能会产生大量开销。您最好加载
dd.read_csv
之类的东西,并使用 dask 函数进行聚合或编写。只有 compute()
会很小或没有 return 任何东西(因为它涉及写入输出)。官方的例子没有使用from_pandas.
- string 和 dict 处理是 python 方法,因此持有任何 python 函数的解释器锁 (GIL):线程实际上不会 运行 并行。要获得并行性,您需要在进程中 运行,使用 https://docs.dask.org/en/latest/setup/single-distributed.html
最容易实现
- 分布式调度程序还允许您访问仪表板,其中包含许多有用的信息来诊断您的系统 运行ning 的情况。如果您有需要遵守的防火墙规则,您还可以对其行为进行很多配置。
给定一个小的或 medium-sized 数据集,简单的 PANDAS 解决方案将起作用:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
但是,对于 1600 万行,PANDAS 解决方案不会 运行 在具有 16GB RAM 的 Macbook 或具有 96GB 的 Windows 机器上。出于这个原因,我看向了达斯克。然而,正如上面的答案和评论所示,Dask 解决方案不起作用,因为我数据集中的每个观察不一定具有所有字典键。总的来说,Form990PartVIISectionAGrp 的 1600 万个观察值具有以下列表中的 15 个键:
newkeys = ['F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER']
因此,我的解决方案涉及采用上面@mdurant 提供的一些提示,并首先向每一行添加任何缺失的键:
for index, row in df[:].iterrows():
for k in newkeys:
row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)
我的 Macbook 花了 100 分钟。根据 mdurant 的评论,我将数据帧保存为 JSON 格式:
df.to_json('df.json', orient='records', lines=True)
并将文件作为文本读入 Dask:
import json
import dask.bag as db
b = db.read_text('df.json').map(json.loads)
然后创建一个函数来展平列:
def flatten(record):
return {
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],
'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
'URL': record['URL'],
}
然后我可以应用函数:
df = b.map(flatten).to_dataframe()
并将数据导出为 CSV:
df.to_csv('compensation*.csv')
这很有魅力!简而言之,根据上面 mdurant 的有用评论,关键是 1) 将缺失的键添加到所有观察中,以及 2) 不要将数据从 PANDAS 读入 Dask(改为使用文本或 CSV)。解决这两个问题可以很好地解决这个问题。
我是 Dask 的新手,我正在寻找一种方法来展平 PANDAS 数据框中的字典列。这是 1600 万行数据框第一行的屏幕截图:
这里是三行文本的示例:
{{u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X'}, {'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'}}
我通常会使用以下代码展平 Form990PartVIISectionAGrp 列:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)
我想在 Dask 中执行此操作,但出现以下错误:"ValueError: The columns in the computed data do not match the columns in the provided metadata."
我正在使用 Python 2.7。我导入相关包
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()
为了测试代码,我创建了一个随机数据样本:
dfs = df.sample(1000)
然后生成Dask数据帧:
ddf = dd.from_pandas(dfs, npartitions=nCores)
该列当前为字符串格式,因此我将其转换为字典。通常,我会写一行代码:
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
但我在这里尝试以更 'Dask-like' 的形式进行,因此我编写了以下函数然后应用它:
def make_dict(dfs):
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
return dfs
ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()
这行得通——它 returns 一个 PANDAS 数据框,其中 Form990PartVIISectionAGrp 列是字典格式(但是,它并不比非 Dask 应用程序快)。
然后我重新创建了 Dask DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)
并写一个函数来压平列:
def flatten(ddf_out):
ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
#ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
return ddf_out
如果我然后运行这个代码:
result = ddf.map_partitions(flatten)
我得到以下输出,其中列没有被展平:
我也遇到了关于缺少元数据的错误,并且考虑到以上内容对解析字典列没有帮助,所以我创建了一个列列表,该列列表由 Python 平整列和用它来创建列和数据类型的字典:
metadir = {u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'}
然后我用这个元数据应用展平函数:
result = ddf.map_partitions(flatten, meta=metadir)
我得到以下输出结果:
运行 result.columns 产生这个:
失败的地方是 运行ning compute(),在那里我收到以下错误消息:"ValueError: The columns in the computed data do not match the columns in the provided metadata." 无论我写什么,我都会收到同样的错误:
result.compute()
或
result.compute(meta=metadir)
我不确定我做错了什么。 result 中的列似乎与 metadir 中的列匹配。任何建议将不胜感激。
更新: 这是我更新扁平化功能的尝试。
meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER'], dtype="O")
def flatten(ddf_out):
ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
for m in meta:
if m not in ddf_out:
df[m] = ''
return ddf_out
那我运行:
result = ddf.map_partitions(flatten, meta=meta).compute()
一些入门注意事项:
.apply(literal_eval)
这不是比 map
更好吗?
I then re-create the Dask DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)
ddf_out
已经是一个 dask dataframe,我不知道你为什么要这样做。
The columns in result seem to match those in metadir.
result.columns
的值取自您提供的元数据,除非您提出要求,否则不会进行任何计算(dask 在大多数操作中都是惰性的)。 ValueError 异常不提供更多信息吗?
这是一个完整的例子
x = ({'F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'CHAIR PERSON',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'},
{'F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'VICE CHAIR',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X'})
df = pd.DataFrame({'a': x})
d = dd.from_pandas(df, 1)
meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()
我怎么知道meta
要用什么?我将该函数应用于 pandas 数据框 - 您可以使用一小块数据框来执行此操作。
一些补充说明:
- 它是一个 anti-pattern 用 pandas 加载数据,传递给 dask worker,然后将整个结果收集回 pandas (在内存中)数据帧,你是不太可能以这种方式看到加速,并且可能会产生大量开销。您最好加载
dd.read_csv
之类的东西,并使用 dask 函数进行聚合或编写。只有compute()
会很小或没有 return 任何东西(因为它涉及写入输出)。官方的例子没有使用from_pandas. - string 和 dict 处理是 python 方法,因此持有任何 python 函数的解释器锁 (GIL):线程实际上不会 运行 并行。要获得并行性,您需要在进程中 运行,使用 https://docs.dask.org/en/latest/setup/single-distributed.html 最容易实现
- 分布式调度程序还允许您访问仪表板,其中包含许多有用的信息来诊断您的系统 运行ning 的情况。如果您有需要遵守的防火墙规则,您还可以对其行为进行很多配置。
给定一个小的或 medium-sized 数据集,简单的 PANDAS 解决方案将起作用:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
但是,对于 1600 万行,PANDAS 解决方案不会 运行 在具有 16GB RAM 的 Macbook 或具有 96GB 的 Windows 机器上。出于这个原因,我看向了达斯克。然而,正如上面的答案和评论所示,Dask 解决方案不起作用,因为我数据集中的每个观察不一定具有所有字典键。总的来说,Form990PartVIISectionAGrp 的 1600 万个观察值具有以下列表中的 15 个键:
newkeys = ['F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER']
因此,我的解决方案涉及采用上面@mdurant 提供的一些提示,并首先向每一行添加任何缺失的键:
for index, row in df[:].iterrows():
for k in newkeys:
row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)
我的 Macbook 花了 100 分钟。根据 mdurant 的评论,我将数据帧保存为 JSON 格式:
df.to_json('df.json', orient='records', lines=True)
并将文件作为文本读入 Dask:
import json
import dask.bag as db
b = db.read_text('df.json').map(json.loads)
然后创建一个函数来展平列:
def flatten(record):
return {
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],
'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
'URL': record['URL'],
}
然后我可以应用函数:
df = b.map(flatten).to_dataframe()
并将数据导出为 CSV:
df.to_csv('compensation*.csv')
这很有魅力!简而言之,根据上面 mdurant 的有用评论,关键是 1) 将缺失的键添加到所有观察中,以及 2) 不要将数据从 PANDAS 读入 Dask(改为使用文本或 CSV)。解决这两个问题可以很好地解决这个问题。