为什么当我增加 Dask worker 的数量时,Featuretools 会变慢?
Why does Featuretools slows down when I increase the number of Dask workers?
我使用的是具有 72 个内核和 144 GB RAM 的 Amazon SageMaker Notebook,我对整个数据样本进行了 2 次测试,以检查 Dask 集群是否正常工作。
样本有来自 5 个不同 "assets" 的 4500 行和 735 列(我的意思是每个资产有 147 列)。该代码正在过滤列并为每个过滤后的数据框创建一个特征矩阵。
首先我初始化集群如下,接收了72个worker,得到了17分钟的运行。 (我假设我创建了 72 个工人,每个工人有一个核心。)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True,n_workers=72,threads_per_worker=72)
def main():
import featuretools as ft
list_columns = list(df_concat_02.columns)
list_df_features=[]
from tqdm.notebook import tqdm
for asset in tqdm(list_columns,total=len(list_columns)):
dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()
es = ft.EntitySet()
es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe,
index = 'index',
time_index = 'Date')
fm, features = ft.dfs(entityset=es,
target_entity='MARKET',
trans_primitives = ['divide_numeric'],
agg_primitives = [],
max_depth=1,
verbose=True,
dask_kwargs={'cluster': client.scheduler.address}
)
list_df_features.append(fm)
return list_df_features
if __name__ == "__main__":
list_df = main()
其次,我初始化集群如下,接收了9个worker,得到了3.5分钟的运行。 (我假设我创建了 9 个工人,每个工人有 8 个核心。)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True)
def main():
import featuretools as ft
list_columns = list(df_concat_02.columns)
list_df_features=[]
from tqdm.notebook import tqdm
for asset in tqdm(list_columns,total=len(list_columns)):
dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()
es = ft.EntitySet()
es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe,
index = 'index',
time_index = 'Date')
fm, features = ft.dfs(entityset=es,
target_entity='MARKET',
trans_primitives = ['divide_numeric'],
agg_primitives = [],
max_depth=1,
verbose=True,
dask_kwargs={'cluster': client.scheduler.address}
)
list_df_features.append(fm)
return list_df_features
if __name__ == "__main__":
list_df = main()
对我来说,这是令人兴奋的,因为我认为 72 名工人可以更快地完成工作!一旦我既不是 Dask 也不是 FeatureTools 的专家,我想我设置错了。
如有任何帮助和建议,我将不胜感激!
谢谢!
您在 DFS 中正确设置了 dask_kwargs
。我认为速度变慢的原因是每个工作人员的额外开销和更少的内核。工人越多,传输数据的开销就越大。此外,可以利用来自 1 个工作人员的 8 个核心进行计算 运行 比来自 8 个工作人员的 1 个核心更快。
我使用的是具有 72 个内核和 144 GB RAM 的 Amazon SageMaker Notebook,我对整个数据样本进行了 2 次测试,以检查 Dask 集群是否正常工作。
样本有来自 5 个不同 "assets" 的 4500 行和 735 列(我的意思是每个资产有 147 列)。该代码正在过滤列并为每个过滤后的数据框创建一个特征矩阵。
首先我初始化集群如下,接收了72个worker,得到了17分钟的运行。 (我假设我创建了 72 个工人,每个工人有一个核心。)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True,n_workers=72,threads_per_worker=72)
def main():
import featuretools as ft
list_columns = list(df_concat_02.columns)
list_df_features=[]
from tqdm.notebook import tqdm
for asset in tqdm(list_columns,total=len(list_columns)):
dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()
es = ft.EntitySet()
es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe,
index = 'index',
time_index = 'Date')
fm, features = ft.dfs(entityset=es,
target_entity='MARKET',
trans_primitives = ['divide_numeric'],
agg_primitives = [],
max_depth=1,
verbose=True,
dask_kwargs={'cluster': client.scheduler.address}
)
list_df_features.append(fm)
return list_df_features
if __name__ == "__main__":
list_df = main()
其次,我初始化集群如下,接收了9个worker,得到了3.5分钟的运行。 (我假设我创建了 9 个工人,每个工人有 8 个核心。)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True)
def main():
import featuretools as ft
list_columns = list(df_concat_02.columns)
list_df_features=[]
from tqdm.notebook import tqdm
for asset in tqdm(list_columns,total=len(list_columns)):
dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()
es = ft.EntitySet()
es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe,
index = 'index',
time_index = 'Date')
fm, features = ft.dfs(entityset=es,
target_entity='MARKET',
trans_primitives = ['divide_numeric'],
agg_primitives = [],
max_depth=1,
verbose=True,
dask_kwargs={'cluster': client.scheduler.address}
)
list_df_features.append(fm)
return list_df_features
if __name__ == "__main__":
list_df = main()
对我来说,这是令人兴奋的,因为我认为 72 名工人可以更快地完成工作!一旦我既不是 Dask 也不是 FeatureTools 的专家,我想我设置错了。
如有任何帮助和建议,我将不胜感激!
谢谢!
您在 DFS 中正确设置了 dask_kwargs
。我认为速度变慢的原因是每个工作人员的额外开销和更少的内核。工人越多,传输数据的开销就越大。此外,可以利用来自 1 个工作人员的 8 个核心进行计算 运行 比来自 8 个工作人员的 1 个核心更快。