Running two dask-ml imputers simultaneously instead of sequentially
I can impute the mean and most frequent values using dask-ml like this, and it works fine:
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height'])
df.iloc[:, [0,1]] = mean_imputer.fit_transform(df.iloc[:,[0,1]])
df.iloc[:, [2]] = most_frequent_imputer.fit_transform(df.iloc[:,[2]])
print(df)
Weight Age Height
0 100.0 2.0 5.0
1 85.0 4.5 5.0
2 70.0 7.0 5.0
However, if I have 100 million rows of data, dask appears to make two passes over the data when it could make just one. Is it possible to run the two imputers simultaneously and/or in parallel instead of sequentially, and what would sample code to achieve that look like?
If the tasks are independent of each other, you can compute them in parallel using dask.delayed, as suggested in the docs and the Dask Tutorial.
Your code would look like this:
from dask.distributed import Client
client = Client(n_workers=4)
from dask import delayed
import numpy as np
import pandas as pd
from dask_ml import impute
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')
def fit_transform_mi(d):
    return mean_imputer.fit_transform(d)

def fit_transform_mfi(d):
    return most_frequent_imputer.fit_transform(d)

def setdf(a, b, df):
    df.iloc[:, [0, 1]] = a
    df.iloc[:, [2]] = b
    return df
data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns = ['Weight', 'Age', 'Height'])
a = delayed(fit_transform_mi)(df.iloc[:,[0,1]])
b = delayed(fit_transform_mfi)(df.iloc[:,[2]])
c = delayed(setdf)(a,b,df)
df = c.compute()
print(df)
client.close()
The object c is a lazy Delayed object. It holds everything we need to compute the final result, including references to all of the required functions, their inputs, and their relationships to one another.
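As a minimal, self-contained sketch of the same pattern (with toy stand-in functions, not the imputers above): two independent delayed tasks feed a third, and nothing executes until .compute() is called.

```python
from dask import delayed

@delayed
def inc(x):           # stands in for the first independent task
    return x + 1

@delayed
def double(x):        # stands in for the second independent task
    return 2 * x

@delayed
def add(a, b):        # combines the two results, like setdf above
    return a + b

a = inc(10)           # lazy - nothing has run yet
b = double(10)        # lazy and independent of a, so it may run in parallel
c = add(a, b)         # lazy, depends on both a and b
result = c.compute()  # walks the graph and executes it
print(result)         # 31
```

Because `a` and `b` share no dependency, the scheduler is free to execute them at the same time.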
Dask is useful for speeding up computations through parallel processing, and also when the data does not fit into memory. In the example below, 300M rows of data spread across ten files are imputed with Dask. The graph of the computation shows that: 1. the mean and most-frequent imputers run in parallel; 2. all ten files are processed in parallel as well.
Setup
To prepare a large amount of data, the three rows from your question are replicated to form a 30M-row data frame. That data frame is saved to ten different files, yielding 300M rows in total with the same statistics as in your question.
import numpy as np
import pandas as pd
N = 10000000
weight = np.array([100, np.nan, 70]*N)
age = np.array([2, np.nan, 7]*N)
height = np.array([5, np.nan, 5]*N)
df = pd.DataFrame({'Weight': weight, 'Age': age, 'Height': height})
# Save ten large data frames to disk
for i in range(10):
    df.to_parquet(f'./df_to_impute_{i}.parquet', compression='gzip',
                  index=False)
Dask imputation
import graphviz
import dask
import dask.dataframe as dd
from dask_ml.impute import SimpleImputer
# Read all files for imputation in a dask data frame from a specific directory
df = dd.read_parquet('./df_to_impute_*.parquet')
# Set up the imputers and columns
mean_imputer = SimpleImputer(strategy='mean')
mostfreq_imputer = SimpleImputer(strategy='most_frequent')
imputers = [mean_imputer, mostfreq_imputer]
mean_cols = ['Weight', 'Age']
freq_cols = ['Height']
columns = [mean_cols, freq_cols]
# Create a new data frame with imputed values, then visualize the computation.
df_list = []
for imputer, col in zip(imputers, columns):
    df_list.append(imputer.fit_transform(df.loc[:, col]))
imputed_df = dd.concat(df_list, axis=1)
imputed_df.visualize(filename='imputed.svg', rankdir='LR')
# Save the new data frame to disk
imputed_df.to_parquet('imputed_df.parquet', compression='gzip')
Output
imputed_df.head()
Weight Age Height
0 100.0 2.0 5.0
1 85.0 4.5 5.0
2 70.0 7.0 5.0
3 100.0 2.0 5.0
4 85.0 4.5 5.0
# Check the summary statistics make sense - 300M rows and stats as expected
imputed_df.describe().compute()
Weight Age Height
count 3.000000e+08 3.000000e+08 300000000.0
mean 8.500000e+01 4.500000e+00 5.0
std 1.224745e+01 2.041241e+00 0.0
min 7.000000e+01 2.000000e+00 5.0
25% 7.000000e+01 2.000000e+00 5.0
50% 8.500000e+01 4.500000e+00 5.0
75% 1.000000e+02 7.000000e+00 5.0
max 1.000000e+02 7.000000e+00 5.0
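The summary statistics above can be sanity-checked by hand on the original three-row sample. A quick sketch with plain pandas (no dask needed) reproduces the same imputed values: NaN in Weight and Age becomes the column mean, and NaN in Height becomes the most frequent value:

```python
import numpy as np
import pandas as pd

data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns=['Weight', 'Age', 'Height'])

# Mean imputation: NaN -> column mean (Weight: 85.0, Age: 4.5)
df[['Weight', 'Age']] = df[['Weight', 'Age']].fillna(df[['Weight', 'Age']].mean())
# Most-frequent imputation: NaN -> column mode (Height: 5.0)
df['Height'] = df['Height'].fillna(df['Height'].mode()[0])
print(df)
```

These are exactly the mean, min, and max values reported by describe() over the 300M replicated rows.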