将 pandas df 添加到 dask df
adding pandas df to dask df
我最近遇到的 dask 问题之一是编码需要很多时间,我想加快速度。
问题:给定一个 dask df (ddf),对其进行编码,然后return ddf。
这是一些开始的代码:
# !pip install feature_engine
import dask.dataframe as dd
import pandas as pd
import numpy as np
from feature_engine.encoding import CountFrequencyEncoder
df = pd.DataFrame(np.random.randint(1, 5, (100,3)), columns=['a', 'b', 'c'])
# make it object cols
for col in df.columns:
df[col] = df[col].astype(str)
ddf = dd.from_pandas(df, npartitions=3)
x_freq = ddf.copy()
for col_idx, col_name in enumerate(x_freq.columns):
freq_enc = CountFrequencyEncoder(encoding_method='frequency')
col_to_encode = x_freq[col_name].to_frame().compute()
encoded_col = freq_enc.fit_transform(col_to_encode).rename(columns={col_name: col_name + '_freq'})
x_freq = dd.concat([x_freq, encoded_col], axis=1)
x_freq.head()
它将 运行 如我所料,将 pandas df 添加到 dask df - 没问题。
但是当我尝试另一个ddf时,出现错误:
x_freq = x.copy()
# npartitions = x_freq.npartitions
# x_freq = x_freq.repartition(npartitions=npartitions).reset_index(drop=True)
for col_idx, col_name in enumerate(x_freq.columns):
freq_enc = CountFrequencyEncoder(encoding_method='frequency')
col_to_encode = x_freq[col_name].to_frame().compute()
encoded_col = freq_enc.fit_transform(col_to_encode).rename(columns={col_name: col_name + '_freq'})
x_freq = dd.concat([x_freq, encoded_col], axis=1)
break
x_freq.head()
连接期间发生错误:
ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1
这是我加载“错误”ddf 的方式:
ddf = dd.read_parquet(os.path.join(dir_list[0], '*.parquet'), engine='pyarrow').repartition(partition_size='100MB')
我读到我应该尝试重新分区 and/or 重置索引 and/or 使用分配。都没有用。
x_freq = x.copy()
在第二个例子中类似于:
x_freq = ddf.copy()
从某种意义上说,在第一个示例中,x 只是一些我正在尝试编码的 ddf,但在这里定义它会需要很多代码。
有人能帮忙吗?
这是我认为可能会发生的事情。
您的镶木地板文件中可能没有分区信息。因此,您不能只是 dd.concat
,因为尚不清楚分区如何对齐。
您可以通过
查看
x_freq.known_divisions # is likely False
x_freq.divisions # is likely (None, None, None, None)
由于未知除法是问题所在,您可以使用第一个示例中的合成数据重现该问题
x_freq = ddf.clear_divisions().copy()
您可以通过重新设置索引来解决此问题:
x_freq.reset_index().set_index(index_column_name)
其中 index_column_name
是索引列的名称。
之后还可以考虑使用正确的索引保存数据,这样就不必每次都计算了。
注 1:并行化
顺便说一下,由于您是在使用每一列之前对其进行计算,因此您并没有真正利用 dask 的并行化能力。这是一个可能会更好地利用并行化的工作流程:
def count_frequency_encoder(s):
return s.replace(s.value_counts(normalize=True).compute().to_dict())
frequency_columns = {
f'{col_name}_freq': count_frequency_encoder(x_freq[col_name])
for col_name in x_freq.columns}
x_freq = x_freq.assign(**frequency_columns)
注2:to_frame
小提示:
x_freq[col_name].to_frame()
等同于
x_freq[[col_name]]
我最近遇到的 dask 问题之一是编码需要很多时间,我想加快速度。
问题:给定一个 dask df (ddf),对其进行编码,然后return ddf。
这是一些开始的代码:
# !pip install feature_engine
import dask.dataframe as dd
import pandas as pd
import numpy as np
from feature_engine.encoding import CountFrequencyEncoder
df = pd.DataFrame(np.random.randint(1, 5, (100,3)), columns=['a', 'b', 'c'])
# make it object cols
for col in df.columns:
df[col] = df[col].astype(str)
ddf = dd.from_pandas(df, npartitions=3)
x_freq = ddf.copy()
for col_idx, col_name in enumerate(x_freq.columns):
freq_enc = CountFrequencyEncoder(encoding_method='frequency')
col_to_encode = x_freq[col_name].to_frame().compute()
encoded_col = freq_enc.fit_transform(col_to_encode).rename(columns={col_name: col_name + '_freq'})
x_freq = dd.concat([x_freq, encoded_col], axis=1)
x_freq.head()
它将 运行 如我所料,将 pandas df 添加到 dask df - 没问题。 但是当我尝试另一个ddf时,出现错误:
x_freq = x.copy()
# npartitions = x_freq.npartitions
# x_freq = x_freq.repartition(npartitions=npartitions).reset_index(drop=True)
for col_idx, col_name in enumerate(x_freq.columns):
freq_enc = CountFrequencyEncoder(encoding_method='frequency')
col_to_encode = x_freq[col_name].to_frame().compute()
encoded_col = freq_enc.fit_transform(col_to_encode).rename(columns={col_name: col_name + '_freq'})
x_freq = dd.concat([x_freq, encoded_col], axis=1)
break
x_freq.head()
连接期间发生错误:
ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1
这是我加载“错误”ddf 的方式:
ddf = dd.read_parquet(os.path.join(dir_list[0], '*.parquet'), engine='pyarrow').repartition(partition_size='100MB')
我读到我应该尝试重新分区 and/or 重置索引 and/or 使用分配。都没有用。
x_freq = x.copy()
在第二个例子中类似于:
x_freq = ddf.copy()
从某种意义上说,在第一个示例中,x 只是一些我正在尝试编码的 ddf,但在这里定义它会需要很多代码。
有人能帮忙吗?
这是我认为可能会发生的事情。
您的镶木地板文件中可能没有分区信息。因此,您不能只是 dd.concat
,因为尚不清楚分区如何对齐。
您可以通过
查看x_freq.known_divisions # is likely False
x_freq.divisions # is likely (None, None, None, None)
由于未知除法是问题所在,您可以使用第一个示例中的合成数据重现该问题
x_freq = ddf.clear_divisions().copy()
您可以通过重新设置索引来解决此问题:
x_freq.reset_index().set_index(index_column_name)
其中 index_column_name
是索引列的名称。
之后还可以考虑使用正确的索引保存数据,这样就不必每次都计算了。
注 1:并行化
顺便说一下,由于您是在使用每一列之前对其进行计算,因此您并没有真正利用 dask 的并行化能力。这是一个可能会更好地利用并行化的工作流程:
def count_frequency_encoder(s):
return s.replace(s.value_counts(normalize=True).compute().to_dict())
frequency_columns = {
f'{col_name}_freq': count_frequency_encoder(x_freq[col_name])
for col_name in x_freq.columns}
x_freq = x_freq.assign(**frequency_columns)
注2:to_frame
小提示:
x_freq[col_name].to_frame()
等同于
x_freq[[col_name]]