了解分区在 Dask 中的工作原理
Understanding How Partitions Work in Dask
我有一个包含 17,850,209 行的 CSV,对于 Pandas
来说太大了,无法处理我的整个代码,因此我尝试使用 Dask
对其进行操作。我的所有代码 "works" 但是当我将 CSV 写入磁盘时,我没有获得所有 17,850,209 条记录。相反,我得到 N
个 CSV(其中 N
= npartitions
),每个 CSV 只有 50,000 条记录,在本例中,总共有 900,000 条记录。
首先,我读入原始 CSV 文件并创建包含前 2 行和时间戳的干净数据框:
import pandas as pd
import numpy as np
import time as t
import dask.dataframe as dd
my_dtypes = {
'uid': object,
'state': object,
'var01': np.float64,
'var02': np.float64
}
df_raw = pd.read_csv('/Users/me/input_data/Whosebug_raw.csv', dtype = my_dtypes, sep=',')
df_clean = pd.DataFrame(df_raw['uid'].str.strip().str.replace('{','').str.replace('}',''))
df_clean['state'] = pd.DataFrame(df_raw['state'].str.strip())
df_clean['rowcreatetimestamp'] = t.strftime("%Y-%m-%d %H:%M:%S")
这给了我以下(正确的)计数:
df_clean.count()
# uid 17850209
# state 17850209
# rowcreatetimestamp 17850209
# dtype: int64
然后我将它移动到 Dask
,chucksize 为 1,000,000(我团队的大多数机器都可以处理)。
df_clean = dd.from_pandas(df_clean, chunksize=1000000)
df_clean
# dd.DataFrame<from_pa..., npartitions=18, divisions=(0, 1000000, 2000000, ..., 17000000, 17850208)>
df_clean.compute()
# [17850209 rows x 3 columns]
df_clean.count().compute()
# uid 17850209
# state 17850209
# rowcreatetimestamp 17850209
# dtype: int64
然而,当我执行第一个 Dask
操作时,它仅 "keeps" 900,000 行数据帧并创建 50,000 行新列:
df_clean['var01'] = dd.from_array(np.where((df_raw['var01'] > 0), 1, 0))
df_clean.compute()
# [900000 rows x 4 columns]
df_clean.count().compute()
uid 900000
state 900000
rowcreatetimestamp 900000
var01 50000
dtype: int64
当我将 Dask
数据帧写入磁盘时,我得到 18 个 CSV,每个包含 50,000 条记录。我使用了 compute=True
参数并省略了它并得到了相同的结果:
df_clean.to_csv('/Users/me/input_data/Whosebug_clean_*.csv', header=True, sep=',', index=False, compute=True)
df_clean.to_csv('/Users/me/input_data/Whosebug_clean_*.csv', header=True, sep=',', index=False)
当我写入单个文件时,我得到 900,000 条记录加上 header:
df_clean.compute().to_csv('/Users/me/input_data/Whosebug_clean_one_file.csv', header=True, sep=',', index=False)
(在bash)
wc -l '/Users/me/input_data/Whosebug_clean_one_file.csv'
900001
虽然 900,000 条记录是错误的,但当我打开 CSV 文件时,只有前 50,000 行包含 var01
的数据。
我已经搜索了 latest documentation,但没有看到在输出包含所有数据的块文件或包含正确行数的单个文件方面我遗漏了什么。
TIA。
这条线有点奇怪
df_clean['var01'] = dd.from_array(np.where((df_raw['var01'] > 0), 1, 0))
您将 dask.dataframe、dask.array 和 numpy 混合在一起。即使支持这种行为(这是不确定的),它也可能非常非常缓慢地混合像这样的惰性和具体操作。
相反,我建议使用 dd.Series.where
df_clean['var01'] = df_raw.var01.where(df_raw.var01 > 0, 1)
df_clean['var01'] = df_raw.var01.where(df_raw.var01 < 0, 0)
我有一个包含 17,850,209 行的 CSV,对于 Pandas
来说太大了,无法处理我的整个代码,因此我尝试使用 Dask
对其进行操作。我的所有代码 "works" 但是当我将 CSV 写入磁盘时,我没有获得所有 17,850,209 条记录。相反,我得到 N
个 CSV(其中 N
= npartitions
),每个 CSV 只有 50,000 条记录,在本例中,总共有 900,000 条记录。
首先,我读入原始 CSV 文件并创建包含前 2 行和时间戳的干净数据框:
import pandas as pd
import numpy as np
import time as t
import dask.dataframe as dd
my_dtypes = {
'uid': object,
'state': object,
'var01': np.float64,
'var02': np.float64
}
df_raw = pd.read_csv('/Users/me/input_data/Whosebug_raw.csv', dtype = my_dtypes, sep=',')
df_clean = pd.DataFrame(df_raw['uid'].str.strip().str.replace('{','').str.replace('}',''))
df_clean['state'] = pd.DataFrame(df_raw['state'].str.strip())
df_clean['rowcreatetimestamp'] = t.strftime("%Y-%m-%d %H:%M:%S")
这给了我以下(正确的)计数:
df_clean.count()
# uid 17850209
# state 17850209
# rowcreatetimestamp 17850209
# dtype: int64
然后我将它移动到 Dask
,chucksize 为 1,000,000(我团队的大多数机器都可以处理)。
df_clean = dd.from_pandas(df_clean, chunksize=1000000)
df_clean
# dd.DataFrame<from_pa..., npartitions=18, divisions=(0, 1000000, 2000000, ..., 17000000, 17850208)>
df_clean.compute()
# [17850209 rows x 3 columns]
df_clean.count().compute()
# uid 17850209
# state 17850209
# rowcreatetimestamp 17850209
# dtype: int64
然而,当我执行第一个 Dask
操作时,它仅 "keeps" 900,000 行数据帧并创建 50,000 行新列:
df_clean['var01'] = dd.from_array(np.where((df_raw['var01'] > 0), 1, 0))
df_clean.compute()
# [900000 rows x 4 columns]
df_clean.count().compute()
uid 900000
state 900000
rowcreatetimestamp 900000
var01 50000
dtype: int64
当我将 Dask
数据帧写入磁盘时,我得到 18 个 CSV,每个包含 50,000 条记录。我使用了 compute=True
参数并省略了它并得到了相同的结果:
df_clean.to_csv('/Users/me/input_data/Whosebug_clean_*.csv', header=True, sep=',', index=False, compute=True)
df_clean.to_csv('/Users/me/input_data/Whosebug_clean_*.csv', header=True, sep=',', index=False)
当我写入单个文件时,我得到 900,000 条记录加上 header:
df_clean.compute().to_csv('/Users/me/input_data/Whosebug_clean_one_file.csv', header=True, sep=',', index=False)
(在bash)
wc -l '/Users/me/input_data/Whosebug_clean_one_file.csv'
900001
虽然 900,000 条记录是错误的,但当我打开 CSV 文件时,只有前 50,000 行包含 var01
的数据。
我已经搜索了 latest documentation,但没有看到在输出包含所有数据的块文件或包含正确行数的单个文件方面我遗漏了什么。
TIA。
这条线有点奇怪
df_clean['var01'] = dd.from_array(np.where((df_raw['var01'] > 0), 1, 0))
您将 dask.dataframe、dask.array 和 numpy 混合在一起。即使支持这种行为(这是不确定的),它也可能非常非常缓慢地混合像这样的惰性和具体操作。
相反,我建议使用 dd.Series.where
df_clean['var01'] = df_raw.var01.where(df_raw.var01 > 0, 1)
df_clean['var01'] = df_raw.var01.where(df_raw.var01 < 0, 0)