为什么当我增加 Dask worker 的数量时,Featuretools 会变慢?

Why does Featuretools slows down when I increase the number of Dask workers?

我使用的是具有 72 个内核和 144 GB RAM 的 Amazon SageMaker Notebook,我对整个数据样本进行了 2 次测试,以检查 Dask 集群是否正常工作。

样本有来自 5 个不同 "assets" 的 4500 行和 735 列(我的意思是每个资产有 147 列)。该代码正在过滤列并为每个过滤后的数据框创建一个特征矩阵。

首先我初始化集群如下,接收了72个worker,得到了17分钟的运行。 (我假设我创建了 72 个工人,每个工人有一个核心。)

    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(processes=True,n_workers=72,threads_per_worker=72)

    def main():
      import featuretools as ft
      list_columns = list(df_concat_02.columns)

      list_df_features=[]
      from tqdm.notebook import tqdm

      for asset in tqdm(list_columns,total=len(list_columns)):
        dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()

        es = ft.EntitySet()  
        es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe, 
                                      index = 'index', 
                                      time_index = 'Date')
        fm, features = ft.dfs(entityset=es, 
                              target_entity='MARKET',
                              trans_primitives = ['divide_numeric'],
                              agg_primitives = [],
                              max_depth=1,
                              verbose=True,
                              dask_kwargs={'cluster': client.scheduler.address}

                              )
        list_df_features.append(fm)
      return list_df_features

    if __name__ == "__main__":
        list_df = main()

其次,我初始化集群如下,接收了9个worker,得到了3.5分钟的运行。 (我假设我创建了 9 个工人,每个工人有 8 个核心。)

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=True)

def main():
  import featuretools as ft
  list_columns = list(df_concat_02.columns)

  list_df_features=[]
  from tqdm.notebook import tqdm

  for asset in tqdm(list_columns,total=len(list_columns)):
    dataframe = df_sma.filter(regex="^"+asset, axis=1).reset_index()

    es = ft.EntitySet()  
    es = es.entity_from_dataframe(entity_id = 'MARKET', dataframe =dataframe, 
                                  index = 'index', 
                                  time_index = 'Date')
    fm, features = ft.dfs(entityset=es, 
                          target_entity='MARKET',
                          trans_primitives = ['divide_numeric'],
                          agg_primitives = [],
                          max_depth=1,
                          verbose=True,
                          dask_kwargs={'cluster': client.scheduler.address}

                          )
    list_df_features.append(fm)
  return list_df_features

if __name__ == "__main__":
    list_df = main()

对我来说,这是令人兴奋的,因为我认为 72 名工人可以更快地完成工作!一旦我既不是 Dask 也不是 FeatureTools 的专家,我想我设置错了。

如有任何帮助和建议,我将不胜感激!

谢谢!

您在 DFS 中正确设置了 dask_kwargs。我认为速度变慢的原因是每个工作人员的额外开销和更少的内核。工人越多,传输数据的开销就越大。此外,可以利用来自 1 个工作人员的 8 个核心进行计算 运行 比来自 8 个工作人员的 1 个核心更快。